Skip to main content

pjson_rs/parser/
buffer_pool.rs

1//! Buffer pool system for zero-copy parsing with memory management
2//!
3//! This module provides a memory pool system to minimize allocations during
4//! JSON parsing, with support for different buffer sizes and reuse strategies.
5
6use crate::{
7    config::SecurityConfig,
8    domain::{DomainError, DomainResult},
9    parser::allocator::global_allocator,
10    security::SecurityValidator,
11};
12use dashmap::DashMap;
13use std::{
14    alloc::Layout,
15    mem,
16    ptr::{self, NonNull},
17    slice,
18    sync::Arc,
19    time::{Duration, Instant},
20};
21
/// Buffer pool that manages reusable byte buffers for parsing
///
/// Buffers are grouped into size-class buckets (see [`BufferSize`]) held in a
/// concurrent map, so buffers can be checked out and returned from many
/// threads without a global lock.
#[derive(Debug)]
pub struct BufferPool {
    /// Size class -> bucket of idle buffers awaiting reuse.
    pools: Arc<DashMap<BufferSize, BufferBucket>>,
    /// Behavior knobs: limits, TTL, SIMD alignment, security validator.
    config: PoolConfig,
    stats: Arc<parking_lot::Mutex<PoolStats>>, // Keep stats under mutex as it's written less frequently
}
29
/// Configuration for buffer pool behavior
///
/// Usually derived from a [`SecurityConfig`] via `From`; see also the preset
/// constructors (`simd_optimized`, `low_memory`, `development`).
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Maximum number of buffers per size bucket
    pub max_buffers_per_bucket: usize,
    /// Maximum total memory usage in bytes
    pub max_total_memory: usize,
    /// How long to keep unused buffers before cleanup
    pub buffer_ttl: Duration,
    /// Enable/disable pool statistics tracking
    pub track_stats: bool,
    /// Alignment for SIMD operations (typically 32 or 64 bytes)
    pub simd_alignment: usize,
    /// Security validator for buffer validation
    pub validator: SecurityValidator,
}
46
/// Standard buffer sizes for different parsing scenarios
///
/// Each variant's discriminant IS its capacity in bytes, so `size as usize`
/// yields the buffer size directly (the pool relies on this).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum BufferSize {
    /// Small buffers for short JSON strings (1KB)
    Small = 1024,
    /// Medium buffers for typical API responses (8KB)
    Medium = 8192,
    /// Large buffers for complex documents (64KB)
    Large = 65536,
    /// Extra large buffers for bulk data (512KB)
    XLarge = 524288,
    /// Huge buffers for massive documents (4MB)
    Huge = 4194304,
}
61
/// A bucket containing buffers of the same size
#[derive(Debug)]
struct BufferBucket {
    /// Idle buffers available for reuse (used LIFO: pushed/popped at the end).
    buffers: Vec<AlignedBuffer>,
    #[allow(dead_code)] // Future: size-based pool management
    size: BufferSize,
    /// Last time this bucket served or received a buffer; `cleanup` removes
    /// empty buckets whose last access exceeds the configured TTL.
    last_access: Instant,
}
70
/// SIMD-aligned buffer with metadata
///
/// This buffer guarantees proper alignment for SIMD operations using direct memory allocation.
/// It supports SSE (16-byte), AVX2 (32-byte), and AVX-512 (64-byte) alignments.
///
/// Invariants maintained by the impl: `ptr` points to a live allocation
/// described by `layout`, `len <= capacity`, and the first `len` bytes are
/// initialized.
pub struct AlignedBuffer {
    /// Raw pointer to aligned memory
    ptr: NonNull<u8>,
    /// Current length of valid (initialized) data
    len: usize,
    /// Total capacity in bytes
    capacity: usize,
    /// Memory alignment requirement
    alignment: usize,
    /// Layout used for allocation (needed for deallocation)
    layout: Layout,
    /// Creation timestamp
    created_at: Instant,
    /// Last usage timestamp (drives pool TTL-based cleanup)
    last_used: Instant,
}
91
// SAFETY: AlignedBuffer exclusively owns its allocation (ptr + layout) and
// frees it exactly once in Drop, so moving it between threads is sound.
unsafe impl Send for AlignedBuffer {}

