Skip to main content

pjson_rs/parser/
buffer_pool.rs

1//! Buffer pool system for zero-copy parsing with memory management
2//!
3//! This module provides a memory pool system to minimize allocations during
4//! JSON parsing, with support for different buffer sizes and reuse strategies.
5
6use crate::{
7    config::SecurityConfig,
8    domain::{DomainError, DomainResult},
9    parser::aligned_alloc::aligned_allocator,
10    security::SecurityValidator,
11};
12use dashmap::DashMap;
13use std::{
14    alloc::Layout,
15    mem,
16    ptr::{self, NonNull},
17    slice,
18    sync::Arc,
19    time::{Duration, Instant},
20};
21
/// Buffer pool that manages reusable byte buffers for parsing
///
/// Buckets are keyed by [`BufferSize`] in a concurrent `DashMap`; statistics
/// sit behind a single mutex because they are written far less often than
/// buffers are borrowed and returned.
#[derive(Debug)]
pub struct BufferPool {
    /// Size-keyed buckets of idle buffers, shared with `PooledBuffer` guards
    /// so dropped buffers can find their way back.
    pools: Arc<DashMap<BufferSize, BufferBucket>>,
    /// Limits, TTL, SIMD alignment, and security validator for this pool.
    config: PoolConfig,
    /// Usage counters; kept under a mutex as they are written less frequently.
    stats: Arc<parking_lot::Mutex<PoolStats>>,
}
29
/// Configuration for buffer pool behavior
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Maximum number of buffers per size bucket
    pub max_buffers_per_bucket: usize,
    /// Maximum total memory usage in bytes
    pub max_total_memory: usize,
    /// How long to keep unused buffers before cleanup
    pub buffer_ttl: Duration,
    /// Enable/disable pool statistics tracking
    pub track_stats: bool,
    /// Alignment for SIMD operations (typically 32 or 64 bytes)
    ///
    /// Must be a power of two no larger than 4096; `AlignedBuffer::new`
    /// rejects anything else.
    pub simd_alignment: usize,
    /// Security validator for buffer validation
    pub validator: SecurityValidator,
}
46
/// Standard buffer sizes for different parsing scenarios
///
/// Each variant's discriminant is its size in bytes, so `size as usize`
/// yields the capacity directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum BufferSize {
    /// Small buffers for short JSON strings (1KB)
    Small = 1024,
    /// Medium buffers for typical API responses (8KB)
    Medium = 8192,
    /// Large buffers for complex documents (64KB)
    Large = 65536,
    /// Extra large buffers for bulk data (512KB)
    XLarge = 524288,
    /// Huge buffers for massive documents (4MB)
    Huge = 4194304,
}
61
/// A bucket containing buffers of the same size
#[derive(Debug)]
struct BufferBucket {
    /// Idle buffers awaiting reuse; all share this bucket's `BufferSize`.
    buffers: Vec<AlignedBuffer>,
    /// Last time the bucket was borrowed from or returned to; `cleanup`
    /// uses it to retire empty, stale buckets.
    last_access: Instant,
}
68
/// SIMD-aligned buffer with metadata
///
/// This buffer guarantees proper alignment for SIMD operations using direct memory allocation.
/// It supports SSE (16-byte), AVX2 (32-byte), and AVX-512 (64-byte) alignments.
///
/// Invariants: `len <= capacity` is upheld by every method that mutates `len`,
/// and `layout` always describes the current allocation behind `ptr` so that
/// `Drop` can deallocate with the matching layout.
pub struct AlignedBuffer {
    /// Raw pointer to aligned memory
    ptr: NonNull<u8>,
    /// Current length of valid (initialized) data; always <= `capacity`
    len: usize,
    /// Total capacity in bytes
    capacity: usize,
    /// Memory alignment requirement (power of two)
    alignment: usize,
    /// Layout used for allocation (needed for deallocation)
    layout: Layout,
    /// Creation timestamp
    created_at: Instant,
    /// Last usage timestamp; drives TTL-based pool cleanup
    last_used: Instant,
}
89
// SAFETY: AlignedBuffer exclusively owns its allocation (a raw `NonNull<u8>`
// plus plain metadata), so moving it to another thread transfers sole ownership.
unsafe impl Send for AlignedBuffer {}

