Skip to main content

pjson_rs/parser/
buffer_pool.rs

1//! Buffer pool system for zero-copy parsing with memory management
2//!
3//! This module provides a memory pool system to minimize allocations during
4//! JSON parsing, with support for different buffer sizes and reuse strategies.
5
6use crate::{
7    config::SecurityConfig,
8    domain::{DomainError, DomainResult},
9    parser::aligned_alloc::aligned_allocator,
10    security::SecurityValidator,
11};
12use dashmap::DashMap;
13use std::{
14    alloc::Layout,
15    mem,
16    ptr::{self, NonNull},
17    slice,
18    sync::Arc,
19    time::{Duration, Instant},
20};
21
22/// Buffer pool that manages reusable byte buffers for parsing
23#[derive(Debug)]
24pub struct BufferPool {
25    pools: Arc<DashMap<BufferSize, BufferBucket>>,
26    config: PoolConfig,
27    stats: Arc<parking_lot::Mutex<PoolStats>>, // Keep stats under mutex as it's written less frequently
28}
29
30/// Configuration for buffer pool behavior
31#[derive(Debug, Clone)]
32pub struct PoolConfig {
33    /// Maximum number of buffers per size bucket
34    pub max_buffers_per_bucket: usize,
35    /// Maximum total memory usage in bytes
36    pub max_total_memory: usize,
37    /// How long to keep unused buffers before cleanup
38    pub buffer_ttl: Duration,
39    /// Enable/disable pool statistics tracking
40    pub track_stats: bool,
41    /// Alignment for SIMD operations (typically 32 or 64 bytes)
42    pub simd_alignment: usize,
43    /// Security validator for buffer validation
44    pub validator: SecurityValidator,
45}
46
47/// Standard buffer sizes for different parsing scenarios
48#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
49pub enum BufferSize {
50    /// Small buffers for short JSON strings (1KB)
51    Small = 1024,
52    /// Medium buffers for typical API responses (8KB)  
53    Medium = 8192,
54    /// Large buffers for complex documents (64KB)
55    Large = 65536,
56    /// Extra large buffers for bulk data (512KB)
57    XLarge = 524288,
58    /// Huge buffers for massive documents (4MB)
59    Huge = 4194304,
60}
61
62/// A bucket containing buffers of the same size
63#[derive(Debug)]
64struct BufferBucket {
65    buffers: Vec<AlignedBuffer>,
66    last_access: Instant,
67}
68
69/// SIMD-aligned buffer with metadata
70///
71/// This buffer guarantees proper alignment for SIMD operations using direct memory allocation.
72/// It supports SSE (16-byte), AVX2 (32-byte), and AVX-512 (64-byte) alignments.
73pub struct AlignedBuffer {
74    /// Raw pointer to aligned memory
75    ptr: NonNull<u8>,
76    /// Current length of valid data
77    len: usize,
78    /// Total capacity in bytes
79    capacity: usize,
80    /// Memory alignment requirement
81    alignment: usize,
82    /// Layout used for allocation (needed for deallocation)
83    layout: Layout,
84    /// Creation timestamp
85    created_at: Instant,
86    /// Last usage timestamp
87    last_used: Instant,
88}
89
90// Safety: AlignedBuffer can be safely sent between threads
91unsafe impl Send for AlignedBuffer {}
92
93// Safety: AlignedBuffer can be safely shared between threads (no interior mutability)
94unsafe impl Sync for AlignedBuffer {}
95
96impl std::fmt::Debug for AlignedBuffer {
97    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98        f.debug_struct("AlignedBuffer")
99            .field("ptr", &format_args!("0x{:x}", self.ptr.as_ptr() as usize))
100            .field("len", &self.len)
101            .field("capacity", &self.capacity)
102            .field("alignment", &self.alignment)
103            .field("is_aligned", &self.is_aligned())
104            .field("created_at", &self.created_at)
105            .field("last_used", &self.last_used)
106            .finish()
107    }
108}
109
110/// Statistics about buffer pool usage
111#[derive(Debug, Clone)]
112pub struct PoolStats {
113    /// Total allocations requested
114    pub total_allocations: u64,
115    /// Cache hits (buffer reused)
116    pub cache_hits: u64,
117    /// Cache misses (new buffer allocated)
118    pub cache_misses: u64,
119    /// Current memory usage in bytes
120    pub current_memory_usage: usize,
121    /// Peak memory usage in bytes
122    pub peak_memory_usage: usize,
123    /// Number of cleanup operations performed
124    pub cleanup_count: u64,
125}
126
127impl BufferPool {
128    /// Create new buffer pool with default configuration
129    pub fn new() -> Self {
130        Self::with_config(PoolConfig::default())
131    }
132
133    /// Create buffer pool with custom configuration
134    pub fn with_config(config: PoolConfig) -> Self {
135        Self {
136            pools: Arc::new(DashMap::new()),
137            config,
138            stats: Arc::new(parking_lot::Mutex::new(PoolStats::new())),
139        }
140    }
141
142    /// Create buffer pool with security configuration
143    pub fn with_security_config(security_config: SecurityConfig) -> Self {
144        Self::with_config(PoolConfig::from(&security_config))
145    }
146
147    /// Get buffer of specified size, reusing if available
148    pub fn get_buffer(&self, size: BufferSize) -> DomainResult<PooledBuffer> {
149        // Security validation: check buffer size
150        self.config
151            .validator
152            .validate_buffer_size(size as usize)
153            .map_err(|e| DomainError::SecurityViolation(e.to_string()))?;
154
155        // Check if we would exceed total memory limit
156        let current_usage = self.current_memory_usage().unwrap_or(0);
157        if current_usage + (size as usize) > self.config.max_total_memory {
158            return Err(DomainError::ResourceExhausted(format!(
159                "Adding buffer of size {} would exceed memory limit: current={}, limit={}",
160                size as usize, current_usage, self.config.max_total_memory
161            )));
162        }
163
164        if self.config.track_stats {
165            self.increment_allocations();
166        }
167
168        // Try to get a buffer from existing bucket
169        if let Some(mut bucket_ref) = self.pools.get_mut(&size)
170            && let Some(mut buffer) = bucket_ref.buffers.pop()
171        {
172            buffer.last_used = Instant::now();
173            bucket_ref.last_access = Instant::now();
174
175            if self.config.track_stats {
176                self.increment_cache_hits();
177            }
178
179            return Ok(PooledBuffer::new(
180                buffer,
181                Arc::clone(&self.pools),
182                size,
183                self.config.max_buffers_per_bucket,
184            ));
185        }
186
187        // No buffer available, create new one
188        if self.config.track_stats {
189            self.increment_cache_misses();
190        }
191
192        let buffer = AlignedBuffer::new(size as usize, self.config.simd_alignment)?;
193        Ok(PooledBuffer::new(
194            buffer,
195            Arc::clone(&self.pools),
196            size,
197            self.config.max_buffers_per_bucket,
198        ))
199    }
200
201    /// Get buffer with at least the specified capacity
202    pub fn get_buffer_with_capacity(&self, min_capacity: usize) -> DomainResult<PooledBuffer> {
203        let size = BufferSize::for_capacity(min_capacity);
204        self.get_buffer(size)
205    }
206
207    /// Perform cleanup of old unused buffers
208    pub fn cleanup(&self) -> DomainResult<CleanupStats> {
209        let now = Instant::now();
210        let mut freed_buffers = 0;
211        let mut freed_memory = 0;
212
213        // DashMap doesn't have retain, so we collect keys to remove
214        let mut keys_to_remove = Vec::new();
215
216        for mut entry in self.pools.iter_mut() {
217            let bucket = entry.value_mut();
218            let old_count = bucket.buffers.len();
219
220            bucket.buffers.retain(|buffer| {
221                let age = now.duration_since(buffer.last_used);
222                if age > self.config.buffer_ttl {
223                    freed_memory += buffer.capacity;
224                    false
225                } else {
226                    true
227                }
228            });
229
230            freed_buffers += old_count - bucket.buffers.len();
231
232            // Mark bucket for removal if empty and not recently accessed
233            if bucket.buffers.is_empty()
234                && now.duration_since(bucket.last_access) >= self.config.buffer_ttl
235            {
236                keys_to_remove.push(*entry.key());
237            }
238        }
239
240        // Remove empty buckets
241        for key in keys_to_remove {
242            self.pools.remove(&key);
243        }
244
245        if self.config.track_stats {
246            self.increment_cleanup_count();
247            self.update_current_memory_usage(-(freed_memory as i64));
248        }
249
250        Ok(CleanupStats {
251            freed_buffers,
252            freed_memory,
253        })
254    }
255
256    /// Get current pool statistics
257    pub fn stats(&self) -> DomainResult<PoolStats> {
258        let stats = self.stats.lock();
259        Ok(stats.clone())
260    }
261
262    /// Get current memory usage across all pools
263    pub fn current_memory_usage(&self) -> DomainResult<usize> {
264        use rayon::prelude::*;
265
266        let usage = self
267            .pools
268            .iter()
269            .par_bridge()
270            .map(|entry| {
271                entry
272                    .value()
273                    .buffers
274                    .par_iter()
275                    .map(|b| b.capacity)
276                    .sum::<usize>()
277            })
278            .sum();
279
280        Ok(usage)
281    }
282
283    // Private statistics methods
284
285    fn increment_allocations(&self) {
286        let mut stats = self.stats.lock();
287        stats.total_allocations += 1;
288    }
289
290    fn increment_cache_hits(&self) {
291        let mut stats = self.stats.lock();
292        stats.cache_hits += 1;
293    }
294
295    fn increment_cache_misses(&self) {
296        let mut stats = self.stats.lock();
297        stats.cache_misses += 1;
298    }
299
300    fn increment_cleanup_count(&self) {
301        let mut stats = self.stats.lock();
302        stats.cleanup_count += 1;
303    }
304
305    fn update_current_memory_usage(&self, delta: i64) {
306        let mut stats = self.stats.lock();
307        stats.current_memory_usage = (stats.current_memory_usage as i64 + delta).max(0) as usize;
308        stats.peak_memory_usage = stats.peak_memory_usage.max(stats.current_memory_usage);
309    }
310}
311
312impl BufferSize {
313    /// Get appropriate buffer size for given capacity
314    pub fn for_capacity(capacity: usize) -> Self {
315        match capacity {
316            0..=1024 => BufferSize::Small,
317            1025..=8192 => BufferSize::Medium,
318            8193..=65536 => BufferSize::Large,
319            65537..=524288 => BufferSize::XLarge,
320            _ => BufferSize::Huge,
321        }
322    }
323
324    /// Get all available buffer sizes in order
325    pub fn all_sizes() -> &'static [BufferSize] {
326        &[
327            BufferSize::Small,
328            BufferSize::Medium,
329            BufferSize::Large,
330            BufferSize::XLarge,
331            BufferSize::Huge,
332        ]
333    }
334}
335
336impl AlignedBuffer {
337    /// Create new aligned buffer with guaranteed SIMD alignment
338    ///
339    /// # Arguments
340    /// * `capacity` - Minimum capacity in bytes
341    /// * `alignment` - Required alignment (must be power of 2)
342    ///
343    /// # Safety
344    /// This function uses unsafe code to allocate aligned memory.
345    /// The memory is properly tracked and will be deallocated on drop.
346    pub fn new(capacity: usize, alignment: usize) -> DomainResult<Self> {
347        // Validate alignment is power of 2 and reasonable
348        if !alignment.is_power_of_two() {
349            return Err(DomainError::InvalidInput(format!(
350                "Alignment {} is not a power of 2",
351                alignment
352            )));
353        }
354
355        // Validate alignment is not too large (max 4096 bytes for page alignment)
356        if alignment > 4096 {
357            return Err(DomainError::InvalidInput(format!(
358                "Alignment {} exceeds maximum of 4096",
359                alignment
360            )));
361        }
362
363        // Minimum alignment should be at least size of usize for proper alignment
364        let alignment = alignment.max(mem::align_of::<usize>());
365
366        // Align capacity to SIMD boundaries
367        let aligned_capacity = (capacity + alignment - 1) & !(alignment - 1);
368
369        // Ensure minimum capacity for safety
370        let aligned_capacity = aligned_capacity.max(alignment);
371
372        // Create layout for allocation (kept for Drop implementation)
373        let layout = Layout::from_size_align(aligned_capacity, alignment).map_err(|e| {
374            DomainError::InvalidInput(format!(
375                "Invalid layout: capacity={}, alignment={}, error={}",
376                aligned_capacity, alignment, e
377            ))
378        })?;
379
380        // Use global SIMD allocator for better performance
381        let allocator = aligned_allocator();
382
383        // Allocate aligned memory using the appropriate allocator backend
384        // Safety: alignment has been validated above
385        let ptr = unsafe { allocator.alloc_aligned(aligned_capacity, alignment)? };
386
387        let now = Instant::now();
388        Ok(Self {
389            ptr,
390            len: 0,
391            capacity: aligned_capacity,
392            alignment,
393            layout,
394            created_at: now,
395            last_used: now,
396        })
397    }
398
399    /// Create an aligned buffer with specific SIMD level
400    pub fn new_sse(capacity: usize) -> DomainResult<Self> {
401        Self::new(capacity, 16) // SSE requires 16-byte alignment
402    }
403
404    /// Create an aligned buffer for AVX2 operations
405    pub fn new_avx2(capacity: usize) -> DomainResult<Self> {
406        Self::new(capacity, 32) // AVX2 requires 32-byte alignment
407    }
408
409    /// Create an aligned buffer for AVX-512 operations
410    pub fn new_avx512(capacity: usize) -> DomainResult<Self> {
411        Self::new(capacity, 64) // AVX-512 requires 64-byte alignment
412    }
413
414    /// Get mutable slice to buffer data
415    pub fn as_mut_slice(&mut self) -> &mut [u8] {
416        // SAFETY: `self.ptr` was allocated via `AlignedAllocator::alloc_aligned` and remains
417        // valid for at least `self.capacity` bytes. `self.len <= self.capacity` is a class
418        // invariant upheld by every method that modifies `len`. The `&mut self` receiver
419        // ensures exclusive access for the lifetime of the returned slice.
420        unsafe { slice::from_raw_parts_mut(self.ptr.as_ptr(), self.len) }
421    }
422
423    /// Get immutable slice to buffer data
424    pub fn as_slice(&self) -> &[u8] {
425        // SAFETY: `self.ptr` was allocated via `AlignedAllocator::alloc_aligned` and remains
426        // valid for at least `self.capacity` bytes. `self.len <= self.capacity` is a class
427        // invariant upheld by every method that modifies `len`. The `&self` receiver ensures
428        // no mutable aliasing exists for the lifetime of the returned slice.
429        unsafe { slice::from_raw_parts(self.ptr.as_ptr(), self.len) }
430    }
431
432    /// Get a mutable slice with full capacity
433    pub fn as_mut_capacity_slice(&mut self) -> &mut [u8] {
434        // SAFETY: `self.ptr` was allocated via `AlignedAllocator::alloc_aligned` for exactly
435        // `self.capacity` bytes. The `&mut self` receiver ensures exclusive access for the
436        // lifetime of the returned slice. Callers are responsible for initializing bytes
437        // before reading them; `set_len` is `unsafe` and documents that requirement.
438        unsafe { slice::from_raw_parts_mut(self.ptr.as_ptr(), self.capacity) }
439    }
440
441    /// Set the length of valid data
442    ///
443    /// # Safety
444    /// Caller must ensure that `new_len` bytes are initialized
445    pub unsafe fn set_len(&mut self, new_len: usize) {
446        debug_assert!(
447            new_len <= self.capacity,
448            "new_len {} exceeds capacity {}",
449            new_len,
450            self.capacity
451        );
452        self.len = new_len;
453        self.last_used = Instant::now();
454    }
455
456    /// Reserve additional capacity
457    pub fn reserve(&mut self, additional: usize) -> DomainResult<()> {
458        let new_capacity = self
459            .len
460            .checked_add(additional)
461            .ok_or_else(|| DomainError::InvalidInput("Capacity overflow".to_string()))?;
462
463        if new_capacity <= self.capacity {
464            return Ok(());
465        }
466
467        // Align new capacity
468        let aligned_capacity = (new_capacity + self.alignment - 1) & !(self.alignment - 1);
469
470        // Use global SIMD allocator for reallocation
471        let allocator = aligned_allocator();
472
473        // Reallocate using the allocator (which will handle data copying).
474        // SAFETY: `self.ptr` was allocated (or previously reallocated) via
475        // `AlignedAllocator::alloc_aligned` with `self.layout`. `aligned_capacity` is
476        // positive because it is at least `new_capacity > self.capacity >= self.alignment`.
477        // After this call `self.ptr` must not be used — it is replaced below.
478        let new_ptr =
479            unsafe { allocator.realloc_aligned(self.ptr, self.layout, aligned_capacity)? };
480
481        // Update layout for the new size
482        let new_layout = Layout::from_size_align(aligned_capacity, self.alignment)
483            .map_err(|e| DomainError::InvalidInput(format!("Invalid layout: {}", e)))?;
484
485        self.ptr = new_ptr;
486        self.capacity = aligned_capacity;
487        self.layout = new_layout;
488        self.last_used = Instant::now();
489
490        Ok(())
491    }
492
493    /// Push bytes to the buffer
494    pub fn extend_from_slice(&mut self, data: &[u8]) -> DomainResult<()> {
495        let required_capacity = self
496            .len
497            .checked_add(data.len())
498            .ok_or_else(|| DomainError::InvalidInput("Length overflow".to_string()))?;
499
500        if required_capacity > self.capacity {
501            self.reserve(data.len())?;
502        }
503
504        // SAFETY: `reserve` above ensures `self.capacity >= self.len + data.len()`, so
505        // `self.ptr.as_ptr().add(self.len)` is within the allocation. `data` is a valid
506        // `&[u8]` slice so its pointer is also valid for `data.len()` bytes. The destination
507        // range `[self.len, self.len + data.len())` does not overlap with `data` because
508        // `data` is an external caller-provided slice that cannot alias `self.ptr`.
509        unsafe {
510            ptr::copy_nonoverlapping(data.as_ptr(), self.ptr.as_ptr().add(self.len), data.len());
511            self.len += data.len();
512        }
513
514        self.last_used = Instant::now();
515        Ok(())
516    }
517
518    /// Clear buffer contents but keep allocated memory
519    pub fn clear(&mut self) {
520        self.len = 0;
521        self.last_used = Instant::now();
522    }
523
524    /// Get buffer capacity
525    pub fn capacity(&self) -> usize {
526        self.capacity
527    }
528
529    /// Get current length of valid data
530    pub fn len(&self) -> usize {
531        self.len
532    }
533
534    /// Check if buffer is empty
535    pub fn is_empty(&self) -> bool {
536        self.len == 0
537    }
538
539    /// Get the raw pointer to the buffer
540    pub fn as_ptr(&self) -> *const u8 {
541        self.ptr.as_ptr()
542    }
543
544    /// Get the mutable raw pointer to the buffer  
545    pub fn as_mut_ptr(&mut self) -> *mut u8 {
546        self.ptr.as_ptr()
547    }
548
549    /// Check if buffer is properly aligned
550    ///
551    /// This validates that the buffer pointer has the requested alignment,
552    /// which is critical for SIMD operations.
553    pub fn is_aligned(&self) -> bool {
554        let ptr_addr = self.ptr.as_ptr() as usize;
555        ptr_addr.is_multiple_of(self.alignment)
556    }
557
558    /// Get the actual alignment of the buffer
559    pub fn actual_alignment(&self) -> usize {
560        let ptr_addr = self.ptr.as_ptr() as usize;
561        // Find the highest power of 2 that divides the address
562        if ptr_addr == 0 {
563            return usize::MAX; // null pointer is infinitely aligned
564        }
565
566        // Use trailing zeros to find alignment
567        1 << ptr_addr.trailing_zeros()
568    }
569
570    /// Verify buffer is suitable for specific SIMD instruction set
571    pub fn is_simd_compatible(&self, simd_type: SimdType) -> bool {
572        let required_alignment = match simd_type {
573            SimdType::Sse => 16,
574            SimdType::Avx2 => 32,
575            SimdType::Avx512 => 64,
576            SimdType::Neon => 16,
577        };
578
579        self.actual_alignment() >= required_alignment
580    }
581}
582
583/// SIMD instruction set types
584#[derive(Debug, Clone, Copy, PartialEq, Eq)]
585pub enum SimdType {
586    /// SSE instructions (16-byte alignment)
587    Sse,
588    /// AVX2 instructions (32-byte alignment)  
589    Avx2,
590    /// AVX-512 instructions (64-byte alignment)
591    Avx512,
592    /// ARM NEON instructions (16-byte alignment)
593    Neon,
594}
595
596impl Drop for AlignedBuffer {
597    fn drop(&mut self) {
598        // Use the global SIMD allocator for deallocation
599        let allocator = aligned_allocator();
600
601        // Safety: We allocated this memory with the same layout
602        unsafe {
603            allocator.dealloc_aligned(self.ptr, self.layout);
604        }
605    }
606}
607
608impl Clone for AlignedBuffer {
609    fn clone(&self) -> Self {
610        // Create new buffer with same alignment and capacity
611        let mut new_buffer =
612            Self::new(self.capacity, self.alignment).expect("Failed to clone buffer");
613
614        // Copy data.
615        // SAFETY: `self.ptr` is valid for `self.len` bytes (allocation invariant).
616        // `new_buffer.ptr` is valid for `self.capacity` bytes because `Self::new` was called
617        // with `self.capacity` above. `self.len <= self.capacity` ensures the byte count fits
618        // in both ranges. The two allocations are independent heap regions, so they cannot
619        // overlap.
620        unsafe {
621            ptr::copy_nonoverlapping(self.ptr.as_ptr(), new_buffer.ptr.as_ptr(), self.len);
622            new_buffer.len = self.len;
623        }
624
625        new_buffer
626    }
627}
628
629/// RAII wrapper for pooled buffer that returns buffer to pool on drop
630pub struct PooledBuffer {
631    buffer: Option<AlignedBuffer>,
632    pool: Arc<DashMap<BufferSize, BufferBucket>>,
633    size: BufferSize,
634    max_buffers_per_bucket: usize,
635}
636
637impl PooledBuffer {
638    fn new(
639        buffer: AlignedBuffer,
640        pool: Arc<DashMap<BufferSize, BufferBucket>>,
641        size: BufferSize,
642        max_buffers_per_bucket: usize,
643    ) -> Self {
644        Self {
645            buffer: Some(buffer),
646            pool,
647            size,
648            max_buffers_per_bucket,
649        }
650    }
651
652    /// Get mutable reference to buffer
653    pub fn buffer_mut(&mut self) -> Option<&mut AlignedBuffer> {
654        self.buffer.as_mut()
655    }
656
657    /// Get immutable reference to buffer
658    pub fn buffer(&self) -> Option<&AlignedBuffer> {
659        self.buffer.as_ref()
660    }
661
662    /// Get buffer capacity
663    pub fn capacity(&self) -> usize {
664        self.buffer.as_ref().map(|b| b.capacity()).unwrap_or(0)
665    }
666
667    /// Clear buffer contents
668    pub fn clear(&mut self) {
669        if let Some(buffer) = &mut self.buffer {
670            buffer.clear();
671        }
672    }
673}
674
675impl Drop for PooledBuffer {
676    fn drop(&mut self) {
677        if let Some(mut buffer) = self.buffer.take() {
678            buffer.clear(); // Clear contents before returning to pool
679
680            // Get or create bucket for this buffer size
681            let mut bucket_ref = self.pool.entry(self.size).or_insert_with(|| BufferBucket {
682                buffers: Vec::new(),
683                last_access: Instant::now(),
684            });
685
686            // Only return to pool if we haven't exceeded the per-bucket limit
687            if bucket_ref.buffers.len() < self.max_buffers_per_bucket {
688                bucket_ref.buffers.push(buffer);
689                bucket_ref.last_access = Instant::now();
690            }
691        }
692    }
693}
694
695/// Result of cleanup operation
696#[derive(Debug, Clone)]
697pub struct CleanupStats {
698    /// Number of buffers reclaimed and returned to the system.
699    pub freed_buffers: usize,
700    /// Total memory freed by the cleanup, in bytes.
701    pub freed_memory: usize,
702}
703
704impl PoolConfig {
705    /// Create configuration from security config
706    pub fn from_security_config(security_config: &SecurityConfig) -> Self {
707        Self::from(security_config)
708    }
709
710    /// Create configuration optimized for SIMD operations
711    pub fn simd_optimized() -> Self {
712        let mut config = Self::from(&SecurityConfig::high_throughput());
713        config.simd_alignment = 64; // AVX-512 alignment
714        config
715    }
716
717    /// Create configuration for low-memory environments
718    pub fn low_memory() -> Self {
719        let mut config = Self::from(&SecurityConfig::low_memory());
720        config.track_stats = false; // Reduce overhead
721        config
722    }
723
724    /// Create configuration for development/testing
725    pub fn development() -> Self {
726        Self::from(&SecurityConfig::development())
727    }
728}
729
730impl Default for PoolConfig {
731    fn default() -> Self {
732        let security_config = SecurityConfig::default();
733        Self {
734            max_buffers_per_bucket: security_config.buffers.max_buffers_per_bucket,
735            max_total_memory: security_config.buffers.max_total_memory,
736            buffer_ttl: security_config.buffer_ttl(),
737            track_stats: true,
738            simd_alignment: 32, // AVX2 alignment
739            validator: SecurityValidator::new(security_config),
740        }
741    }
742}
743
744impl From<&SecurityConfig> for PoolConfig {
745    fn from(security_config: &SecurityConfig) -> Self {
746        Self {
747            max_buffers_per_bucket: security_config.buffers.max_buffers_per_bucket,
748            max_total_memory: security_config.buffers.max_total_memory,
749            buffer_ttl: security_config.buffer_ttl(),
750            track_stats: true,
751            simd_alignment: 32, // AVX2 alignment
752            validator: SecurityValidator::new(security_config.clone()),
753        }
754    }
755}
756
757impl PoolStats {
758    fn new() -> Self {
759        Self {
760            total_allocations: 0,
761            cache_hits: 0,
762            cache_misses: 0,
763            current_memory_usage: 0,
764            peak_memory_usage: 0,
765            cleanup_count: 0,
766        }
767    }
768
769    /// Get cache hit ratio
770    pub fn hit_ratio(&self) -> f64 {
771        if self.total_allocations == 0 {
772            0.0
773        } else {
774            self.cache_hits as f64 / self.total_allocations as f64
775        }
776    }
777
778    /// Get memory efficiency (current/peak ratio)
779    pub fn memory_efficiency(&self) -> f64 {
780        if self.peak_memory_usage == 0 {
781            1.0
782        } else {
783            self.current_memory_usage as f64 / self.peak_memory_usage as f64
784        }
785    }
786}
787
788impl Default for BufferPool {
789    fn default() -> Self {
790        Self::new()
791    }
792}
793
794/// Global buffer pool instance for convenient access
795static GLOBAL_BUFFER_POOL: std::sync::OnceLock<BufferPool> = std::sync::OnceLock::new();
796
797/// Get global buffer pool instance
798pub fn global_buffer_pool() -> &'static BufferPool {
799    GLOBAL_BUFFER_POOL.get_or_init(BufferPool::new)
800}
801
802/// Initialize global buffer pool with custom configuration
803pub fn initialize_global_buffer_pool(config: PoolConfig) -> DomainResult<()> {
804    GLOBAL_BUFFER_POOL
805        .set(BufferPool::with_config(config))
806        .map_err(|_| {
807            DomainError::InternalError("Global buffer pool already initialized".to_string())
808        })?;
809    Ok(())
810}
811
812#[cfg(test)]
813mod tests {
814    use super::*;
815
816    #[test]
817    fn test_buffer_pool_creation() {
818        let pool = BufferPool::new();
819        assert!(pool.stats().is_ok());
820    }
821
822    #[test]
823    fn test_buffer_allocation() {
824        let pool = BufferPool::new();
825        let buffer = pool.get_buffer(BufferSize::Medium);
826        assert!(buffer.is_ok());
827
828        let buffer = buffer.unwrap();
829        assert!(buffer.capacity() >= BufferSize::Medium as usize);
830    }
831
832    #[test]
833    fn test_buffer_reuse() {
834        let pool = BufferPool::new();
835
836        // Allocate and drop buffer
837        {
838            let _buffer = pool.get_buffer(BufferSize::Small).unwrap();
839        }
840
841        // Allocate another buffer of same size
842        let _buffer2 = pool.get_buffer(BufferSize::Small).unwrap();
843
844        // Should have cache hit
845        let stats = pool.stats().unwrap();
846        assert!(stats.cache_hits > 0);
847    }
848
849    #[test]
850    fn test_buffer_size_selection() {
851        assert_eq!(BufferSize::for_capacity(500), BufferSize::Small);
852        assert_eq!(BufferSize::for_capacity(2000), BufferSize::Medium);
853        assert_eq!(BufferSize::for_capacity(50000), BufferSize::Large);
854        assert_eq!(BufferSize::for_capacity(100000), BufferSize::XLarge);
855    }
856
857    #[test]
858    fn test_aligned_buffer_creation_guaranteed() {
859        // Test all common SIMD alignments
860        let test_cases = vec![
861            (1024, 16, "SSE alignment"),
862            (2048, 32, "AVX2 alignment"),
863            (4096, 64, "AVX-512 alignment"),
864        ];
865
866        for (capacity, alignment, description) in test_cases {
867            let buffer = AlignedBuffer::new(capacity, alignment).unwrap();
868
869            // Verify pointer alignment
870            let ptr_addr = buffer.as_ptr() as usize;
871            assert_eq!(
872                ptr_addr % alignment,
873                0,
874                "{}: pointer 0x{:x} is not {}-byte aligned",
875                description,
876                ptr_addr,
877                alignment
878            );
879
880            // Verify is_aligned method
881            assert!(
882                buffer.is_aligned(),
883                "{}: is_aligned() returned false for properly aligned buffer",
884                description
885            );
886
887            // Verify capacity
888            assert!(
889                buffer.capacity() >= capacity,
890                "{}: capacity {} is less than requested {}",
891                description,
892                buffer.capacity(),
893                capacity
894            );
895
896            // Verify actual alignment
897            assert!(
898                buffer.actual_alignment() >= alignment,
899                "{}: actual alignment {} is less than requested {}",
900                description,
901                buffer.actual_alignment(),
902                alignment
903            );
904        }
905    }
906
907    #[test]
908    fn test_buffer_operations() {
909        let mut buffer = AlignedBuffer::new(1024, 32).unwrap();
910
911        // Test initial state
912        assert_eq!(buffer.len(), 0);
913        assert!(buffer.is_empty());
914        assert_eq!(buffer.capacity(), 1024);
915
916        // Test extend_from_slice
917        let data = b"Hello, SIMD World!";
918        buffer.extend_from_slice(data).unwrap();
919        assert_eq!(buffer.len(), data.len());
920        assert_eq!(buffer.as_slice(), data);
921
922        // Test clear
923        buffer.clear();
924        assert_eq!(buffer.len(), 0);
925        assert!(buffer.is_empty());
926        assert_eq!(buffer.capacity(), 1024); // Capacity should remain
927
928        // Test unsafe set_len
929        unsafe {
930            // Write some data directly
931            let slice = buffer.as_mut_capacity_slice();
932            slice[0..5].copy_from_slice(b"SIMD!");
933            buffer.set_len(5);
934        }
935        assert_eq!(buffer.len(), 5);
936        assert_eq!(&buffer.as_slice()[0..5], b"SIMD!");
937    }
938
939    #[test]
940    fn test_buffer_reserve() {
941        let mut buffer = AlignedBuffer::new(64, 32).unwrap();
942        let _initial_alignment = buffer.actual_alignment();
943
944        // Set some length first
945        unsafe {
946            buffer.set_len(32);
947        }
948
949        // Reserve additional space - should need capacity for len + additional
950        buffer.reserve(256).unwrap();
951        assert!(
952            buffer.capacity() >= 32 + 256,
953            "Expected capacity >= {}, got {}",
954            32 + 256,
955            buffer.capacity()
956        );
957
958        // Alignment should be preserved after reallocation
959        assert!(
960            buffer.actual_alignment() >= 32,
961            "Alignment not preserved after reserve"
962        );
963        assert!(buffer.is_aligned());
964
965        // Test that data is preserved during reallocation
966        buffer.extend_from_slice(b"test data").unwrap();
967        let old_data = buffer.as_slice().to_vec();
968
969        buffer.reserve(1024).unwrap();
970        assert_eq!(buffer.as_slice(), &old_data[..]);
971    }
972
973    #[test]
974    fn test_buffer_clone() {
975        let mut original = AlignedBuffer::new(512, 64).unwrap();
976        original.extend_from_slice(b"Original data").unwrap();
977
978        let cloned = original.clone();
979
980        // Verify clone has same properties
981        assert_eq!(cloned.len(), original.len());
982        assert_eq!(cloned.capacity(), original.capacity());
983        assert_eq!(cloned.alignment, original.alignment);
984        assert_eq!(cloned.as_slice(), original.as_slice());
985
986        // Verify clone has different memory location
987        assert_ne!(cloned.as_ptr(), original.as_ptr());
988
989        // Verify clone is also properly aligned
990        assert!(cloned.is_aligned());
991        assert!(cloned.actual_alignment() >= 64);
992    }
993
994    #[test]
995    fn test_alignment_validation() {
996        // Test valid power-of-2 alignments
997        let valid_alignments = [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096];
998
999        for &alignment in &valid_alignments {
1000            let result = AlignedBuffer::new(1024, alignment);
1001            assert!(result.is_ok(), "Alignment {} should be valid", alignment);
1002
1003            let buffer = result.unwrap();
1004            assert!(
1005                buffer.is_aligned(),
1006                "Buffer with alignment {} should be aligned",
1007                alignment
1008            );
1009        }
1010
1011        // Test invalid non-power-of-2 alignments
1012        let invalid_alignments = [3, 5, 6, 7, 9, 10, 11, 12, 13, 14, 15, 17, 31, 33, 63, 65];
1013
1014        for &alignment in &invalid_alignments {
1015            let result = AlignedBuffer::new(1024, alignment);
1016            assert!(result.is_err(), "Alignment {} should be invalid", alignment);
1017        }
1018
1019        // Test too large alignment
1020        assert!(AlignedBuffer::new(1024, 8192).is_err());
1021    }
1022
1023    #[test]
1024    fn test_actual_alignment_calculation() {
1025        // Create buffers with different alignments and verify actual_alignment()
1026        for &requested_align in &[16, 32, 64] {
1027            let buffer = AlignedBuffer::new(1024, requested_align).unwrap();
1028            let actual = buffer.actual_alignment();
1029
1030            assert!(
1031                actual >= requested_align,
1032                "Actual alignment {} is less than requested {}",
1033                actual,
1034                requested_align
1035            );
1036
1037            // actual_alignment should be a power of 2
1038            assert!(
1039                actual.is_power_of_two(),
1040                "Actual alignment {} is not a power of 2",
1041                actual
1042            );
1043        }
1044    }
1045
1046    #[test]
1047    fn test_simd_compatibility_check() {
1048        // SSE buffer should be compatible with SSE but might not be with AVX-512
1049        let sse_buffer = AlignedBuffer::new_sse(1024).unwrap();
1050        assert!(sse_buffer.is_simd_compatible(SimdType::Sse));
1051        assert!(sse_buffer.is_simd_compatible(SimdType::Neon)); // Same alignment as SSE
1052
1053        // AVX-512 buffer should be compatible with all instruction sets
1054        let avx512_buffer = AlignedBuffer::new_avx512(1024).unwrap();
1055        assert!(avx512_buffer.is_simd_compatible(SimdType::Sse));
1056        assert!(avx512_buffer.is_simd_compatible(SimdType::Avx2));
1057        assert!(avx512_buffer.is_simd_compatible(SimdType::Avx512));
1058        assert!(avx512_buffer.is_simd_compatible(SimdType::Neon));
1059    }
1060
1061    #[test]
1062    fn test_zero_copy_verification() {
1063        let mut buffer = AlignedBuffer::new(1024, 32).unwrap();
1064
1065        // Get raw pointer before modification
1066        let ptr_before = buffer.as_ptr();
1067
1068        // Perform various operations that should NOT move the buffer
1069        buffer.clear();
1070        buffer.extend_from_slice(b"test").unwrap();
1071        unsafe {
1072            buffer.set_len(2);
1073        }
1074
1075        // Pointer should remain the same (zero-copy)
1076        assert_eq!(
1077            ptr_before,
1078            buffer.as_ptr(),
1079            "Buffer was moved during operations (not zero-copy)"
1080        );
1081
1082        // Only reserve should potentially change the pointer
1083        buffer.reserve(2048).unwrap();
1084        // After reserve, pointer might change but should still be aligned
1085        assert!(buffer.is_aligned());
1086    }
1087
1088    #[test]
1089    fn test_pool_cleanup() {
1090        let config = PoolConfig {
1091            buffer_ttl: Duration::from_millis(1),
1092            ..Default::default()
1093        };
1094        let pool = BufferPool::with_config(config);
1095
1096        // Allocate and drop buffer
1097        {
1098            let _buffer = pool.get_buffer(BufferSize::Small).unwrap();
1099        }
1100
1101        // Wait for TTL
1102        std::thread::sleep(Duration::from_millis(10));
1103
1104        // Cleanup should free the buffer
1105        let cleanup_stats = pool.cleanup().unwrap();
1106        assert!(cleanup_stats.freed_buffers > 0);
1107    }
1108
1109    #[test]
1110    fn test_global_buffer_pool() {
1111        let pool = global_buffer_pool();
1112        let buffer = pool.get_buffer(BufferSize::Medium);
1113        assert!(buffer.is_ok());
1114    }
1115
1116    #[test]
1117    fn test_memory_limit_enforcement() {
1118        let config = PoolConfig {
1119            max_total_memory: 1024, // Very small limit
1120            max_buffers_per_bucket: 10,
1121            ..Default::default()
1122        };
1123        let pool = BufferPool::with_config(config);
1124
1125        // Create a buffer that exceeds the memory limit
1126        let result = pool.get_buffer(BufferSize::Medium); // 8KB > 1KB limit
1127
1128        assert!(result.is_err());
1129
1130        if let Err(e) = result {
1131            assert!(e.to_string().contains("memory limit"));
1132        }
1133    }
1134
1135    #[test]
1136    fn test_per_bucket_limit_enforcement() {
1137        let config = PoolConfig {
1138            max_buffers_per_bucket: 2,          // Very small limit
1139            max_total_memory: 10 * 1024 * 1024, // Generous memory limit
1140            ..Default::default()
1141        };
1142        let pool = BufferPool::with_config(config);
1143
1144        // Allocate and drop buffers to fill the bucket
1145        for _ in 0..3 {
1146            let _buffer = pool.get_buffer(BufferSize::Small).unwrap();
1147            // Buffer goes back to pool on drop
1148        }
1149
1150        // Only 2 buffers should be retained in the pool
1151        let stats = pool.stats().unwrap();
1152        assert!(stats.cache_hits <= 2, "Too many buffers retained in bucket");
1153    }
1154
1155    #[test]
1156    fn test_buffer_size_validation() {
1157        let pool = BufferPool::new();
1158
1159        // All standard buffer sizes should be valid
1160        for size in BufferSize::all_sizes() {
1161            let result = pool.get_buffer(*size);
1162            assert!(result.is_ok(), "Buffer size {:?} should be valid", size);
1163        }
1164    }
1165
1166    #[test]
1167    fn test_memory_safety() {
1168        // Test that dropping a buffer properly deallocates memory
1169        // This test would fail under valgrind/ASAN if there's a memory leak
1170        for _ in 0..100 {
1171            let buffer = AlignedBuffer::new(1024, 64).unwrap();
1172            drop(buffer);
1173        }
1174
1175        // Test clone and drop
1176        for _ in 0..100 {
1177            let buffer = AlignedBuffer::new(512, 32).unwrap();
1178            let cloned = buffer.clone();
1179            drop(buffer);
1180            drop(cloned);
1181        }
1182    }
1183
1184    #[test]
1185    fn test_simd_specific_constructors() {
1186        // Test SSE alignment (16 bytes)
1187        let sse_buffer = AlignedBuffer::new_sse(1024).unwrap();
1188        assert!(sse_buffer.is_aligned());
1189        assert!(sse_buffer.is_simd_compatible(SimdType::Sse));
1190        assert_eq!(sse_buffer.alignment, 16);
1191
1192        // Test AVX2 alignment (32 bytes)
1193        let avx2_buffer = AlignedBuffer::new_avx2(1024).unwrap();
1194        assert!(avx2_buffer.is_aligned());
1195        assert!(avx2_buffer.is_simd_compatible(SimdType::Avx2));
1196        assert_eq!(avx2_buffer.alignment, 32);
1197
1198        // Test AVX-512 alignment (64 bytes)
1199        let avx512_buffer = AlignedBuffer::new_avx512(1024).unwrap();
1200        assert!(avx512_buffer.is_aligned());
1201        assert!(avx512_buffer.is_simd_compatible(SimdType::Avx512));
1202        assert_eq!(avx512_buffer.alignment, 64);
1203    }
1204
1205    #[test]
1206    fn test_simd_alignment_compatibility() {
1207        let buffer_64 = AlignedBuffer::new(1024, 64).unwrap();
1208
1209        // 64-byte aligned buffer should be compatible with all SIMD types
1210        assert!(buffer_64.is_simd_compatible(SimdType::Sse)); // 16-byte requirement
1211        assert!(buffer_64.is_simd_compatible(SimdType::Avx2)); // 32-byte requirement
1212        assert!(buffer_64.is_simd_compatible(SimdType::Avx512)); // 64-byte requirement
1213        assert!(buffer_64.is_simd_compatible(SimdType::Neon)); // 16-byte requirement
1214
1215        // Note: We can't easily test incompatible alignments since the allocator
1216        // might provide better alignment than requested for performance reasons.
1217        // Instead, test the requested alignment vs required alignment directly.
1218        #[allow(clippy::assertions_on_constants)]
1219        {
1220            assert!(64 >= 16); // SSE compatible
1221            assert!(64 >= 32); // AVX2 compatible
1222            assert!(64 >= 64); // AVX512 compatible
1223            assert!(64 >= 16); // NEON compatible
1224        }
1225
1226        let buffer_16 = AlignedBuffer::new(1024, 16).unwrap();
1227
1228        // Test that buffer reports correct requested alignment
1229        assert_eq!(buffer_16.alignment, 16);
1230
1231        // 16-byte aligned buffer should be compatible with SSE and NEON
1232        assert!(buffer_16.is_simd_compatible(SimdType::Sse));
1233        assert!(buffer_16.is_simd_compatible(SimdType::Neon));
1234
1235        // Note: actual_alignment() might be higher than 16 due to allocator behavior
1236        // so we can't reliably test incompatibility. Instead verify logic:
1237        #[allow(clippy::assertions_on_constants)]
1238        {
1239            assert!(16 >= 16); // SSE requirement met
1240            assert!(16 < 32); // AVX2 requirement NOT met by requested alignment
1241            assert!(16 < 64); // AVX512 requirement NOT met by requested alignment
1242        }
1243    }
1244
1245    #[test]
1246    fn test_actual_alignment_detection() {
1247        let buffer = AlignedBuffer::new(1024, 64).unwrap();
1248
1249        let actual_alignment = buffer.actual_alignment();
1250        assert!(
1251            actual_alignment >= 64,
1252            "Buffer has actual alignment of {}, expected at least 64",
1253            actual_alignment
1254        );
1255
1256        // The actual alignment should be a power of 2 and >= requested alignment
1257        assert!(actual_alignment.is_power_of_two());
1258        assert!(actual_alignment >= buffer.alignment);
1259    }
1260
1261    #[test]
1262    fn test_simd_pool_configuration() {
1263        // Test pool with high SIMD alignment requirement
1264        let config = PoolConfig {
1265            simd_alignment: 64, // AVX-512 alignment
1266            ..Default::default()
1267        };
1268        let pool = BufferPool::with_config(config);
1269
1270        let buffer = pool.get_buffer(BufferSize::Medium).unwrap();
1271        assert!(buffer.buffer().unwrap().is_aligned());
1272        assert!(
1273            buffer
1274                .buffer()
1275                .unwrap()
1276                .is_simd_compatible(SimdType::Avx512)
1277        );
1278    }
1279
1280    #[test]
1281    fn test_alignment_edge_cases() {
1282        // Test minimum alignment
1283        let buffer_min = AlignedBuffer::new(64, 1).unwrap();
1284        assert!(buffer_min.is_aligned());
1285        assert!(buffer_min.alignment >= mem::align_of::<usize>());
1286
1287        // Test power-of-2 validation
1288        assert!(AlignedBuffer::new(1024, 3).is_err());
1289        assert!(AlignedBuffer::new(1024, 17).is_err());
1290        assert!(AlignedBuffer::new(1024, 33).is_err());
1291
1292        // Test maximum alignment limit
1293        assert!(AlignedBuffer::new(1024, 8192).is_err());
1294    }
1295
1296    #[test]
1297    fn test_simd_performance_oriented_allocation() {
1298        // Test that allocation pattern is suitable for high-performance SIMD
1299        let buffer = AlignedBuffer::new_avx512(4096).unwrap();
1300
1301        // Verify the buffer can be used for actual SIMD-like operations
1302        let slice = unsafe { std::slice::from_raw_parts_mut(buffer.ptr.as_ptr(), buffer.capacity) };
1303
1304        // Fill with test pattern
1305        for (i, byte) in slice.iter_mut().enumerate() {
1306            *byte = (i % 256) as u8;
1307        }
1308
1309        // Verify alignment is maintained through operations
1310        assert!(buffer.is_aligned());
1311        assert_eq!(slice[0], 0);
1312        assert_eq!(slice[255], 255);
1313        assert_eq!(slice[256], 0);
1314    }
1315}