// SAFETY: all `&self` methods only read through the pointer; there is no
// interior mutability, so sharing references across threads is sound.
unsafe impl Sync for AlignedBuffer {}
97
98impl std::fmt::Debug for AlignedBuffer {
99    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100        f.debug_struct("AlignedBuffer")
101            .field("ptr", &format_args!("0x{:x}", self.ptr.as_ptr() as usize))
102            .field("len", &self.len)
103            .field("capacity", &self.capacity)
104            .field("alignment", &self.alignment)
105            .field("is_aligned", &self.is_aligned())
106            .field("created_at", &self.created_at)
107            .field("last_used", &self.last_used)
108            .finish()
109    }
110}
111
/// Statistics about buffer pool usage
///
/// Only populated when `PoolConfig::track_stats` is enabled; `Clone` lets
/// `BufferPool::stats` hand out a snapshot without holding the lock.
#[derive(Debug, Clone)]
pub struct PoolStats {
    /// Total allocations requested
    pub total_allocations: u64,
    /// Cache hits (buffer reused)
    pub cache_hits: u64,
    /// Cache misses (new buffer allocated)
    pub cache_misses: u64,
    /// Current memory usage in bytes
    pub current_memory_usage: usize,
    /// Peak memory usage in bytes
    pub peak_memory_usage: usize,
    /// Number of cleanup operations performed
    pub cleanup_count: u64,
}
128
129impl BufferPool {
130    /// Create new buffer pool with default configuration
131    pub fn new() -> Self {
132        Self::with_config(PoolConfig::default())
133    }
134
135    /// Create buffer pool with custom configuration
136    pub fn with_config(config: PoolConfig) -> Self {
137        Self {
138            pools: Arc::new(DashMap::new()),
139            config,
140            stats: Arc::new(parking_lot::Mutex::new(PoolStats::new())),
141        }
142    }
143
144    /// Create buffer pool with security configuration
145    pub fn with_security_config(security_config: SecurityConfig) -> Self {
146        Self::with_config(PoolConfig::from(&security_config))
147    }
148
149    /// Get buffer of specified size, reusing if available
150    pub fn get_buffer(&self, size: BufferSize) -> DomainResult<PooledBuffer> {
151        // Security validation: check buffer size
152        self.config
153            .validator
154            .validate_buffer_size(size as usize)
155            .map_err(|e| DomainError::SecurityViolation(e.to_string()))?;
156
157        // Check if we would exceed total memory limit
158        let current_usage = self.current_memory_usage().unwrap_or(0);
159        if current_usage + (size as usize) > self.config.max_total_memory {
160            return Err(DomainError::ResourceExhausted(format!(
161                "Adding buffer of size {} would exceed memory limit: current={}, limit={}",
162                size as usize, current_usage, self.config.max_total_memory
163            )));
164        }
165
166        if self.config.track_stats {
167            self.increment_allocations();
168        }
169
170        // Try to get a buffer from existing bucket
171        if let Some(mut bucket_ref) = self.pools.get_mut(&size)
172            && let Some(mut buffer) = bucket_ref.buffers.pop()
173        {
174            buffer.last_used = Instant::now();
175            bucket_ref.last_access = Instant::now();
176
177            if self.config.track_stats {
178                self.increment_cache_hits();
179            }
180
181            return Ok(PooledBuffer::new(
182                buffer,
183                Arc::clone(&self.pools),
184                size,
185                self.config.max_buffers_per_bucket,
186            ));
187        }
188
189        // No buffer available, create new one
190        if self.config.track_stats {
191            self.increment_cache_misses();
192        }
193
194        let buffer = AlignedBuffer::new(size as usize, self.config.simd_alignment)?;
195        Ok(PooledBuffer::new(
196            buffer,
197            Arc::clone(&self.pools),
198            size,
199            self.config.max_buffers_per_bucket,
200        ))
201    }
202
203    /// Get buffer with at least the specified capacity
204    pub fn get_buffer_with_capacity(&self, min_capacity: usize) -> DomainResult<PooledBuffer> {
205        let size = BufferSize::for_capacity(min_capacity);
206        self.get_buffer(size)
207    }
208
209    /// Perform cleanup of old unused buffers
210    pub fn cleanup(&self) -> DomainResult<CleanupStats> {
211        let now = Instant::now();
212        let mut freed_buffers = 0;
213        let mut freed_memory = 0;
214
215        // DashMap doesn't have retain, so we collect keys to remove
216        let mut keys_to_remove = Vec::new();
217
218        for mut entry in self.pools.iter_mut() {
219            let bucket = entry.value_mut();
220            let old_count = bucket.buffers.len();
221
222            bucket.buffers.retain(|buffer| {
223                let age = now.duration_since(buffer.last_used);
224                if age > self.config.buffer_ttl {
225                    freed_memory += buffer.capacity;
226                    false
227                } else {
228                    true
229                }
230            });
231
232            freed_buffers += old_count - bucket.buffers.len();
233
234            // Mark bucket for removal if empty and not recently accessed
235            if bucket.buffers.is_empty()
236                && now.duration_since(bucket.last_access) >= self.config.buffer_ttl
237            {
238                keys_to_remove.push(*entry.key());
239            }
240        }
241
242        // Remove empty buckets
243        for key in keys_to_remove {
244            self.pools.remove(&key);
245        }
246
247        if self.config.track_stats {
248            self.increment_cleanup_count();
249            self.update_current_memory_usage(-(freed_memory as i64));
250        }
251
252        Ok(CleanupStats {
253            freed_buffers,
254            freed_memory,
255        })
256    }
257
258    /// Get current pool statistics
259    pub fn stats(&self) -> DomainResult<PoolStats> {
260        let stats = self.stats.lock();
261        Ok(stats.clone())
262    }
263
264    /// Get current memory usage across all pools
265    pub fn current_memory_usage(&self) -> DomainResult<usize> {
266        use rayon::prelude::*;
267
268        let usage = self
269            .pools
270            .iter()
271            .par_bridge()
272            .map(|entry| {
273                entry
274                    .value()
275                    .buffers
276                    .par_iter()
277                    .map(|b| b.capacity)
278                    .sum::<usize>()
279            })
280            .sum();
281
282        Ok(usage)
283    }
284
285    // Private statistics methods
286
287    fn increment_allocations(&self) {
288        let mut stats = self.stats.lock();
289        stats.total_allocations += 1;
290    }
291
292    fn increment_cache_hits(&self) {
293        let mut stats = self.stats.lock();
294        stats.cache_hits += 1;
295    }
296
297    fn increment_cache_misses(&self) {
298        let mut stats = self.stats.lock();
299        stats.cache_misses += 1;
300    }
301
302    fn increment_cleanup_count(&self) {
303        let mut stats = self.stats.lock();
304        stats.cleanup_count += 1;
305    }
306
307    fn update_current_memory_usage(&self, delta: i64) {
308        let mut stats = self.stats.lock();
309        stats.current_memory_usage = (stats.current_memory_usage as i64 + delta).max(0) as usize;
310        stats.peak_memory_usage = stats.peak_memory_usage.max(stats.current_memory_usage);
311    }
312}
313
314impl BufferSize {
315    /// Get appropriate buffer size for given capacity
316    pub fn for_capacity(capacity: usize) -> Self {
317        match capacity {
318            0..=1024 => BufferSize::Small,
319            1025..=8192 => BufferSize::Medium,
320            8193..=65536 => BufferSize::Large,
321            65537..=524288 => BufferSize::XLarge,
322            _ => BufferSize::Huge,
323        }
324    }
325
326    /// Get all available buffer sizes in order
327    pub fn all_sizes() -> &'static [BufferSize] {
328        &[
329            BufferSize::Small,
330            BufferSize::Medium,
331            BufferSize::Large,
332            BufferSize::XLarge,
333            BufferSize::Huge,
334        ]
335    }
336}
337
impl AlignedBuffer {
    /// Create new aligned buffer with guaranteed SIMD alignment
    ///
    /// # Arguments
    /// * `capacity` - Minimum capacity in bytes
    /// * `alignment` - Required alignment (must be power of 2, at most 4096)
    ///
    /// # Errors
    /// Returns `DomainError::InvalidInput` for a non-power-of-two or
    /// oversized alignment, or if the resulting `Layout` is invalid;
    /// allocator failures are propagated via `?`.
    ///
    /// # Safety
    /// This function uses unsafe code to allocate aligned memory.
    /// The memory is properly tracked and will be deallocated on drop.
    pub fn new(capacity: usize, alignment: usize) -> DomainResult<Self> {
        // Validate alignment is power of 2 and reasonable
        if !alignment.is_power_of_two() {
            return Err(DomainError::InvalidInput(format!(
                "Alignment {} is not a power of 2",
                alignment
            )));
        }

        // Validate alignment is not too large (max 4096 bytes for page alignment)
        if alignment > 4096 {
            return Err(DomainError::InvalidInput(format!(
                "Alignment {} exceeds maximum of 4096",
                alignment
            )));
        }

        // Minimum alignment should be at least size of usize for proper alignment
        let alignment = alignment.max(mem::align_of::<usize>());

        // Round capacity up to the next multiple of `alignment`
        // (power-of-two mask trick; alignment was validated above).
        let aligned_capacity = (capacity + alignment - 1) & !(alignment - 1);

        // Ensure minimum capacity for safety (also handles capacity == 0)
        let aligned_capacity = aligned_capacity.max(alignment);

        // Create layout for allocation (kept for Drop implementation)
        let layout = Layout::from_size_align(aligned_capacity, alignment).map_err(|e| {
            DomainError::InvalidInput(format!(
                "Invalid layout: capacity={}, alignment={}, error={}",
                aligned_capacity, alignment, e
            ))
        })?;

        // Use global SIMD allocator for better performance
        let allocator = global_allocator();

        // Allocate aligned memory using the appropriate allocator backend
        // SAFETY: alignment is a validated power of two and aligned_capacity
        // is non-zero (>= alignment).
        let ptr = unsafe { allocator.alloc_aligned(aligned_capacity, alignment)? };

        let now = Instant::now();
        Ok(Self {
            ptr,
            len: 0,
            capacity: aligned_capacity,
            alignment,
            layout,
            created_at: now,
            last_used: now,
        })
    }