// SAFETY: all mutation goes through `&mut self` methods; a shared `&AlignedBuffer`
// only permits reads, and the type has no interior mutability.
unsafe impl Sync for AlignedBuffer {}
95
96impl std::fmt::Debug for AlignedBuffer {
97    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98        f.debug_struct("AlignedBuffer")
99            .field("ptr", &format_args!("0x{:x}", self.ptr.as_ptr() as usize))
100            .field("len", &self.len)
101            .field("capacity", &self.capacity)
102            .field("alignment", &self.alignment)
103            .field("is_aligned", &self.is_aligned())
104            .field("created_at", &self.created_at)
105            .field("last_used", &self.last_used)
106            .finish()
107    }
108}
109
/// Statistics about buffer pool usage
#[derive(Debug, Clone)]
pub struct PoolStats {
    /// Total allocations requested
    pub total_allocations: u64,
    /// Cache hits (buffer reused)
    pub cache_hits: u64,
    /// Cache misses (new buffer allocated)
    pub cache_misses: u64,
    /// Current memory usage in bytes
    ///
    /// NOTE(review): in the code visible here this counter is only adjusted
    /// downward by `cleanup`; allocations never increment it, so it can
    /// understate real usage. `BufferPool::current_memory_usage` computes the
    /// accurate figure directly from the buckets.
    pub current_memory_usage: usize,
    /// Peak memory usage in bytes
    pub peak_memory_usage: usize,
    /// Number of cleanup operations performed
    pub cleanup_count: u64,
}
126
127impl BufferPool {
128    /// Create new buffer pool with default configuration
129    pub fn new() -> Self {
130        Self::with_config(PoolConfig::default())
131    }
132
133    /// Create buffer pool with custom configuration
134    pub fn with_config(config: PoolConfig) -> Self {
135        Self {
136            pools: Arc::new(DashMap::new()),
137            config,
138            stats: Arc::new(parking_lot::Mutex::new(PoolStats::new())),
139        }
140    }
141
142    /// Create buffer pool with security configuration
143    pub fn with_security_config(security_config: SecurityConfig) -> Self {
144        Self::with_config(PoolConfig::from(&security_config))
145    }
146
147    /// Get buffer of specified size, reusing if available
148    pub fn get_buffer(&self, size: BufferSize) -> DomainResult<PooledBuffer> {
149        // Security validation: check buffer size
150        self.config
151            .validator
152            .validate_buffer_size(size as usize)
153            .map_err(|e| DomainError::SecurityViolation(e.to_string()))?;
154
155        // Check if we would exceed total memory limit
156        let current_usage = self.current_memory_usage().unwrap_or(0);
157        if current_usage + (size as usize) > self.config.max_total_memory {
158            return Err(DomainError::ResourceExhausted(format!(
159                "Adding buffer of size {} would exceed memory limit: current={}, limit={}",
160                size as usize, current_usage, self.config.max_total_memory
161            )));
162        }
163
164        if self.config.track_stats {
165            self.increment_allocations();
166        }
167
168        // Try to get a buffer from existing bucket
169        if let Some(mut bucket_ref) = self.pools.get_mut(&size)
170            && let Some(mut buffer) = bucket_ref.buffers.pop()
171        {
172            buffer.last_used = Instant::now();
173            bucket_ref.last_access = Instant::now();
174
175            if self.config.track_stats {
176                self.increment_cache_hits();
177            }
178
179            return Ok(PooledBuffer::new(
180                buffer,
181                Arc::clone(&self.pools),
182                size,
183                self.config.max_buffers_per_bucket,
184            ));
185        }
186
187        // No buffer available, create new one
188        if self.config.track_stats {
189            self.increment_cache_misses();
190        }
191
192        let buffer = AlignedBuffer::new(size as usize, self.config.simd_alignment)?;
193        Ok(PooledBuffer::new(
194            buffer,
195            Arc::clone(&self.pools),
196            size,
197            self.config.max_buffers_per_bucket,
198        ))
199    }
200
201    /// Get buffer with at least the specified capacity
202    pub fn get_buffer_with_capacity(&self, min_capacity: usize) -> DomainResult<PooledBuffer> {
203        let size = BufferSize::for_capacity(min_capacity);
204        self.get_buffer(size)
205    }
206
207    /// Perform cleanup of old unused buffers
208    pub fn cleanup(&self) -> DomainResult<CleanupStats> {
209        let now = Instant::now();
210        let mut freed_buffers = 0;
211        let mut freed_memory = 0;
212
213        // DashMap doesn't have retain, so we collect keys to remove
214        let mut keys_to_remove = Vec::new();
215
216        for mut entry in self.pools.iter_mut() {
217            let bucket = entry.value_mut();
218            let old_count = bucket.buffers.len();
219
220            bucket.buffers.retain(|buffer| {
221                let age = now.duration_since(buffer.last_used);
222                if age > self.config.buffer_ttl {
223                    freed_memory += buffer.capacity;
224                    false
225                } else {
226                    true
227                }
228            });
229
230            freed_buffers += old_count - bucket.buffers.len();
231
232            // Mark bucket for removal if empty and not recently accessed
233            if bucket.buffers.is_empty()
234                && now.duration_since(bucket.last_access) >= self.config.buffer_ttl
235            {
236                keys_to_remove.push(*entry.key());
237            }
238        }
239
240        // Remove empty buckets
241        for key in keys_to_remove {
242            self.pools.remove(&key);
243        }
244
245        if self.config.track_stats {
246            self.increment_cleanup_count();
247            self.update_current_memory_usage(-(freed_memory as i64));
248        }
249
250        Ok(CleanupStats {
251            freed_buffers,
252            freed_memory,
253        })
254    }
255
256    /// Get current pool statistics
257    pub fn stats(&self) -> DomainResult<PoolStats> {
258        let stats = self.stats.lock();
259        Ok(stats.clone())
260    }
261
262    /// Get current memory usage across all pools
263    pub fn current_memory_usage(&self) -> DomainResult<usize> {
264        use rayon::prelude::*;
265
266        let usage = self
267            .pools
268            .iter()
269            .par_bridge()
270            .map(|entry| {
271                entry
272                    .value()
273                    .buffers
274                    .par_iter()
275                    .map(|b| b.capacity)
276                    .sum::<usize>()
277            })
278            .sum();
279
280        Ok(usage)
281    }
282
283    // Private statistics methods
284
285    fn increment_allocations(&self) {
286        let mut stats = self.stats.lock();
287        stats.total_allocations += 1;
288    }
289
290    fn increment_cache_hits(&self) {
291        let mut stats = self.stats.lock();
292        stats.cache_hits += 1;
293    }
294
295    fn increment_cache_misses(&self) {
296        let mut stats = self.stats.lock();
297        stats.cache_misses += 1;
298    }
299
300    fn increment_cleanup_count(&self) {
301        let mut stats = self.stats.lock();
302        stats.cleanup_count += 1;
303    }
304
305    fn update_current_memory_usage(&self, delta: i64) {
306        let mut stats = self.stats.lock();
307        stats.current_memory_usage = (stats.current_memory_usage as i64 + delta).max(0) as usize;
308        stats.peak_memory_usage = stats.peak_memory_usage.max(stats.current_memory_usage);
309    }
310}
311
312impl BufferSize {
313    /// Get appropriate buffer size for given capacity
314    pub fn for_capacity(capacity: usize) -> Self {
315        match capacity {
316            0..=1024 => BufferSize::Small,
317            1025..=8192 => BufferSize::Medium,
318            8193..=65536 => BufferSize::Large,
319            65537..=524288 => BufferSize::XLarge,
320            _ => BufferSize::Huge,
321        }
322    }
323
324    /// Get all available buffer sizes in order
325    pub fn all_sizes() -> &'static [BufferSize] {
326        &[
327            BufferSize::Small,
328            BufferSize::Medium,
329            BufferSize::Large,
330            BufferSize::XLarge,
331            BufferSize::Huge,
332        ]
333    }
334}
335
336impl AlignedBuffer {
337    /// Create new aligned buffer with guaranteed SIMD alignment
338    ///
339    /// # Arguments
340    /// * `capacity` - Minimum capacity in bytes
341    /// * `alignment` - Required alignment (must be power of 2)
342    ///
343    /// # Safety
344    /// This function uses unsafe code to allocate aligned memory.
345    /// The memory is properly tracked and will be deallocated on drop.
346    pub fn new(capacity: usize, alignment: usize) -> DomainResult<Self> {
347        // Validate alignment is power of 2 and reasonable
348        if !alignment.is_power_of_two() {
349            return Err(DomainError::InvalidInput(format!(
350                "Alignment {} is not a power of 2",
351                alignment
352            )));
353        }
354
355        // Validate alignment is not too large (max 4096 bytes for page alignment)
356        if alignment > 4096 {
357            return Err(DomainError::InvalidInput(format!(
358                "Alignment {} exceeds maximum of 4096",
359                alignment
360            )));
361        }
362
363        // Minimum alignment should be at least size of usize for proper alignment
364        let alignment = alignment.max(mem::align_of::<usize>());
365
366        // Align capacity to SIMD boundaries
367        let aligned_capacity = (capacity + alignment - 1) & !(alignment - 1);
368
369        // Ensure minimum capacity for safety
370        let aligned_capacity = aligned_capacity.max(alignment);
371
372        // Create layout for allocation (kept for Drop implementation)
373        let layout = Layout::from_size_align(aligned_capacity, alignment).map_err(|e| {
374            DomainError::InvalidInput(format!(
375                "Invalid layout: capacity={}, alignment={}, error={}",
376                aligned_capacity, alignment, e
377            ))
378        })?;
379
380        // Use global SIMD allocator for better performance
381        let allocator = aligned_allocator();
382
383        // Allocate aligned memory using the appropriate allocator backend
384        // Safety: alignment has been validated above
385        let ptr = unsafe { allocator.alloc_aligned(aligned_capacity, alignment)? };
386
387        let now = Instant::now();
388        Ok(Self {
389            ptr,
390            len: 0,
391            capacity: aligned_capacity,
392            alignment,
393            layout,
394            created_at: now,
395            last_used: now,
396        })
397    }
398
399    /// Create an aligned buffer with specific SIMD level
400    pub fn new_sse(capacity: usize) -> DomainResult<Self> {
401        Self::new(capacity, 16) // SSE requires 16-byte alignment
402    }
403
404    /// Create an aligned buffer for AVX2 operations
405    pub fn new_avx2(capacity: usize) -> DomainResult<Self> {
406        Self::new(capacity, 32) // AVX2 requires 32-byte alignment
407    }
408
409    /// Create an aligned buffer for AVX-512 operations
410    pub fn new_avx512(capacity: usize) -> DomainResult<Self> {
411        Self::new(capacity, 64) // AVX-512 requires 64-byte alignment
412    }
413
414    /// Get mutable slice to buffer data
415    pub fn as_mut_slice(&mut self) -> &mut [u8] {
416        // SAFETY: `self.ptr` was allocated via `AlignedAllocator::alloc_aligned` and remains
417        // valid for at least `self.capacity` bytes. `self.len <= self.capacity` is a class
418        // invariant upheld by every method that modifies `len`. The `&mut self` receiver
419        // ensures exclusive access for the lifetime of the returned slice.
420        unsafe { slice::from_raw_parts_mut(self.ptr.as_ptr(), self.len) }
421    }
422
423    /// Get immutable slice to buffer data
424    pub fn as_slice(&self) -> &[u8] {
425        // SAFETY: `self.ptr` was allocated via `AlignedAllocator::alloc_aligned` and remains
426        // valid for at least `self.capacity` bytes. `self.len <= self.capacity` is a class
427        // invariant upheld by every method that modifies `len`. The `&self` receiver ensures
428        // no mutable aliasing exists for the lifetime of the returned slice.
429        unsafe { slice::from_raw_parts(self.ptr.as_ptr(), self.len) }
430    }
431
432    /// Get a mutable slice with full capacity
433    pub fn as_mut_capacity_slice(&mut self) -> &mut [u8] {
434        // SAFETY: `self.ptr` was allocated via `AlignedAllocator::alloc_aligned` for exactly
435        // `self.capacity` bytes. The `&mut self` receiver ensures exclusive access for the
436        // lifetime of the returned slice. Callers are responsible for initializing bytes
437        // before reading them; `set_len` is `unsafe` and documents that requirement.
438        unsafe { slice::from_raw_parts_mut(self.ptr.as_ptr(), self.capacity) }
439    }
440
441    /// Set the length of valid data
442    ///
443    /// # Safety
444    /// Caller must ensure that `new_len` bytes are initialized
445    pub unsafe fn set_len(&mut self, new_len: usize) {
446        debug_assert!(
447            new_len <= self.capacity,
448            "new_len {} exceeds capacity {}",
449            new_len,
450            self.capacity
451        );
452        self.len = new_len;
453        self.last_used = Instant::now();
454    }
455
456    /// Reserve additional capacity
457    pub fn reserve(&mut self, additional: usize) -> DomainResult<()> {
458        let new_capacity = self
459            .len
460            .checked_add(additional)
461            .ok_or_else(|| DomainError::InvalidInput("Capacity overflow".to_string()))?;
462
463        if new_capacity <= self.capacity {
464            return Ok(());
465        }
466
467        // Align new capacity
468        let aligned_capacity = (new_capacity + self.alignment - 1) & !(self.alignment - 1);
469
470        // Use global SIMD allocator for reallocation
471        let allocator = aligned_allocator();
472
473        // Reallocate using the allocator (which will handle data copying).
474        // SAFETY: `self.ptr` was allocated (or previously reallocated) via
475        // `AlignedAllocator::alloc_aligned` with `self.layout`. `aligned_capacity` is
476        // positive because it is at least `new_capacity > self.capacity >= self.alignment`.
477        // After this call `self.ptr` must not be used — it is replaced below.
478        let new_ptr =
479            unsafe { allocator.realloc_aligned(self.ptr, self.layout, aligned_capacity)? };
480
481        // Update layout for the new size
482        let new_layout = Layout::from_size_align(aligned_capacity, self.alignment)
483            .map_err(|e| DomainError::InvalidInput(format!("Invalid layout: {}", e)))?;
484
485        self.ptr = new_ptr;
486        self.capacity = aligned_capacity;
487        self.layout = new_layout;
488        self.last_used = Instant::now();
489
490        Ok(())
491    }
492
493    /// Push bytes to the buffer
494    pub fn extend_from_slice(&mut self, data: &[u8]) -> DomainResult<()> {
495        let required_capacity = self
496            .len
497            .checked_add(data.len())
498            .ok_or_else(|| DomainError::InvalidInput("Length overflow".to_string()))?;
499
500        if required_capacity > self.capacity {
501            self.reserve(data.len())?;
502        }
503
504        // SAFETY: `reserve` above ensures `self.capacity >= self.len + data.len()`, so
505        // `self.ptr.as_ptr().add(self.len)` is within the allocation. `data` is a valid
506        // `&[u8]` slice so its pointer is also valid for `data.len()` bytes. The destination
507        // range `[self.len, self.len + data.len())` does not overlap with `data` because
508        // `data` is an external caller-provided slice that cannot alias `self.ptr`.
509        unsafe {
510            ptr::copy_nonoverlapping(data.as_ptr(), self.ptr.as_ptr().add(self.len), data.len());
511            self.len += data.len();
512        }
513
514        self.last_used = Instant::now();
515        Ok(())
516    }
517
518    /// Clear buffer contents but keep allocated memory
519    pub fn clear(&mut self) {
520        self.len = 0;
521        self.last_used = Instant::now();
522    }
523
524    /// Get buffer capacity
525    pub fn capacity(&self) -> usize {
526        self.capacity
527    }
528
529    /// Get current length of valid data
530    pub fn len(&self) -> usize {
531        self.len
532    }
533
534    /// Check if buffer is empty
535    pub fn is_empty(&self) -> bool {
536        self.len == 0
537    }
538
539    /// Get the raw pointer to the buffer
540    pub fn as_ptr(&self) -> *const u8 {
541        self.ptr.as_ptr()
542    }
543
544    /// Get the mutable raw pointer to the buffer  
545    pub fn as_mut_ptr(&mut self) -> *mut u8 {
546        self.ptr.as_ptr()
547    }
548
549    /// Check if buffer is properly aligned
550    ///
551    /// This validates that the buffer pointer has the requested alignment,
552    /// which is critical for SIMD operations.
553    pub fn is_aligned(&self) -> bool {
554        let ptr_addr = self.ptr.as_ptr() as usize;
555        ptr_addr.is_multiple_of(self.alignment)
556    }
557
558    /// Get the actual alignment of the buffer
559    pub fn actual_alignment(&self) -> usize {
560        let ptr_addr = self.ptr.as_ptr() as usize;
561        // Find the highest power of 2 that divides the address
562        if ptr_addr == 0 {
563            return usize::MAX; // null pointer is infinitely aligned
564        }
565
566        // Use trailing zeros to find alignment
567        1 << ptr_addr.trailing_zeros()
568    }
569
570    /// Verify buffer is suitable for specific SIMD instruction set
571    pub fn is_simd_compatible(&self, simd_type: SimdType) -> bool {
572        let required_alignment = match simd_type {
573            SimdType::Sse => 16,
574            SimdType::Avx2 => 32,
575            SimdType::Avx512 => 64,
576            SimdType::Neon => 16,
577        };
578
579        self.actual_alignment() >= required_alignment
580    }
581}
582
/// SIMD instruction set types
///
/// Used by `AlignedBuffer::is_simd_compatible` to map an instruction set to
/// the buffer alignment it requires.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SimdType {
    /// SSE instructions (16-byte alignment)
    Sse,
    /// AVX2 instructions (32-byte alignment)
    Avx2,
    /// AVX-512 instructions (64-byte alignment)
    Avx512,
    /// ARM NEON instructions (16-byte alignment)
    Neon,
}
595
impl Drop for AlignedBuffer {
    /// Release the aligned allocation exactly once.
    fn drop(&mut self) {
        // Use the global SIMD allocator for deallocation
        let allocator = aligned_allocator();

        // SAFETY: `self.ptr` was obtained from `alloc_aligned`/`realloc_aligned`
        // with `self.layout`, and `new`/`reserve` keep `layout` in sync with the
        // live allocation, so this frees with the matching layout.
        unsafe {
            allocator.dealloc_aligned(self.ptr, self.layout);
        }
    }
}
607
impl Clone for AlignedBuffer {
    /// Deep-copy the buffer contents into a freshly allocated buffer with the
    /// same capacity and alignment (timestamps are reset by `Self::new`).
    fn clone(&self) -> Self {
        // Create new buffer with same alignment and capacity.
        // NOTE(review): allocation failure panics here via `expect` — `Clone`
        // cannot report errors; confirm this is acceptable for callers.
        let mut new_buffer =
            Self::new(self.capacity, self.alignment).expect("Failed to clone buffer");

        // Copy data.
        // SAFETY: `self.ptr` is valid for `self.len` bytes (allocation invariant).
        // `new_buffer.ptr` is valid for `self.capacity` bytes because `Self::new` was called
        // with `self.capacity` above. `self.len <= self.capacity` ensures the byte count fits
        // in both ranges. The two allocations are independent heap regions, so they cannot
        // overlap.
        unsafe {
            ptr::copy_nonoverlapping(self.ptr.as_ptr(), new_buffer.ptr.as_ptr(), self.len);
            new_buffer.len = self.len;
        }

        new_buffer
    }
}
628
/// RAII wrapper for pooled buffer that returns buffer to pool on drop
pub struct PooledBuffer {
    /// The checked-out buffer; `None` once `Drop` has taken it back.
    buffer: Option<AlignedBuffer>,
    /// Shared handle to the pool's buckets, used to return the buffer.
    pool: Arc<DashMap<BufferSize, BufferBucket>>,
    /// Size bucket this buffer belongs to.
    size: BufferSize,
    /// Per-bucket cap, copied from `PoolConfig` at checkout time.
    max_buffers_per_bucket: usize,
}
636
637impl PooledBuffer {
638    fn new(
639        buffer: AlignedBuffer,
640        pool: Arc<DashMap<BufferSize, BufferBucket>>,
641        size: BufferSize,
642        max_buffers_per_bucket: usize,
643    ) -> Self {
644        Self {
645            buffer: Some(buffer),
646            pool,
647            size,
648            max_buffers_per_bucket,
649        }
650    }
651
652    /// Get mutable reference to buffer
653    pub fn buffer_mut(&mut self) -> Option<&mut AlignedBuffer> {
654        self.buffer.as_mut()
655    }
656
657    /// Get immutable reference to buffer
658    pub fn buffer(&self) -> Option<&AlignedBuffer> {
659        self.buffer.as_ref()
660    }
661
662    /// Get buffer capacity
663    pub fn capacity(&self) -> usize {
664        self.buffer.as_ref().map(|b| b.capacity()).unwrap_or(0)
665    }
666
667    /// Clear buffer contents
668    pub fn clear(&mut self) {
669        if let Some(buffer) = &mut self.buffer {
670            buffer.clear();
671        }
672    }
673}
674
impl Drop for PooledBuffer {
    /// Return the buffer to its size bucket, creating the bucket on first use.
    /// Buffers beyond `max_buffers_per_bucket` are simply dropped here, which
    /// frees their memory instead of pooling it.
    fn drop(&mut self) {
        if let Some(mut buffer) = self.buffer.take() {
            buffer.clear(); // Clear contents (len = 0) before returning to pool

            // Get or create bucket for this buffer size. The entry guard holds
            // the DashMap shard lock for the rest of this scope.
            let mut bucket_ref = self.pool.entry(self.size).or_insert_with(|| BufferBucket {
                buffers: Vec::new(),
                last_access: Instant::now(),
            });

            // Only return to pool if we haven't exceeded the per-bucket limit;
            // otherwise `buffer` goes out of scope and is deallocated.
            if bucket_ref.buffers.len() < self.max_buffers_per_bucket {
                bucket_ref.buffers.push(buffer);
                bucket_ref.last_access = Instant::now();
            }
        }
    }
}
694
/// Result of cleanup operation
#[derive(Debug, Clone)]
pub struct CleanupStats {
    /// Number of expired buffers released back to the allocator.
    pub freed_buffers: usize,
    /// Total capacity, in bytes, of the freed buffers.
    pub freed_memory: usize,
}
701
702impl PoolConfig {
703    /// Create configuration from security config
704    pub fn from_security_config(security_config: &SecurityConfig) -> Self {
705        Self::from(security_config)
706    }
707
708    /// Create configuration optimized for SIMD operations
709    pub fn simd_optimized() -> Self {
710        let mut config = Self::from(&SecurityConfig::high_throughput());
711        config.simd_alignment = 64; // AVX-512 alignment
712        config
713    }
714
715    /// Create configuration for low-memory environments
716    pub fn low_memory() -> Self {
717        let mut config = Self::from(&SecurityConfig::low_memory());
718        config.track_stats = false; // Reduce overhead
719        config
720    }
721
722    /// Create configuration for development/testing
723    pub fn development() -> Self {
724        Self::from(&SecurityConfig::development())
725    }
726}
727
728impl Default for PoolConfig {
729    fn default() -> Self {
730        let security_config = SecurityConfig::default();
731        Self {
732            max_buffers_per_bucket: security_config.buffers.max_buffers_per_bucket,
733            max_total_memory: security_config.buffers.max_total_memory,
734            buffer_ttl: security_config.buffer_ttl(),
735            track_stats: true,
736            simd_alignment: 32, // AVX2 alignment
737            validator: SecurityValidator::new(security_config),
738        }
739    }
740}
741
742impl From<&SecurityConfig> for PoolConfig {
743    fn from(security_config: &SecurityConfig) -> Self {
744        Self {
745            max_buffers_per_bucket: security_config.buffers.max_buffers_per_bucket,
746            max_total_memory: security_config.buffers.max_total_memory,
747            buffer_ttl: security_config.buffer_ttl(),
748            track_stats: true,
749            simd_alignment: 32, // AVX2 alignment
750            validator: SecurityValidator::new(security_config.clone()),
751        }
752    }
753}
754
755impl PoolStats {
756    fn new() -> Self {
757        Self {
758            total_allocations: 0,
759            cache_hits: 0,
760            cache_misses: 0,
761            current_memory_usage: 0,
762            peak_memory_usage: 0,
763            cleanup_count: 0,
764        }
765    }
766
767    /// Get cache hit ratio
768    pub fn hit_ratio(&self) -> f64 {
769        if self.total_allocations == 0 {
770            0.0
771        } else {
772            self.cache_hits as f64 / self.total_allocations as f64
773        }
774    }
775
776    /// Get memory efficiency (current/peak ratio)
777    pub fn memory_efficiency(&self) -> f64 {
778        if self.peak_memory_usage == 0 {
779            1.0
780        } else {
781            self.current_memory_usage as f64 / self.peak_memory_usage as f64
782        }
783    }
784}
785
786impl Default for BufferPool {
787    fn default() -> Self {
788        Self::new()
789    }
790}
791
/// Global buffer pool instance for convenient access
///
/// Initialized lazily on first call to `global_buffer_pool`, or explicitly
/// via `initialize_global_buffer_pool`.
static GLOBAL_BUFFER_POOL: std::sync::OnceLock<BufferPool> = std::sync::OnceLock::new();