    /// Create an aligned buffer with specific SIMD level
    pub fn new_sse(capacity: usize) -> DomainResult<Self> {
        Self::new(capacity, 16) // SSE requires 16-byte alignment
    }

    /// Create an aligned buffer for AVX2 operations
    pub fn new_avx2(capacity: usize) -> DomainResult<Self> {
        Self::new(capacity, 32) // AVX2 requires 32-byte alignment
    }

    /// Create an aligned buffer for AVX-512 operations
    pub fn new_avx512(capacity: usize) -> DomainResult<Self> {
        Self::new(capacity, 64) // AVX-512 requires 64-byte alignment
    }

    /// Get mutable slice to buffer data (the first `len` bytes)
    pub fn as_mut_slice(&mut self) -> &mut [u8] {
        // SAFETY: ptr is valid for `capacity >= len` bytes and the first
        // `len` bytes are initialized (struct invariant).
        unsafe { slice::from_raw_parts_mut(self.ptr.as_ptr(), self.len) }
    }

    /// Get immutable slice to buffer data (the first `len` bytes)
    pub fn as_slice(&self) -> &[u8] {
        // SAFETY: same invariant as `as_mut_slice`; `&self` guarantees no
        // concurrent mutation.
        unsafe { slice::from_raw_parts(self.ptr.as_ptr(), self.len) }
    }

    /// Get a mutable slice with full capacity
    ///
    /// NOTE(review): bytes beyond `len` may be uninitialized — callers must
    /// write before reading them (see `set_len`); confirm all call sites do.
    pub fn as_mut_capacity_slice(&mut self) -> &mut [u8] {
        // SAFETY: ptr is valid for exactly `capacity` bytes.
        unsafe { slice::from_raw_parts_mut(self.ptr.as_ptr(), self.capacity) }
    }

    /// Set the length of valid data
    ///
    /// # Safety
    /// Caller must ensure that `new_len` bytes are initialized
    /// (the capacity bound is only checked via `debug_assert!`, i.e. not in
    /// release builds).
    pub unsafe fn set_len(&mut self, new_len: usize) {
        debug_assert!(
            new_len <= self.capacity,
            "new_len {} exceeds capacity {}",
            new_len,
            self.capacity
        );
        self.len = new_len;
        self.last_used = Instant::now();
    }

    /// Reserve additional capacity
    ///
    /// Grows the allocation (preserving existing data) so that at least
    /// `len + additional` bytes fit; no-op if capacity already suffices.
    ///
    /// # Errors
    /// Returns `InvalidInput` on arithmetic/layout overflow; allocator
    /// failures are propagated.
    pub fn reserve(&mut self, additional: usize) -> DomainResult<()> {
        let new_capacity = self
            .len
            .checked_add(additional)
            .ok_or_else(|| DomainError::InvalidInput("Capacity overflow".to_string()))?;

        if new_capacity <= self.capacity {
            return Ok(());
        }

        // Align new capacity
        let aligned_capacity = (new_capacity + self.alignment - 1) & !(self.alignment - 1);

        // Use global SIMD allocator for reallocation
        let allocator = global_allocator();

        // Reallocate using the allocator (which will handle data copying)
        // SAFETY: self.ptr/self.layout describe the current allocation from
        // this same allocator (kept in sync by `new` and previous reserves).
        let new_ptr =
            unsafe { allocator.realloc_aligned(self.ptr, self.layout, aligned_capacity)? };

        // Update layout for the new size so Drop deallocates correctly
        let new_layout = Layout::from_size_align(aligned_capacity, self.alignment)
            .map_err(|e| DomainError::InvalidInput(format!("Invalid layout: {}", e)))?;

        self.ptr = new_ptr;
        self.capacity = aligned_capacity;
        self.layout = new_layout;
        self.last_used = Instant::now();

        Ok(())
    }

    /// Push bytes to the buffer, growing the allocation if needed
    pub fn extend_from_slice(&mut self, data: &[u8]) -> DomainResult<()> {
        let required_capacity = self
            .len
            .checked_add(data.len())
            .ok_or_else(|| DomainError::InvalidInput("Length overflow".to_string()))?;

        if required_capacity > self.capacity {
            self.reserve(data.len())?;
        }

        // SAFETY: the destination range [len, len + data.len()) fits within
        // capacity (ensured above), and `data` cannot alias our allocation
        // because we hold `&mut self`.
        unsafe {
            ptr::copy_nonoverlapping(data.as_ptr(), self.ptr.as_ptr().add(self.len), data.len());
            self.len += data.len();
        }

        self.last_used = Instant::now();
        Ok(())
    }

    /// Clear buffer contents but keep allocated memory
    pub fn clear(&mut self) {
        self.len = 0;
        self.last_used = Instant::now();
    }

    /// Get buffer capacity
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Get current length of valid data
    pub fn len(&self) -> usize {
        self.len
    }

    /// Check if buffer is empty
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Get the raw pointer to the buffer
    pub fn as_ptr(&self) -> *const u8 {
        self.ptr.as_ptr()
    }

    /// Get the mutable raw pointer to the buffer
    pub fn as_mut_ptr(&mut self) -> *mut u8 {
        self.ptr.as_ptr()
    }

    /// Check if buffer is properly aligned
    ///
    /// This validates that the buffer pointer has the requested alignment,
    /// which is critical for SIMD operations.
    pub fn is_aligned(&self) -> bool {
        let ptr_addr = self.ptr.as_ptr() as usize;
        ptr_addr.is_multiple_of(self.alignment)
    }

    /// Get the actual alignment of the buffer
    /// (largest power of two dividing the address)
    pub fn actual_alignment(&self) -> usize {
        let ptr_addr = self.ptr.as_ptr() as usize;
        // Find the highest power of 2 that divides the address.
        // Defensive: `ptr` is NonNull, so this branch should be unreachable.
        if ptr_addr == 0 {
            return usize::MAX; // null pointer is infinitely aligned
        }

        // Use trailing zeros to find alignment
        1 << ptr_addr.trailing_zeros()
    }