/// Get global buffer pool instance
///
/// Falls back to `BufferPool::new()` (default configuration) when the pool
/// has not been explicitly initialized.
pub fn global_buffer_pool() -> &'static BufferPool {
    GLOBAL_BUFFER_POOL.get_or_init(BufferPool::new)
}
799
800/// Initialize global buffer pool with custom configuration
801pub fn initialize_global_buffer_pool(config: PoolConfig) -> DomainResult<()> {
802    GLOBAL_BUFFER_POOL
803        .set(BufferPool::with_config(config))
804        .map_err(|_| {
805            DomainError::InternalError("Global buffer pool already initialized".to_string())
806        })?;
807    Ok(())
808}
809
810#[cfg(test)]
811mod tests {
812    use super::*;
813
814    #[test]
815    fn test_buffer_pool_creation() {
816        let pool = BufferPool::new();
817        assert!(pool.stats().is_ok());
818    }
819
820    #[test]
821    fn test_buffer_allocation() {
822        let pool = BufferPool::new();
823        let buffer = pool.get_buffer(BufferSize::Medium);
824        assert!(buffer.is_ok());
825
826        let buffer = buffer.unwrap();
827        assert!(buffer.capacity() >= BufferSize::Medium as usize);
828    }
829
830    #[test]
831    fn test_buffer_reuse() {
832        let pool = BufferPool::new();
833
834        // Allocate and drop buffer
835        {
836            let _buffer = pool.get_buffer(BufferSize::Small).unwrap();
837        }
838
839        // Allocate another buffer of same size
840        let _buffer2 = pool.get_buffer(BufferSize::Small).unwrap();
841
842        // Should have cache hit
843        let stats = pool.stats().unwrap();
844        assert!(stats.cache_hits > 0);
845    }
846
847    #[test]
848    fn test_buffer_size_selection() {
849        assert_eq!(BufferSize::for_capacity(500), BufferSize::Small);
850        assert_eq!(BufferSize::for_capacity(2000), BufferSize::Medium);
851        assert_eq!(BufferSize::for_capacity(50000), BufferSize::Large);
852        assert_eq!(BufferSize::for_capacity(100000), BufferSize::XLarge);
853    }
854
855    #[test]
856    fn test_aligned_buffer_creation_guaranteed() {
857        // Test all common SIMD alignments
858        let test_cases = vec![
859            (1024, 16, "SSE alignment"),
860            (2048, 32, "AVX2 alignment"),
861            (4096, 64, "AVX-512 alignment"),
862        ];
863
864        for (capacity, alignment, description) in test_cases {
865            let buffer = AlignedBuffer::new(capacity, alignment).unwrap();
866
867            // Verify pointer alignment
868            let ptr_addr = buffer.as_ptr() as usize;
869            assert_eq!(
870                ptr_addr % alignment,
871                0,
872                "{}: pointer 0x{:x} is not {}-byte aligned",
873                description,
874                ptr_addr,
875                alignment
876            );
877
878            // Verify is_aligned method
879            assert!(
880                buffer.is_aligned(),
881                "{}: is_aligned() returned false for properly aligned buffer",
882                description
883            );
884
885            // Verify capacity
886            assert!(
887                buffer.capacity() >= capacity,
888                "{}: capacity {} is less than requested {}",
889                description,
890                buffer.capacity(),
891                capacity
892            );
893
894            // Verify actual alignment
895            assert!(
896                buffer.actual_alignment() >= alignment,
897                "{}: actual alignment {} is less than requested {}",
898                description,
899                buffer.actual_alignment(),
900                alignment
901            );
902        }
903    }
904
905    #[test]
906    fn test_buffer_operations() {
907        let mut buffer = AlignedBuffer::new(1024, 32).unwrap();
908
909        // Test initial state
910        assert_eq!(buffer.len(), 0);
911        assert!(buffer.is_empty());
912        assert_eq!(buffer.capacity(), 1024);
913
914        // Test extend_from_slice
915        let data = b"Hello, SIMD World!";
916        buffer.extend_from_slice(data).unwrap();
917        assert_eq!(buffer.len(), data.len());
918        assert_eq!(buffer.as_slice(), data);
919
920        // Test clear
921        buffer.clear();
922        assert_eq!(buffer.len(), 0);
923        assert!(buffer.is_empty());
924        assert_eq!(buffer.capacity(), 1024); // Capacity should remain
925
926        // Test unsafe set_len
927        unsafe {
928            // Write some data directly
929            let slice = buffer.as_mut_capacity_slice();
930            slice[0..5].copy_from_slice(b"SIMD!");
931            buffer.set_len(5);
932        }
933        assert_eq!(buffer.len(), 5);
934        assert_eq!(&buffer.as_slice()[0..5], b"SIMD!");
935    }
936
937    #[test]
938    fn test_buffer_reserve() {
939        let mut buffer = AlignedBuffer::new(64, 32).unwrap();
940        let _initial_alignment = buffer.actual_alignment();
941
942        // Set some length first
943        unsafe {
944            buffer.set_len(32);
945        }
946
947        // Reserve additional space - should need capacity for len + additional
948        buffer.reserve(256).unwrap();
949        assert!(
950            buffer.capacity() >= 32 + 256,
951            "Expected capacity >= {}, got {}",
952            32 + 256,
953            buffer.capacity()
954        );
955
956        // Alignment should be preserved after reallocation
957        assert!(
958            buffer.actual_alignment() >= 32,
959            "Alignment not preserved after reserve"
960        );
961        assert!(buffer.is_aligned());
962
963        // Test that data is preserved during reallocation
964        buffer.extend_from_slice(b"test data").unwrap();
965        let old_data = buffer.as_slice().to_vec();
966
967        buffer.reserve(1024).unwrap();
968        assert_eq!(buffer.as_slice(), &old_data[..]);
969    }
970
971    #[test]
972    fn test_buffer_clone() {
973        let mut original = AlignedBuffer::new(512, 64).unwrap();
974        original.extend_from_slice(b"Original data").unwrap();
975
976        let cloned = original.clone();
977
978        // Verify clone has same properties
979        assert_eq!(cloned.len(), original.len());
980        assert_eq!(cloned.capacity(), original.capacity());
981        assert_eq!(cloned.alignment, original.alignment);
982        assert_eq!(cloned.as_slice(), original.as_slice());
983
984        // Verify clone has different memory location
985        assert_ne!(cloned.as_ptr(), original.as_ptr());
986
987        // Verify clone is also properly aligned
988        assert!(cloned.is_aligned());
989        assert!(cloned.actual_alignment() >= 64);
990    }
991
992    #[test]
993    fn test_alignment_validation() {
994        // Test valid power-of-2 alignments
995        let valid_alignments = [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096];
996
997        for &alignment in &valid_alignments {
998            let result = AlignedBuffer::new(1024, alignment);
999            assert!(result.is_ok(), "Alignment {} should be valid", alignment);
1000
1001            let buffer = result.unwrap();
1002            assert!(
1003                buffer.is_aligned(),
1004                "Buffer with alignment {} should be aligned",
1005                alignment
1006            );
1007        }
1008
1009        // Test invalid non-power-of-2 alignments
1010        let invalid_alignments = [3, 5, 6, 7, 9, 10, 11, 12, 13, 14, 15, 17, 31, 33, 63, 65];
1011
1012        for &alignment in &invalid_alignments {
1013            let result = AlignedBuffer::new(1024, alignment);
1014            assert!(result.is_err(), "Alignment {} should be invalid", alignment);
1015        }
1016
1017        // Test too large alignment
1018        assert!(AlignedBuffer::new(1024, 8192).is_err());
1019    }
1020
1021    #[test]
1022    fn test_actual_alignment_calculation() {
1023        // Create buffers with different alignments and verify actual_alignment()
1024        for &requested_align in &[16, 32, 64] {
1025            let buffer = AlignedBuffer::new(1024, requested_align).unwrap();
1026            let actual = buffer.actual_alignment();
1027
1028            assert!(
1029                actual >= requested_align,
1030                "Actual alignment {} is less than requested {}",
1031                actual,
1032                requested_align
1033            );
1034
1035            // actual_alignment should be a power of 2
1036            assert!(
1037                actual.is_power_of_two(),
1038                "Actual alignment {} is not a power of 2",
1039                actual
1040            );
1041        }
1042    }
1043
1044    #[test]
1045    fn test_simd_compatibility_check() {
1046        // SSE buffer should be compatible with SSE but might not be with AVX-512
1047        let sse_buffer = AlignedBuffer::new_sse(1024).unwrap();
1048        assert!(sse_buffer.is_simd_compatible(SimdType::Sse));
1049        assert!(sse_buffer.is_simd_compatible(SimdType::Neon)); // Same alignment as SSE
1050
1051        // AVX-512 buffer should be compatible with all instruction sets
1052        let avx512_buffer = AlignedBuffer::new_avx512(1024).unwrap();
1053        assert!(avx512_buffer.is_simd_compatible(SimdType::Sse));
1054        assert!(avx512_buffer.is_simd_compatible(SimdType::Avx2));
1055        assert!(avx512_buffer.is_simd_compatible(SimdType::Avx512));
1056        assert!(avx512_buffer.is_simd_compatible(SimdType::Neon));
1057    }
1058
1059    #[test]
1060    fn test_zero_copy_verification() {
1061        let mut buffer = AlignedBuffer::new(1024, 32).unwrap();
1062
1063        // Get raw pointer before modification
1064        let ptr_before = buffer.as_ptr();
1065
1066        // Perform various operations that should NOT move the buffer
1067        buffer.clear();
1068        buffer.extend_from_slice(b"test").unwrap();
1069        unsafe {
1070            buffer.set_len(2);
1071        }
1072
1073        // Pointer should remain the same (zero-copy)
1074        assert_eq!(
1075            ptr_before,
1076            buffer.as_ptr(),
1077            "Buffer was moved during operations (not zero-copy)"
1078        );
1079
1080        // Only reserve should potentially change the pointer
1081        buffer.reserve(2048).unwrap();
1082        // After reserve, pointer might change but should still be aligned
1083        assert!(buffer.is_aligned());
1084    }
1085
    #[test]
    fn test_pool_cleanup() {
        // Verifies that cleanup() frees pooled buffers whose TTL has expired.
        // NOTE(review): timing-based test — the 10 ms sleep must outlast the 1 ms
        // TTL; could be flaky on a heavily loaded machine. Confirm cleanup() and
        // the pool use the same clock source.
        let config = PoolConfig {
            buffer_ttl: Duration::from_millis(1),
            ..Default::default()
        };
        let pool = BufferPool::with_config(config);

        // Allocate and drop buffer
        {
            // Dropping the handle returns the buffer to the pool's bucket.
            let _buffer = pool.get_buffer(BufferSize::Small).unwrap();
        }

        // Wait for TTL
        std::thread::sleep(Duration::from_millis(10));

        // Cleanup should free the buffer
        let cleanup_stats = pool.cleanup().unwrap();
        assert!(cleanup_stats.freed_buffers > 0);
    }
1106
1107    #[test]
1108    fn test_global_buffer_pool() {
1109        let pool = global_buffer_pool();
1110        let buffer = pool.get_buffer(BufferSize::Medium);
1111        assert!(buffer.is_ok());
1112    }
1113
1114    #[test]
1115    fn test_memory_limit_enforcement() {
1116        let config = PoolConfig {
1117            max_total_memory: 1024, // Very small limit
1118            max_buffers_per_bucket: 10,
1119            ..Default::default()
1120        };
1121        let pool = BufferPool::with_config(config);
1122
1123        // Create a buffer that exceeds the memory limit
1124        let result = pool.get_buffer(BufferSize::Medium); // 8KB > 1KB limit
1125
1126        assert!(result.is_err());
1127
1128        if let Err(e) = result {
1129            assert!(e.to_string().contains("memory limit"));
1130        }
1131    }
1132
    #[test]
    fn test_per_bucket_limit_enforcement() {
        // Verifies that no more than max_buffers_per_bucket buffers are retained
        // when dropped buffers are returned to a bucket.
        let config = PoolConfig {
            max_buffers_per_bucket: 2,          // Very small limit
            max_total_memory: 10 * 1024 * 1024, // Generous memory limit
            ..Default::default()
        };
        let pool = BufferPool::with_config(config);

        // Allocate and drop buffers to fill the bucket
        for _ in 0..3 {
            let _buffer = pool.get_buffer(BufferSize::Small).unwrap();
            // Buffer goes back to pool on drop
        }

        // Only 2 buffers should be retained in the pool
        // NOTE(review): cache_hits is used here as a proxy for "buffers retained";
        // confirm against PoolStats whether this field actually counts reuse —
        // a dedicated retained-buffer counter would make this assertion direct.
        let stats = pool.stats().unwrap();
        assert!(stats.cache_hits <= 2, "Too many buffers retained in bucket");
    }
1152
1153    #[test]
1154    fn test_buffer_size_validation() {
1155        let pool = BufferPool::new();
1156
1157        // All standard buffer sizes should be valid
1158        for size in BufferSize::all_sizes() {
1159            let result = pool.get_buffer(*size);
1160            assert!(result.is_ok(), "Buffer size {:?} should be valid", size);
1161        }
1162    }
1163
1164    #[test]
1165    fn test_memory_safety() {
1166        // Test that dropping a buffer properly deallocates memory
1167        // This test would fail under valgrind/ASAN if there's a memory leak
1168        for _ in 0..100 {
1169            let buffer = AlignedBuffer::new(1024, 64).unwrap();
1170            drop(buffer);
1171        }
1172
1173        // Test clone and drop
1174        for _ in 0..100 {
1175            let buffer = AlignedBuffer::new(512, 32).unwrap();
1176            let cloned = buffer.clone();
1177            drop(buffer);
1178            drop(cloned);
1179        }
1180    }
1181
1182    #[test]
1183    fn test_simd_specific_constructors() {
1184        // Test SSE alignment (16 bytes)
1185        let sse_buffer = AlignedBuffer::new_sse(1024).unwrap();
1186        assert!(sse_buffer.is_aligned());
1187        assert!(sse_buffer.is_simd_compatible(SimdType::Sse));
1188        assert_eq!(sse_buffer.alignment, 16);
1189
1190        // Test AVX2 alignment (32 bytes)
1191        let avx2_buffer = AlignedBuffer::new_avx2(1024).unwrap();
1192        assert!(avx2_buffer.is_aligned());
1193        assert!(avx2_buffer.is_simd_compatible(SimdType::Avx2));
1194        assert_eq!(avx2_buffer.alignment, 32);
1195
1196        // Test AVX-512 alignment (64 bytes)
1197        let avx512_buffer = AlignedBuffer::new_avx512(1024).unwrap();
1198        assert!(avx512_buffer.is_aligned());
1199        assert!(avx512_buffer.is_simd_compatible(SimdType::Avx512));
1200        assert_eq!(avx512_buffer.alignment, 64);
1201    }
1202
1203    #[test]
1204    fn test_simd_alignment_compatibility() {
1205        let buffer_64 = AlignedBuffer::new(1024, 64).unwrap();
1206
1207        // 64-byte aligned buffer should be compatible with all SIMD types
1208        assert!(buffer_64.is_simd_compatible(SimdType::Sse)); // 16-byte requirement
1209        assert!(buffer_64.is_simd_compatible(SimdType::Avx2)); // 32-byte requirement
1210        assert!(buffer_64.is_simd_compatible(SimdType::Avx512)); // 64-byte requirement
1211        assert!(buffer_64.is_simd_compatible(SimdType::Neon)); // 16-byte requirement
1212
1213        // Note: We can't easily test incompatible alignments since the allocator
1214        // might provide better alignment than requested for performance reasons.
1215        // Instead, test the requested alignment vs required alignment directly.
1216        #[allow(clippy::assertions_on_constants)]
1217        {
1218            assert!(64 >= 16); // SSE compatible
1219            assert!(64 >= 32); // AVX2 compatible
1220            assert!(64 >= 64); // AVX512 compatible
1221            assert!(64 >= 16); // NEON compatible
1222        }
1223
1224        let buffer_16 = AlignedBuffer::new(1024, 16).unwrap();
1225
1226        // Test that buffer reports correct requested alignment
1227        assert_eq!(buffer_16.alignment, 16);
1228
1229        // 16-byte aligned buffer should be compatible with SSE and NEON
1230        assert!(buffer_16.is_simd_compatible(SimdType::Sse));
1231        assert!(buffer_16.is_simd_compatible(SimdType::Neon));
1232
1233        // Note: actual_alignment() might be higher than 16 due to allocator behavior
1234        // so we can't reliably test incompatibility. Instead verify logic:
1235        #[allow(clippy::assertions_on_constants)]
1236        {
1237            assert!(16 >= 16); // SSE requirement met
1238            assert!(16 < 32); // AVX2 requirement NOT met by requested alignment
1239            assert!(16 < 64); // AVX512 requirement NOT met by requested alignment
1240        }
1241    }
1242
1243    #[test]
1244    fn test_actual_alignment_detection() {
1245        let buffer = AlignedBuffer::new(1024, 64).unwrap();
1246
1247        let actual_alignment = buffer.actual_alignment();
1248        assert!(
1249            actual_alignment >= 64,
1250            "Buffer has actual alignment of {}, expected at least 64",
1251            actual_alignment
1252        );
1253
1254        // The actual alignment should be a power of 2 and >= requested alignment
1255        assert!(actual_alignment.is_power_of_two());
1256        assert!(actual_alignment >= buffer.alignment);
1257    }
1258
1259    #[test]
1260    fn test_simd_pool_configuration() {
1261        // Test pool with high SIMD alignment requirement
1262        let config = PoolConfig {
1263            simd_alignment: 64, // AVX-512 alignment
1264            ..Default::default()
1265        };
1266        let pool = BufferPool::with_config(config);
1267
1268        let buffer = pool.get_buffer(BufferSize::Medium).unwrap();
1269        assert!(buffer.buffer().unwrap().is_aligned());
1270        assert!(
1271            buffer
1272                .buffer()
1273                .unwrap()
1274                .is_simd_compatible(SimdType::Avx512)
1275        );
1276    }
1277
1278    #[test]
1279    fn test_alignment_edge_cases() {
1280        // Test minimum alignment
1281        let buffer_min = AlignedBuffer::new(64, 1).unwrap();
1282        assert!(buffer_min.is_aligned());
1283        assert!(buffer_min.alignment >= mem::align_of::<usize>());
1284
1285        // Test power-of-2 validation
1286        assert!(AlignedBuffer::new(1024, 3).is_err());
1287        assert!(AlignedBuffer::new(1024, 17).is_err());
1288        assert!(AlignedBuffer::new(1024, 33).is_err());
1289
1290        // Test maximum alignment limit
1291        assert!(AlignedBuffer::new(1024, 8192).is_err());
1292    }
1293
    #[test]
    fn test_simd_performance_oriented_allocation() {
        // Test that allocation pattern is suitable for high-performance SIMD
        let buffer = AlignedBuffer::new_avx512(4096).unwrap();

        // Verify the buffer can be used for actual SIMD-like operations
        // SAFETY(review): relies on `ptr` pointing at a live allocation of exactly
        // `capacity` bytes with no other live reference into it while `slice`
        // exists — TODO confirm, and consider running this test under Miri, since
        // it writes through a &mut slice derived from a non-`mut` binding.
        let slice = unsafe { std::slice::from_raw_parts_mut(buffer.ptr.as_ptr(), buffer.capacity) };

        // Fill with test pattern
        for (i, byte) in slice.iter_mut().enumerate() {
            *byte = (i % 256) as u8;
        }

        // Verify alignment is maintained through operations
        assert!(buffer.is_aligned());
        // The pattern wraps every 256 bytes; spot-check the wrap boundary.
        assert_eq!(slice[0], 0);
        assert_eq!(slice[255], 255);
        assert_eq!(slice[256], 0);
    }
1313}