    /// Verify buffer is suitable for specific SIMD instruction set
    pub fn is_simd_compatible(&self, simd_type: SimdType) -> bool {
        let required_alignment = match simd_type {
            SimdType::Sse => 16,
            SimdType::Avx2 => 32,
            SimdType::Avx512 => 64,
            SimdType::Neon => 16,
        };

        self.actual_alignment() >= required_alignment
    }
}
563
/// SIMD instruction set types
///
/// Used by [`AlignedBuffer::is_simd_compatible`] to map an instruction set to
/// its minimum required alignment.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SimdType {
    /// SSE instructions (16-byte alignment)
    Sse,
    /// AVX2 instructions (32-byte alignment)
    Avx2,
    /// AVX-512 instructions (64-byte alignment)
    Avx512,
    /// ARM NEON instructions (16-byte alignment)
    Neon,
}
576
impl Drop for AlignedBuffer {
    /// Release the aligned allocation back to the global SIMD allocator.
    fn drop(&mut self) {
        // Use the global SIMD allocator for deallocation
        let allocator = global_allocator();

        // SAFETY: `self.ptr` was obtained from this allocator with exactly
        // `self.layout` (kept in sync by `new` and `reserve`), and Drop runs
        // at most once, so the deallocation is neither mismatched nor double.
        unsafe {
            allocator.dealloc_aligned(self.ptr, self.layout);
        }
    }
}
588
impl Clone for AlignedBuffer {
    /// Deep-copy into a freshly allocated region with the same capacity and
    /// alignment.
    ///
    /// # Panics
    /// Panics if the new allocation fails (`Clone` cannot return `Result`).
    fn clone(&self) -> Self {
        // Create new buffer with same alignment and capacity
        let mut new_buffer =
            Self::new(self.capacity, self.alignment).expect("Failed to clone buffer");

        // SAFETY: source and destination are distinct allocations, the
        // destination's capacity is at least `self.capacity >= self.len`, and
        // the first `self.len` source bytes are initialized.
        unsafe {
            ptr::copy_nonoverlapping(self.ptr.as_ptr(), new_buffer.ptr.as_ptr(), self.len);
            new_buffer.len = self.len;
        }

        new_buffer
    }
}
604
/// RAII wrapper for pooled buffer that returns buffer to pool on drop
pub struct PooledBuffer {
    /// `Some` until Drop takes the buffer to hand it back to the pool.
    buffer: Option<AlignedBuffer>,
    /// Shared handle to the owning pool's buckets.
    pool: Arc<DashMap<BufferSize, BufferBucket>>,
    /// Size class this buffer belongs to (bucket key on return).
    size: BufferSize,
    /// Per-bucket cap copied from the pool config at checkout time.
    max_buffers_per_bucket: usize,
}
612
613impl PooledBuffer {
614    fn new(
615        buffer: AlignedBuffer,
616        pool: Arc<DashMap<BufferSize, BufferBucket>>,
617        size: BufferSize,
618        max_buffers_per_bucket: usize,
619    ) -> Self {
620        Self {
621            buffer: Some(buffer),
622            pool,
623            size,
624            max_buffers_per_bucket,
625        }
626    }
627
628    /// Get mutable reference to buffer
629    pub fn buffer_mut(&mut self) -> Option<&mut AlignedBuffer> {
630        self.buffer.as_mut()
631    }
632
633    /// Get immutable reference to buffer
634    pub fn buffer(&self) -> Option<&AlignedBuffer> {
635        self.buffer.as_ref()
636    }
637
638    /// Get buffer capacity
639    pub fn capacity(&self) -> usize {
640        self.buffer.as_ref().map(|b| b.capacity()).unwrap_or(0)
641    }
642
643    /// Clear buffer contents
644    pub fn clear(&mut self) {
645        if let Some(buffer) = &mut self.buffer {
646            buffer.clear();
647        }
648    }
649}
650
651impl Drop for PooledBuffer {
652    fn drop(&mut self) {
653        if let Some(mut buffer) = self.buffer.take() {
654            buffer.clear(); // Clear contents before returning to pool
655
656            // Get or create bucket for this buffer size
657            let mut bucket_ref = self.pool.entry(self.size).or_insert_with(|| BufferBucket {
658                buffers: Vec::new(),
659                size: self.size,
660                last_access: Instant::now(),
661            });
662
663            // Only return to pool if we haven't exceeded the per-bucket limit
664            if bucket_ref.buffers.len() < self.max_buffers_per_bucket {
665                bucket_ref.buffers.push(buffer);
666                bucket_ref.last_access = Instant::now();
667            }
668        }
669    }
670}
671
/// Result of cleanup operation
#[derive(Debug, Clone)]
pub struct CleanupStats {
    /// Number of buffers dropped by the cleanup pass.
    pub freed_buffers: usize,
    /// Total capacity in bytes of the dropped buffers.
    pub freed_memory: usize,
}
678
679impl PoolConfig {
680    /// Create configuration from security config
681    pub fn from_security_config(security_config: &SecurityConfig) -> Self {
682        Self::from(security_config)
683    }
684
685    /// Create configuration optimized for SIMD operations
686    pub fn simd_optimized() -> Self {
687        let mut config = Self::from(&SecurityConfig::high_throughput());
688        config.simd_alignment = 64; // AVX-512 alignment
689        config
690    }
691
692    /// Create configuration for low-memory environments
693    pub fn low_memory() -> Self {
694        let mut config = Self::from(&SecurityConfig::low_memory());
695        config.track_stats = false; // Reduce overhead
696        config
697    }
698
699    /// Create configuration for development/testing
700    pub fn development() -> Self {
701        Self::from(&SecurityConfig::development())
702    }
703}
704
705impl Default for PoolConfig {
706    fn default() -> Self {
707        let security_config = SecurityConfig::default();
708        Self {
709            max_buffers_per_bucket: security_config.buffers.max_buffers_per_bucket,
710            max_total_memory: security_config.buffers.max_total_memory,
711            buffer_ttl: security_config.buffer_ttl(),
712            track_stats: true,
713            simd_alignment: 32, // AVX2 alignment
714            validator: SecurityValidator::new(security_config),
715        }
716    }
717}
718
impl From<&SecurityConfig> for PoolConfig {
    /// Derive pool limits and TTL from a security configuration.
    /// Stats tracking defaults on; SIMD alignment defaults to 32 bytes (AVX2).
    fn from(security_config: &SecurityConfig) -> Self {
        Self {
            max_buffers_per_bucket: security_config.buffers.max_buffers_per_bucket,
            max_total_memory: security_config.buffers.max_total_memory,
            buffer_ttl: security_config.buffer_ttl(),
            track_stats: true,
            simd_alignment: 32, // AVX2 alignment
            validator: SecurityValidator::new(security_config.clone()),
        }
    }
}
731
732impl PoolStats {
733    fn new() -> Self {
734        Self {
735            total_allocations: 0,
736            cache_hits: 0,
737            cache_misses: 0,
738            current_memory_usage: 0,
739            peak_memory_usage: 0,
740            cleanup_count: 0,
741        }
742    }
743
744    /// Get cache hit ratio
745    pub fn hit_ratio(&self) -> f64 {
746        if self.total_allocations == 0 {
747            0.0
748        } else {
749            self.cache_hits as f64 / self.total_allocations as f64
750        }
751    }
752
753    /// Get memory efficiency (current/peak ratio)
754    pub fn memory_efficiency(&self) -> f64 {
755        if self.peak_memory_usage == 0 {
756            1.0
757        } else {
758            self.current_memory_usage as f64 / self.peak_memory_usage as f64
759        }
760    }
761}
762
impl Default for BufferPool {
    /// Equivalent to [`BufferPool::new`] (default `PoolConfig`).
    fn default() -> Self {
        Self::new()
    }
}
768
/// Global buffer pool instance for convenient access
///
/// Set at most once: either lazily by `global_buffer_pool` or explicitly by
/// `initialize_global_buffer_pool`.
static GLOBAL_BUFFER_POOL: std::sync::OnceLock<BufferPool> = std::sync::OnceLock::new();

/// Get global buffer pool instance
///
/// Lazily initializes with `BufferPool::new()` (default config) if the pool
/// has not been explicitly initialized.
pub fn global_buffer_pool() -> &'static BufferPool {
    GLOBAL_BUFFER_POOL.get_or_init(BufferPool::new)
}
776
777/// Initialize global buffer pool with custom configuration
778pub fn initialize_global_buffer_pool(config: PoolConfig) -> DomainResult<()> {
779    GLOBAL_BUFFER_POOL
780        .set(BufferPool::with_config(config))
781        .map_err(|_| {
782            DomainError::InternalError("Global buffer pool already initialized".to_string())
783        })?;
784    Ok(())
785}
786
787#[cfg(test)]
788mod tests {
789    use super::*;
790
791    #[test]
792    fn test_buffer_pool_creation() {
793        let pool = BufferPool::new();
794        assert!(pool.stats().is_ok());
795    }
796
797    #[test]
798    fn test_buffer_allocation() {
799        let pool = BufferPool::new();
800        let buffer = pool.get_buffer(BufferSize::Medium);
801        assert!(buffer.is_ok());
802
803        let buffer = buffer.unwrap();
804        assert!(buffer.capacity() >= BufferSize::Medium as usize);
805    }
806
807    #[test]
808    fn test_buffer_reuse() {
809        let pool = BufferPool::new();
810
811        // Allocate and drop buffer
812        {
813            let _buffer = pool.get_buffer(BufferSize::Small).unwrap();
814        }
815
816        // Allocate another buffer of same size
817        let _buffer2 = pool.get_buffer(BufferSize::Small).unwrap();
818
819        // Should have cache hit
820        let stats = pool.stats().unwrap();
821        assert!(stats.cache_hits > 0);
822    }
823
824    #[test]
825    fn test_buffer_size_selection() {
826        assert_eq!(BufferSize::for_capacity(500), BufferSize::Small);
827        assert_eq!(BufferSize::for_capacity(2000), BufferSize::Medium);
828        assert_eq!(BufferSize::for_capacity(50000), BufferSize::Large);
829        assert_eq!(BufferSize::for_capacity(100000), BufferSize::XLarge);
830    }
831
832    #[test]
833    fn test_aligned_buffer_creation_guaranteed() {
834        // Test all common SIMD alignments
835        let test_cases = vec![
836            (1024, 16, "SSE alignment"),
837            (2048, 32, "AVX2 alignment"),
838            (4096, 64, "AVX-512 alignment"),
839        ];
840
841        for (capacity, alignment, description) in test_cases {
842            let buffer = AlignedBuffer::new(capacity, alignment).unwrap();
843
844            // Verify pointer alignment
845            let ptr_addr = buffer.as_ptr() as usize;
846            assert_eq!(
847                ptr_addr % alignment,
848                0,
849                "{}: pointer 0x{:x} is not {}-byte aligned",
850                description,
851                ptr_addr,
852                alignment
853            );
854
855            // Verify is_aligned method
856            assert!(
857                buffer.is_aligned(),
858                "{}: is_aligned() returned false for properly aligned buffer",
859                description
860            );
861
862            // Verify capacity
863            assert!(
864                buffer.capacity() >= capacity,
865                "{}: capacity {} is less than requested {}",
866                description,
867                buffer.capacity(),
868                capacity
869            );
870
871            // Verify actual alignment
872            assert!(
873                buffer.actual_alignment() >= alignment,
874                "{}: actual alignment {} is less than requested {}",
875                description,
876                buffer.actual_alignment(),
877                alignment
878            );
879        }
880    }
881
882    #[test]
883    fn test_buffer_operations() {
884        let mut buffer = AlignedBuffer::new(1024, 32).unwrap();
885
886        // Test initial state
887        assert_eq!(buffer.len(), 0);
888        assert!(buffer.is_empty());
889        assert_eq!(buffer.capacity(), 1024);
890
891        // Test extend_from_slice
892        let data = b"Hello, SIMD World!";
893        buffer.extend_from_slice(data).unwrap();
894        assert_eq!(buffer.len(), data.len());
895        assert_eq!(buffer.as_slice(), data);
896
897        // Test clear
898        buffer.clear();
899        assert_eq!(buffer.len(), 0);
900        assert!(buffer.is_empty());
901        assert_eq!(buffer.capacity(), 1024); // Capacity should remain
902
903        // Test unsafe set_len
904        unsafe {
905            // Write some data directly
906            let slice = buffer.as_mut_capacity_slice();
907            slice[0..5].copy_from_slice(b"SIMD!");
908            buffer.set_len(5);
909        }
910        assert_eq!(buffer.len(), 5);
911        assert_eq!(&buffer.as_slice()[0..5], b"SIMD!");
912    }
913
914    #[test]
915    fn test_buffer_reserve() {
916        let mut buffer = AlignedBuffer::new(64, 32).unwrap();
917        let _initial_alignment = buffer.actual_alignment();
918
919        // Set some length first
920        unsafe {
921            buffer.set_len(32);
922        }
923
924        // Reserve additional space - should need capacity for len + additional
925        buffer.reserve(256).unwrap();
926        assert!(
927            buffer.capacity() >= 32 + 256,
928            "Expected capacity >= {}, got {}",
929            32 + 256,
930            buffer.capacity()
931        );
932
933        // Alignment should be preserved after reallocation
934        assert!(
935            buffer.actual_alignment() >= 32,
936            "Alignment not preserved after reserve"
937        );
938        assert!(buffer.is_aligned());
939
940        // Test that data is preserved during reallocation
941        buffer.extend_from_slice(b"test data").unwrap();
942        let old_data = buffer.as_slice().to_vec();
943
944        buffer.reserve(1024).unwrap();
945        assert_eq!(buffer.as_slice(), &old_data[..]);
946    }
947
948    #[test]
949    fn test_buffer_clone() {
950        let mut original = AlignedBuffer::new(512, 64).unwrap();
951        original.extend_from_slice(b"Original data").unwrap();
952
953        let cloned = original.clone();
954
955        // Verify clone has same properties
956        assert_eq!(cloned.len(), original.len());
957        assert_eq!(cloned.capacity(), original.capacity());
958        assert_eq!(cloned.alignment, original.alignment);
959        assert_eq!(cloned.as_slice(), original.as_slice());
960
961        // Verify clone has different memory location
962        assert_ne!(cloned.as_ptr(), original.as_ptr());
963
964        // Verify clone is also properly aligned
965        assert!(cloned.is_aligned());
966        assert!(cloned.actual_alignment() >= 64);
967    }
968
969    #[test]
970    fn test_alignment_validation() {
971        // Test valid power-of-2 alignments
972        let valid_alignments = [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096];
973
974        for &alignment in &valid_alignments {
975            let result = AlignedBuffer::new(1024, alignment);
976            assert!(result.is_ok(), "Alignment {} should be valid", alignment);
977
978            let buffer = result.unwrap();
979            assert!(
980                buffer.is_aligned(),
981                "Buffer with alignment {} should be aligned",
982                alignment
983            );
984        }
985
986        // Test invalid non-power-of-2 alignments
987        let invalid_alignments = [3, 5, 6, 7, 9, 10, 11, 12, 13, 14, 15, 17, 31, 33, 63, 65];
988
989        for &alignment in &invalid_alignments {
990            let result = AlignedBuffer::new(1024, alignment);
991            assert!(result.is_err(), "Alignment {} should be invalid", alignment);
992        }
993
994        // Test too large alignment
995        assert!(AlignedBuffer::new(1024, 8192).is_err());
996    }
997
998    #[test]
999    fn test_actual_alignment_calculation() {
1000        // Create buffers with different alignments and verify actual_alignment()
1001        for &requested_align in &[16, 32, 64] {
1002            let buffer = AlignedBuffer::new(1024, requested_align).unwrap();
1003            let actual = buffer.actual_alignment();
1004
1005            assert!(
1006                actual >= requested_align,
1007                "Actual alignment {} is less than requested {}",
1008                actual,
1009                requested_align
1010            );
1011
1012            // actual_alignment should be a power of 2
1013            assert!(
1014                actual.is_power_of_two(),
1015                "Actual alignment {} is not a power of 2",
1016                actual
1017            );
1018        }
1019    }
1020
1021    #[test]
1022    fn test_simd_compatibility_check() {
1023        // SSE buffer should be compatible with SSE but might not be with AVX-512
1024        let sse_buffer = AlignedBuffer::new_sse(1024).unwrap();
1025        assert!(sse_buffer.is_simd_compatible(SimdType::Sse));
1026        assert!(sse_buffer.is_simd_compatible(SimdType::Neon)); // Same alignment as SSE
1027
1028        // AVX-512 buffer should be compatible with all instruction sets
1029        let avx512_buffer = AlignedBuffer::new_avx512(1024).unwrap();
1030        assert!(avx512_buffer.is_simd_compatible(SimdType::Sse));
1031        assert!(avx512_buffer.is_simd_compatible(SimdType::Avx2));
1032        assert!(avx512_buffer.is_simd_compatible(SimdType::Avx512));
1033        assert!(avx512_buffer.is_simd_compatible(SimdType::Neon));
1034    }
1035
1036    #[test]
1037    fn test_zero_copy_verification() {
1038        let mut buffer = AlignedBuffer::new(1024, 32).unwrap();
1039
1040        // Get raw pointer before modification
1041        let ptr_before = buffer.as_ptr();
1042
1043        // Perform various operations that should NOT move the buffer
1044        buffer.clear();
1045        buffer.extend_from_slice(b"test").unwrap();
1046        unsafe {
1047            buffer.set_len(2);
1048        }
1049
1050        // Pointer should remain the same (zero-copy)
1051        assert_eq!(
1052            ptr_before,
1053            buffer.as_ptr(),
1054            "Buffer was moved during operations (not zero-copy)"
1055        );
1056
1057        // Only reserve should potentially change the pointer
1058        buffer.reserve(2048).unwrap();
1059        // After reserve, pointer might change but should still be aligned
1060        assert!(buffer.is_aligned());
1061    }
1062
1063    #[test]
1064    fn test_pool_cleanup() {
1065        let config = PoolConfig {
1066            buffer_ttl: Duration::from_millis(1),
1067            ..Default::default()
1068        };
1069        let pool = BufferPool::with_config(config);
1070
1071        // Allocate and drop buffer
1072        {
1073            let _buffer = pool.get_buffer(BufferSize::Small).unwrap();
1074        }
1075
1076        // Wait for TTL
1077        std::thread::sleep(Duration::from_millis(10));
1078
1079        // Cleanup should free the buffer
1080        let cleanup_stats = pool.cleanup().unwrap();
1081        assert!(cleanup_stats.freed_buffers > 0);
1082    }
1083
1084    #[test]
1085    fn test_global_buffer_pool() {
1086        let pool = global_buffer_pool();
1087        let buffer = pool.get_buffer(BufferSize::Medium);
1088        assert!(buffer.is_ok());
1089    }
1090
1091    #[test]
1092    fn test_memory_limit_enforcement() {
1093        let config = PoolConfig {
1094            max_total_memory: 1024, // Very small limit
1095            max_buffers_per_bucket: 10,
1096            ..Default::default()
1097        };
1098        let pool = BufferPool::with_config(config);
1099
1100        // Create a buffer that exceeds the memory limit
1101        let result = pool.get_buffer(BufferSize::Medium); // 8KB > 1KB limit
1102
1103        assert!(result.is_err());
1104
1105        if let Err(e) = result {
1106            assert!(e.to_string().contains("memory limit"));
1107        }
1108    }
1109
1110    #[test]
1111    fn test_per_bucket_limit_enforcement() {
1112        let config = PoolConfig {
1113            max_buffers_per_bucket: 2,          // Very small limit
1114            max_total_memory: 10 * 1024 * 1024, // Generous memory limit
1115            ..Default::default()
1116        };
1117        let pool = BufferPool::with_config(config);
1118
1119        // Allocate and drop buffers to fill the bucket
1120        for _ in 0..3 {
1121            let _buffer = pool.get_buffer(BufferSize::Small).unwrap();
1122            // Buffer goes back to pool on drop
1123        }
1124
1125        // Only 2 buffers should be retained in the pool
1126        let stats = pool.stats().unwrap();
1127        assert!(stats.cache_hits <= 2, "Too many buffers retained in bucket");
1128    }
1129
1130    #[test]
1131    fn test_buffer_size_validation() {
1132        let pool = BufferPool::new();
1133
1134        // All standard buffer sizes should be valid
1135        for size in BufferSize::all_sizes() {
1136            let result = pool.get_buffer(*size);
1137            assert!(result.is_ok(), "Buffer size {:?} should be valid", size);
1138        }
1139    }
1140
1141    #[test]
1142    fn test_memory_safety() {
1143        // Test that dropping a buffer properly deallocates memory
1144        // This test would fail under valgrind/ASAN if there's a memory leak
1145        for _ in 0..100 {
1146            let buffer = AlignedBuffer::new(1024, 64).unwrap();
1147            drop(buffer);
1148        }
1149
1150        // Test clone and drop
1151        for _ in 0..100 {
1152            let buffer = AlignedBuffer::new(512, 32).unwrap();
1153            let cloned = buffer.clone();
1154            drop(buffer);
1155            drop(cloned);
1156        }
1157    }
1158
1159    #[test]
1160    fn test_simd_specific_constructors() {
1161        // Test SSE alignment (16 bytes)
1162        let sse_buffer = AlignedBuffer::new_sse(1024).unwrap();
1163        assert!(sse_buffer.is_aligned());
1164        assert!(sse_buffer.is_simd_compatible(SimdType::Sse));
1165        assert_eq!(sse_buffer.alignment, 16);
1166
1167        // Test AVX2 alignment (32 bytes)
1168        let avx2_buffer = AlignedBuffer::new_avx2(1024).unwrap();
1169        assert!(avx2_buffer.is_aligned());
1170        assert!(avx2_buffer.is_simd_compatible(SimdType::Avx2));
1171        assert_eq!(avx2_buffer.alignment, 32);
1172
1173        // Test AVX-512 alignment (64 bytes)
1174        let avx512_buffer = AlignedBuffer::new_avx512(1024).unwrap();
1175        assert!(avx512_buffer.is_aligned());
1176        assert!(avx512_buffer.is_simd_compatible(SimdType::Avx512));
1177        assert_eq!(avx512_buffer.alignment, 64);
1178    }
1179
1180    #[test]
1181    fn test_simd_alignment_compatibility() {
1182        let buffer_64 = AlignedBuffer::new(1024, 64).unwrap();
1183
1184        // 64-byte aligned buffer should be compatible with all SIMD types
1185        assert!(buffer_64.is_simd_compatible(SimdType::Sse)); // 16-byte requirement
1186        assert!(buffer_64.is_simd_compatible(SimdType::Avx2)); // 32-byte requirement
1187        assert!(buffer_64.is_simd_compatible(SimdType::Avx512)); // 64-byte requirement
1188        assert!(buffer_64.is_simd_compatible(SimdType::Neon)); // 16-byte requirement
1189
1190        // Note: We can't easily test incompatible alignments since the allocator
1191        // might provide better alignment than requested for performance reasons.
1192        // Instead, test the requested alignment vs required alignment directly.
1193        #[allow(clippy::assertions_on_constants)]
1194        {
1195            assert!(64 >= 16); // SSE compatible
1196            assert!(64 >= 32); // AVX2 compatible
1197            assert!(64 >= 64); // AVX512 compatible
1198            assert!(64 >= 16); // NEON compatible
1199        }
1200
1201        let buffer_16 = AlignedBuffer::new(1024, 16).unwrap();
1202
1203        // Test that buffer reports correct requested alignment
1204        assert_eq!(buffer_16.alignment, 16);
1205
1206        // 16-byte aligned buffer should be compatible with SSE and NEON
1207        assert!(buffer_16.is_simd_compatible(SimdType::Sse));
1208        assert!(buffer_16.is_simd_compatible(SimdType::Neon));
1209
1210        // Note: actual_alignment() might be higher than 16 due to allocator behavior
1211        // so we can't reliably test incompatibility. Instead verify logic:
1212        #[allow(clippy::assertions_on_constants)]
1213        {
1214            assert!(16 >= 16); // SSE requirement met
1215            assert!(16 < 32); // AVX2 requirement NOT met by requested alignment
1216            assert!(16 < 64); // AVX512 requirement NOT met by requested alignment
1217        }
1218    }
1219
1220    #[test]
1221    fn test_actual_alignment_detection() {
1222        let buffer = AlignedBuffer::new(1024, 64).unwrap();
1223
1224        let actual_alignment = buffer.actual_alignment();
1225        assert!(
1226            actual_alignment >= 64,
1227            "Buffer has actual alignment of {}, expected at least 64",
1228            actual_alignment
1229        );
1230
1231        // The actual alignment should be a power of 2 and >= requested alignment
1232        assert!(actual_alignment.is_power_of_two());
1233        assert!(actual_alignment >= buffer.alignment);
1234    }
1235
1236    #[test]
1237    fn test_simd_pool_configuration() {
1238        // Test pool with high SIMD alignment requirement
1239        let config = PoolConfig {
1240            simd_alignment: 64, // AVX-512 alignment
1241            ..Default::default()
1242        };
1243        let pool = BufferPool::with_config(config);
1244
1245        let buffer = pool.get_buffer(BufferSize::Medium).unwrap();
1246        assert!(buffer.buffer().unwrap().is_aligned());
1247        assert!(
1248            buffer
1249                .buffer()
1250                .unwrap()
1251                .is_simd_compatible(SimdType::Avx512)
1252        );
1253    }
1254
1255    #[test]
1256    fn test_alignment_edge_cases() {
1257        // Test minimum alignment
1258        let buffer_min = AlignedBuffer::new(64, 1).unwrap();
1259        assert!(buffer_min.is_aligned());
1260        assert!(buffer_min.alignment >= mem::align_of::<usize>());
1261
1262        // Test power-of-2 validation
1263        assert!(AlignedBuffer::new(1024, 3).is_err());
1264        assert!(AlignedBuffer::new(1024, 17).is_err());
1265        assert!(AlignedBuffer::new(1024, 33).is_err());
1266
1267        // Test maximum alignment limit
1268        assert!(AlignedBuffer::new(1024, 8192).is_err());
1269    }
1270
1271    #[test]
1272    fn test_simd_performance_oriented_allocation() {
1273        // Test that allocation pattern is suitable for high-performance SIMD
1274        let buffer = AlignedBuffer::new_avx512(4096).unwrap();
1275
1276        // Verify the buffer can be used for actual SIMD-like operations
1277        let slice = unsafe { std::slice::from_raw_parts_mut(buffer.ptr.as_ptr(), buffer.capacity) };
1278
1279        // Fill with test pattern
1280        for (i, byte) in slice.iter_mut().enumerate() {
1281            *byte = (i % 256) as u8;
1282        }
1283
1284        // Verify alignment is maintained through operations
1285        assert!(buffer.is_aligned());
1286        assert_eq!(slice[0], 0);
1287        assert_eq!(slice[255], 255);
1288        assert_eq!(slice[256], 0);
1289    }
1290}