pjson_rs/parser/
buffer_pool.rs

1//! Buffer pool system for zero-copy parsing with memory management
2//!
3//! This module provides a memory pool system to minimize allocations during
4//! JSON parsing, with support for different buffer sizes and reuse strategies.
5
6use crate::{
7    config::SecurityConfig,
8    domain::{DomainError, DomainResult},
9    parser::allocator::global_allocator,
10    security::SecurityValidator,
11};
12use dashmap::DashMap;
13use std::{
14    alloc::Layout,
15    mem,
16    ptr::{self, NonNull},
17    slice,
18    sync::Arc,
19    time::{Duration, Instant},
20};
21
22/// Buffer pool that manages reusable byte buffers for parsing
23#[derive(Debug)]
24pub struct BufferPool {
25    pools: Arc<DashMap<BufferSize, BufferBucket>>,
26    config: PoolConfig,
27    stats: Arc<parking_lot::Mutex<PoolStats>>, // Keep stats under mutex as it's written less frequently
28}
29
30/// Configuration for buffer pool behavior
31#[derive(Debug, Clone)]
32pub struct PoolConfig {
33    /// Maximum number of buffers per size bucket
34    pub max_buffers_per_bucket: usize,
35    /// Maximum total memory usage in bytes
36    pub max_total_memory: usize,
37    /// How long to keep unused buffers before cleanup
38    pub buffer_ttl: Duration,
39    /// Enable/disable pool statistics tracking
40    pub track_stats: bool,
41    /// Alignment for SIMD operations (typically 32 or 64 bytes)
42    pub simd_alignment: usize,
43    /// Security validator for buffer validation
44    pub validator: SecurityValidator,
45}
46
47/// Standard buffer sizes for different parsing scenarios
48#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
49pub enum BufferSize {
50    /// Small buffers for short JSON strings (1KB)
51    Small = 1024,
52    /// Medium buffers for typical API responses (8KB)  
53    Medium = 8192,
54    /// Large buffers for complex documents (64KB)
55    Large = 65536,
56    /// Extra large buffers for bulk data (512KB)
57    XLarge = 524288,
58    /// Huge buffers for massive documents (4MB)
59    Huge = 4194304,
60}
61
62/// A bucket containing buffers of the same size
63#[derive(Debug)]
64struct BufferBucket {
65    buffers: Vec<AlignedBuffer>,
66    size: BufferSize,
67    last_access: Instant,
68}
69
70/// SIMD-aligned buffer with metadata
71///
72/// This buffer guarantees proper alignment for SIMD operations using direct memory allocation.
73/// It supports SSE (16-byte), AVX2 (32-byte), and AVX-512 (64-byte) alignments.
74pub struct AlignedBuffer {
75    /// Raw pointer to aligned memory
76    ptr: NonNull<u8>,
77    /// Current length of valid data
78    len: usize,
79    /// Total capacity in bytes
80    capacity: usize,
81    /// Memory alignment requirement
82    alignment: usize,
83    /// Layout used for allocation (needed for deallocation)
84    layout: Layout,
85    /// Creation timestamp
86    created_at: Instant,
87    /// Last usage timestamp
88    last_used: Instant,
89}
90
91// Safety: AlignedBuffer can be safely sent between threads
92unsafe impl Send for AlignedBuffer {}
93
94// Safety: AlignedBuffer can be safely shared between threads (no interior mutability)
95unsafe impl Sync for AlignedBuffer {}
96
97impl std::fmt::Debug for AlignedBuffer {
98    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99        f.debug_struct("AlignedBuffer")
100            .field("ptr", &format_args!("0x{:x}", self.ptr.as_ptr() as usize))
101            .field("len", &self.len)
102            .field("capacity", &self.capacity)
103            .field("alignment", &self.alignment)
104            .field("is_aligned", &self.is_aligned())
105            .field("created_at", &self.created_at)
106            .field("last_used", &self.last_used)
107            .finish()
108    }
109}
110
111/// Statistics about buffer pool usage
112#[derive(Debug, Clone)]
113pub struct PoolStats {
114    /// Total allocations requested
115    pub total_allocations: u64,
116    /// Cache hits (buffer reused)
117    pub cache_hits: u64,
118    /// Cache misses (new buffer allocated)
119    pub cache_misses: u64,
120    /// Current memory usage in bytes
121    pub current_memory_usage: usize,
122    /// Peak memory usage in bytes
123    pub peak_memory_usage: usize,
124    /// Number of cleanup operations performed
125    pub cleanup_count: u64,
126}
127
128impl BufferPool {
129    /// Create new buffer pool with default configuration
130    pub fn new() -> Self {
131        Self::with_config(PoolConfig::default())
132    }
133
134    /// Create buffer pool with custom configuration
135    pub fn with_config(config: PoolConfig) -> Self {
136        Self {
137            pools: Arc::new(DashMap::new()),
138            config,
139            stats: Arc::new(parking_lot::Mutex::new(PoolStats::new())),
140        }
141    }
142
143    /// Create buffer pool with security configuration
144    pub fn with_security_config(security_config: SecurityConfig) -> Self {
145        Self::with_config(PoolConfig::from(&security_config))
146    }
147
148    /// Get buffer of specified size, reusing if available
149    pub fn get_buffer(&self, size: BufferSize) -> DomainResult<PooledBuffer> {
150        // Security validation: check buffer size
151        self.config
152            .validator
153            .validate_buffer_size(size as usize)
154            .map_err(|e| DomainError::SecurityViolation(e.to_string()))?;
155
156        // Check if we would exceed total memory limit
157        let current_usage = self.current_memory_usage().unwrap_or(0);
158        if current_usage + (size as usize) > self.config.max_total_memory {
159            return Err(DomainError::ResourceExhausted(format!(
160                "Adding buffer of size {} would exceed memory limit: current={}, limit={}",
161                size as usize, current_usage, self.config.max_total_memory
162            )));
163        }
164
165        if self.config.track_stats {
166            self.increment_allocations();
167        }
168
169        // Try to get a buffer from existing bucket
170        if let Some(mut bucket_ref) = self.pools.get_mut(&size)
171            && let Some(mut buffer) = bucket_ref.buffers.pop()
172        {
173            buffer.last_used = Instant::now();
174            bucket_ref.last_access = Instant::now();
175
176            if self.config.track_stats {
177                self.increment_cache_hits();
178            }
179
180            return Ok(PooledBuffer::new(
181                buffer,
182                Arc::clone(&self.pools),
183                size,
184                self.config.max_buffers_per_bucket,
185            ));
186        }
187
188        // No buffer available, create new one
189        if self.config.track_stats {
190            self.increment_cache_misses();
191        }
192
193        let buffer = AlignedBuffer::new(size as usize, self.config.simd_alignment)?;
194        Ok(PooledBuffer::new(
195            buffer,
196            Arc::clone(&self.pools),
197            size,
198            self.config.max_buffers_per_bucket,
199        ))
200    }
201
202    /// Get buffer with at least the specified capacity
203    pub fn get_buffer_with_capacity(&self, min_capacity: usize) -> DomainResult<PooledBuffer> {
204        let size = BufferSize::for_capacity(min_capacity);
205        self.get_buffer(size)
206    }
207
208    /// Perform cleanup of old unused buffers
209    pub fn cleanup(&self) -> DomainResult<CleanupStats> {
210        let now = Instant::now();
211        let mut freed_buffers = 0;
212        let mut freed_memory = 0;
213
214        // DashMap doesn't have retain, so we collect keys to remove
215        let mut keys_to_remove = Vec::new();
216
217        for mut entry in self.pools.iter_mut() {
218            let bucket = entry.value_mut();
219            let old_count = bucket.buffers.len();
220
221            bucket.buffers.retain(|buffer| {
222                let age = now.duration_since(buffer.last_used);
223                if age > self.config.buffer_ttl {
224                    freed_memory += buffer.capacity;
225                    false
226                } else {
227                    true
228                }
229            });
230
231            freed_buffers += old_count - bucket.buffers.len();
232
233            // Mark bucket for removal if empty and not recently accessed
234            if bucket.buffers.is_empty()
235                && now.duration_since(bucket.last_access) >= self.config.buffer_ttl
236            {
237                keys_to_remove.push(*entry.key());
238            }
239        }
240
241        // Remove empty buckets
242        for key in keys_to_remove {
243            self.pools.remove(&key);
244        }
245
246        if self.config.track_stats {
247            self.increment_cleanup_count();
248            self.update_current_memory_usage(-(freed_memory as i64));
249        }
250
251        Ok(CleanupStats {
252            freed_buffers,
253            freed_memory,
254        })
255    }
256
257    /// Get current pool statistics
258    pub fn stats(&self) -> DomainResult<PoolStats> {
259        let stats = self.stats.lock();
260        Ok(stats.clone())
261    }
262
263    /// Get current memory usage across all pools
264    pub fn current_memory_usage(&self) -> DomainResult<usize> {
265        use rayon::prelude::*;
266
267        let usage = self
268            .pools
269            .iter()
270            .par_bridge()
271            .map(|entry| {
272                entry
273                    .value()
274                    .buffers
275                    .par_iter()
276                    .map(|b| b.capacity)
277                    .sum::<usize>()
278            })
279            .sum();
280
281        Ok(usage)
282    }
283
284    // Private statistics methods
285
286    fn increment_allocations(&self) {
287        let mut stats = self.stats.lock();
288        stats.total_allocations += 1;
289    }
290
291    fn increment_cache_hits(&self) {
292        let mut stats = self.stats.lock();
293        stats.cache_hits += 1;
294    }
295
296    fn increment_cache_misses(&self) {
297        let mut stats = self.stats.lock();
298        stats.cache_misses += 1;
299    }
300
301    fn increment_cleanup_count(&self) {
302        let mut stats = self.stats.lock();
303        stats.cleanup_count += 1;
304    }
305
306    fn update_current_memory_usage(&self, delta: i64) {
307        let mut stats = self.stats.lock();
308        stats.current_memory_usage = (stats.current_memory_usage as i64 + delta).max(0) as usize;
309        stats.peak_memory_usage = stats.peak_memory_usage.max(stats.current_memory_usage);
310    }
311}
312
313impl BufferSize {
314    /// Get appropriate buffer size for given capacity
315    pub fn for_capacity(capacity: usize) -> Self {
316        match capacity {
317            0..=1024 => BufferSize::Small,
318            1025..=8192 => BufferSize::Medium,
319            8193..=65536 => BufferSize::Large,
320            65537..=524288 => BufferSize::XLarge,
321            _ => BufferSize::Huge,
322        }
323    }
324
325    /// Get all available buffer sizes in order
326    pub fn all_sizes() -> &'static [BufferSize] {
327        &[
328            BufferSize::Small,
329            BufferSize::Medium,
330            BufferSize::Large,
331            BufferSize::XLarge,
332            BufferSize::Huge,
333        ]
334    }
335}
336
337impl AlignedBuffer {
338    /// Create new aligned buffer with guaranteed SIMD alignment
339    ///
340    /// # Arguments
341    /// * `capacity` - Minimum capacity in bytes
342    /// * `alignment` - Required alignment (must be power of 2)
343    ///
344    /// # Safety
345    /// This function uses unsafe code to allocate aligned memory.
346    /// The memory is properly tracked and will be deallocated on drop.
347    pub fn new(capacity: usize, alignment: usize) -> DomainResult<Self> {
348        // Validate alignment is power of 2 and reasonable
349        if !alignment.is_power_of_two() {
350            return Err(DomainError::InvalidInput(format!(
351                "Alignment {} is not a power of 2",
352                alignment
353            )));
354        }
355
356        // Validate alignment is not too large (max 4096 bytes for page alignment)
357        if alignment > 4096 {
358            return Err(DomainError::InvalidInput(format!(
359                "Alignment {} exceeds maximum of 4096",
360                alignment
361            )));
362        }
363
364        // Minimum alignment should be at least size of usize for proper alignment
365        let alignment = alignment.max(mem::align_of::<usize>());
366
367        // Align capacity to SIMD boundaries
368        let aligned_capacity = (capacity + alignment - 1) & !(alignment - 1);
369
370        // Ensure minimum capacity for safety
371        let aligned_capacity = aligned_capacity.max(alignment);
372
373        // Create layout for allocation (kept for Drop implementation)
374        let layout = Layout::from_size_align(aligned_capacity, alignment).map_err(|e| {
375            DomainError::InvalidInput(format!(
376                "Invalid layout: capacity={}, alignment={}, error={}",
377                aligned_capacity, alignment, e
378            ))
379        })?;
380
381        // Use global SIMD allocator for better performance
382        let allocator = global_allocator();
383
384        // Allocate aligned memory using the appropriate allocator backend
385        // Safety: alignment has been validated above
386        let ptr = unsafe { allocator.alloc_aligned(aligned_capacity, alignment)? };
387
388        let now = Instant::now();
389        Ok(Self {
390            ptr,
391            len: 0,
392            capacity: aligned_capacity,
393            alignment,
394            layout,
395            created_at: now,
396            last_used: now,
397        })
398    }
399
400    /// Create an aligned buffer with specific SIMD level
401    pub fn new_sse(capacity: usize) -> DomainResult<Self> {
402        Self::new(capacity, 16) // SSE requires 16-byte alignment
403    }
404
405    /// Create an aligned buffer for AVX2 operations
406    pub fn new_avx2(capacity: usize) -> DomainResult<Self> {
407        Self::new(capacity, 32) // AVX2 requires 32-byte alignment
408    }
409
410    /// Create an aligned buffer for AVX-512 operations
411    pub fn new_avx512(capacity: usize) -> DomainResult<Self> {
412        Self::new(capacity, 64) // AVX-512 requires 64-byte alignment
413    }
414
415    /// Get mutable slice to buffer data
416    pub fn as_mut_slice(&mut self) -> &mut [u8] {
417        unsafe { slice::from_raw_parts_mut(self.ptr.as_ptr(), self.len) }
418    }
419
420    /// Get immutable slice to buffer data
421    pub fn as_slice(&self) -> &[u8] {
422        unsafe { slice::from_raw_parts(self.ptr.as_ptr(), self.len) }
423    }
424
425    /// Get a mutable slice with full capacity
426    pub fn as_mut_capacity_slice(&mut self) -> &mut [u8] {
427        unsafe { slice::from_raw_parts_mut(self.ptr.as_ptr(), self.capacity) }
428    }
429
430    /// Set the length of valid data
431    ///
432    /// # Safety
433    /// Caller must ensure that `new_len` bytes are initialized
434    pub unsafe fn set_len(&mut self, new_len: usize) {
435        debug_assert!(
436            new_len <= self.capacity,
437            "new_len {} exceeds capacity {}",
438            new_len,
439            self.capacity
440        );
441        self.len = new_len;
442        self.last_used = Instant::now();
443    }
444
445    /// Reserve additional capacity
446    pub fn reserve(&mut self, additional: usize) -> DomainResult<()> {
447        let new_capacity = self
448            .len
449            .checked_add(additional)
450            .ok_or_else(|| DomainError::InvalidInput("Capacity overflow".to_string()))?;
451
452        if new_capacity <= self.capacity {
453            return Ok(());
454        }
455
456        // Align new capacity
457        let aligned_capacity = (new_capacity + self.alignment - 1) & !(self.alignment - 1);
458
459        // Use global SIMD allocator for reallocation
460        let allocator = global_allocator();
461
462        // Reallocate using the allocator (which will handle data copying)
463        let new_ptr =
464            unsafe { allocator.realloc_aligned(self.ptr, self.layout, aligned_capacity)? };
465
466        // Update layout for the new size
467        let new_layout = Layout::from_size_align(aligned_capacity, self.alignment)
468            .map_err(|e| DomainError::InvalidInput(format!("Invalid layout: {}", e)))?;
469
470        self.ptr = new_ptr;
471        self.capacity = aligned_capacity;
472        self.layout = new_layout;
473        self.last_used = Instant::now();
474
475        Ok(())
476    }
477
478    /// Push bytes to the buffer
479    pub fn extend_from_slice(&mut self, data: &[u8]) -> DomainResult<()> {
480        let required_capacity = self
481            .len
482            .checked_add(data.len())
483            .ok_or_else(|| DomainError::InvalidInput("Length overflow".to_string()))?;
484
485        if required_capacity > self.capacity {
486            self.reserve(data.len())?;
487        }
488
489        unsafe {
490            ptr::copy_nonoverlapping(data.as_ptr(), self.ptr.as_ptr().add(self.len), data.len());
491            self.len += data.len();
492        }
493
494        self.last_used = Instant::now();
495        Ok(())
496    }
497
498    /// Clear buffer contents but keep allocated memory
499    pub fn clear(&mut self) {
500        self.len = 0;
501        self.last_used = Instant::now();
502    }
503
504    /// Get buffer capacity
505    pub fn capacity(&self) -> usize {
506        self.capacity
507    }
508
509    /// Get current length of valid data
510    pub fn len(&self) -> usize {
511        self.len
512    }
513
514    /// Check if buffer is empty
515    pub fn is_empty(&self) -> bool {
516        self.len == 0
517    }
518
519    /// Get the raw pointer to the buffer
520    pub fn as_ptr(&self) -> *const u8 {
521        self.ptr.as_ptr()
522    }
523
524    /// Get the mutable raw pointer to the buffer  
525    pub fn as_mut_ptr(&mut self) -> *mut u8 {
526        self.ptr.as_ptr()
527    }
528
529    /// Check if buffer is properly aligned
530    ///
531    /// This validates that the buffer pointer has the requested alignment,
532    /// which is critical for SIMD operations.
533    pub fn is_aligned(&self) -> bool {
534        let ptr_addr = self.ptr.as_ptr() as usize;
535        ptr_addr.is_multiple_of(self.alignment)
536    }
537
538    /// Get the actual alignment of the buffer
539    pub fn actual_alignment(&self) -> usize {
540        let ptr_addr = self.ptr.as_ptr() as usize;
541        // Find the highest power of 2 that divides the address
542        if ptr_addr == 0 {
543            return usize::MAX; // null pointer is infinitely aligned
544        }
545
546        // Use trailing zeros to find alignment
547        1 << ptr_addr.trailing_zeros()
548    }
549
550    /// Verify buffer is suitable for specific SIMD instruction set
551    pub fn is_simd_compatible(&self, simd_type: SimdType) -> bool {
552        let required_alignment = match simd_type {
553            SimdType::Sse => 16,
554            SimdType::Avx2 => 32,
555            SimdType::Avx512 => 64,
556            SimdType::Neon => 16,
557        };
558
559        self.actual_alignment() >= required_alignment
560    }
561}
562
563/// SIMD instruction set types
564#[derive(Debug, Clone, Copy, PartialEq, Eq)]
565pub enum SimdType {
566    /// SSE instructions (16-byte alignment)
567    Sse,
568    /// AVX2 instructions (32-byte alignment)  
569    Avx2,
570    /// AVX-512 instructions (64-byte alignment)
571    Avx512,
572    /// ARM NEON instructions (16-byte alignment)
573    Neon,
574}
575
576impl Drop for AlignedBuffer {
577    fn drop(&mut self) {
578        // Use the global SIMD allocator for deallocation
579        let allocator = global_allocator();
580
581        // Safety: We allocated this memory with the same layout
582        unsafe {
583            allocator.dealloc_aligned(self.ptr, self.layout);
584        }
585    }
586}
587
588impl Clone for AlignedBuffer {
589    fn clone(&self) -> Self {
590        // Create new buffer with same alignment and capacity
591        let mut new_buffer =
592            Self::new(self.capacity, self.alignment).expect("Failed to clone buffer");
593
594        // Copy data
595        unsafe {
596            ptr::copy_nonoverlapping(self.ptr.as_ptr(), new_buffer.ptr.as_ptr(), self.len);
597            new_buffer.len = self.len;
598        }
599
600        new_buffer
601    }
602}
603
604/// RAII wrapper for pooled buffer that returns buffer to pool on drop
605pub struct PooledBuffer {
606    buffer: Option<AlignedBuffer>,
607    pool: Arc<DashMap<BufferSize, BufferBucket>>,
608    size: BufferSize,
609    max_buffers_per_bucket: usize,
610}
611
612impl PooledBuffer {
613    fn new(
614        buffer: AlignedBuffer,
615        pool: Arc<DashMap<BufferSize, BufferBucket>>,
616        size: BufferSize,
617        max_buffers_per_bucket: usize,
618    ) -> Self {
619        Self {
620            buffer: Some(buffer),
621            pool,
622            size,
623            max_buffers_per_bucket,
624        }
625    }
626
627    /// Get mutable reference to buffer
628    pub fn buffer_mut(&mut self) -> Option<&mut AlignedBuffer> {
629        self.buffer.as_mut()
630    }
631
632    /// Get immutable reference to buffer
633    pub fn buffer(&self) -> Option<&AlignedBuffer> {
634        self.buffer.as_ref()
635    }
636
637    /// Get buffer capacity
638    pub fn capacity(&self) -> usize {
639        self.buffer.as_ref().map(|b| b.capacity()).unwrap_or(0)
640    }
641
642    /// Clear buffer contents
643    pub fn clear(&mut self) {
644        if let Some(buffer) = &mut self.buffer {
645            buffer.clear();
646        }
647    }
648}
649
650impl Drop for PooledBuffer {
651    fn drop(&mut self) {
652        if let Some(mut buffer) = self.buffer.take() {
653            buffer.clear(); // Clear contents before returning to pool
654
655            // Get or create bucket for this buffer size
656            let mut bucket_ref = self.pool.entry(self.size).or_insert_with(|| BufferBucket {
657                buffers: Vec::new(),
658                size: self.size,
659                last_access: Instant::now(),
660            });
661
662            // Only return to pool if we haven't exceeded the per-bucket limit
663            if bucket_ref.buffers.len() < self.max_buffers_per_bucket {
664                bucket_ref.buffers.push(buffer);
665                bucket_ref.last_access = Instant::now();
666            }
667        }
668    }
669}
670
671/// Result of cleanup operation
672#[derive(Debug, Clone)]
673pub struct CleanupStats {
674    pub freed_buffers: usize,
675    pub freed_memory: usize,
676}
677
678impl PoolConfig {
679    /// Create configuration from security config
680    pub fn from_security_config(security_config: &SecurityConfig) -> Self {
681        Self::from(security_config)
682    }
683
684    /// Create configuration optimized for SIMD operations
685    pub fn simd_optimized() -> Self {
686        let mut config = Self::from(&SecurityConfig::high_throughput());
687        config.simd_alignment = 64; // AVX-512 alignment
688        config
689    }
690
691    /// Create configuration for low-memory environments
692    pub fn low_memory() -> Self {
693        let mut config = Self::from(&SecurityConfig::low_memory());
694        config.track_stats = false; // Reduce overhead
695        config
696    }
697
698    /// Create configuration for development/testing
699    pub fn development() -> Self {
700        Self::from(&SecurityConfig::development())
701    }
702}
703
704impl Default for PoolConfig {
705    fn default() -> Self {
706        let security_config = SecurityConfig::default();
707        Self {
708            max_buffers_per_bucket: security_config.buffers.max_buffers_per_bucket,
709            max_total_memory: security_config.buffers.max_total_memory,
710            buffer_ttl: security_config.buffer_ttl(),
711            track_stats: true,
712            simd_alignment: 32, // AVX2 alignment
713            validator: SecurityValidator::new(security_config),
714        }
715    }
716}
717
718impl From<&SecurityConfig> for PoolConfig {
719    fn from(security_config: &SecurityConfig) -> Self {
720        Self {
721            max_buffers_per_bucket: security_config.buffers.max_buffers_per_bucket,
722            max_total_memory: security_config.buffers.max_total_memory,
723            buffer_ttl: security_config.buffer_ttl(),
724            track_stats: true,
725            simd_alignment: 32, // AVX2 alignment
726            validator: SecurityValidator::new(security_config.clone()),
727        }
728    }
729}
730
731impl PoolStats {
732    fn new() -> Self {
733        Self {
734            total_allocations: 0,
735            cache_hits: 0,
736            cache_misses: 0,
737            current_memory_usage: 0,
738            peak_memory_usage: 0,
739            cleanup_count: 0,
740        }
741    }
742
743    /// Get cache hit ratio
744    pub fn hit_ratio(&self) -> f64 {
745        if self.total_allocations == 0 {
746            0.0
747        } else {
748            self.cache_hits as f64 / self.total_allocations as f64
749        }
750    }
751
752    /// Get memory efficiency (current/peak ratio)
753    pub fn memory_efficiency(&self) -> f64 {
754        if self.peak_memory_usage == 0 {
755            1.0
756        } else {
757            self.current_memory_usage as f64 / self.peak_memory_usage as f64
758        }
759    }
760}
761
762impl Default for BufferPool {
763    fn default() -> Self {
764        Self::new()
765    }
766}
767
768/// Global buffer pool instance for convenient access
769static GLOBAL_BUFFER_POOL: std::sync::OnceLock<BufferPool> = std::sync::OnceLock::new();
770
771/// Get global buffer pool instance
772pub fn global_buffer_pool() -> &'static BufferPool {
773    GLOBAL_BUFFER_POOL.get_or_init(BufferPool::new)
774}
775
776/// Initialize global buffer pool with custom configuration
777pub fn initialize_global_buffer_pool(config: PoolConfig) -> DomainResult<()> {
778    GLOBAL_BUFFER_POOL
779        .set(BufferPool::with_config(config))
780        .map_err(|_| {
781            DomainError::InternalError("Global buffer pool already initialized".to_string())
782        })?;
783    Ok(())
784}
785
786#[cfg(test)]
787mod tests {
788    use super::*;
789
790    #[test]
791    fn test_buffer_pool_creation() {
792        let pool = BufferPool::new();
793        assert!(pool.stats().is_ok());
794    }
795
796    #[test]
797    fn test_buffer_allocation() {
798        let pool = BufferPool::new();
799        let buffer = pool.get_buffer(BufferSize::Medium);
800        assert!(buffer.is_ok());
801
802        let buffer = buffer.unwrap();
803        assert!(buffer.capacity() >= BufferSize::Medium as usize);
804    }
805
806    #[test]
807    fn test_buffer_reuse() {
808        let pool = BufferPool::new();
809
810        // Allocate and drop buffer
811        {
812            let _buffer = pool.get_buffer(BufferSize::Small).unwrap();
813        }
814
815        // Allocate another buffer of same size
816        let _buffer2 = pool.get_buffer(BufferSize::Small).unwrap();
817
818        // Should have cache hit
819        let stats = pool.stats().unwrap();
820        assert!(stats.cache_hits > 0);
821    }
822
823    #[test]
824    fn test_buffer_size_selection() {
825        assert_eq!(BufferSize::for_capacity(500), BufferSize::Small);
826        assert_eq!(BufferSize::for_capacity(2000), BufferSize::Medium);
827        assert_eq!(BufferSize::for_capacity(50000), BufferSize::Large);
828        assert_eq!(BufferSize::for_capacity(100000), BufferSize::XLarge);
829    }
830
831    #[test]
832    fn test_aligned_buffer_creation_guaranteed() {
833        // Test all common SIMD alignments
834        let test_cases = vec![
835            (1024, 16, "SSE alignment"),
836            (2048, 32, "AVX2 alignment"),
837            (4096, 64, "AVX-512 alignment"),
838        ];
839
840        for (capacity, alignment, description) in test_cases {
841            let buffer = AlignedBuffer::new(capacity, alignment).unwrap();
842
843            // Verify pointer alignment
844            let ptr_addr = buffer.as_ptr() as usize;
845            assert_eq!(
846                ptr_addr % alignment,
847                0,
848                "{}: pointer 0x{:x} is not {}-byte aligned",
849                description,
850                ptr_addr,
851                alignment
852            );
853
854            // Verify is_aligned method
855            assert!(
856                buffer.is_aligned(),
857                "{}: is_aligned() returned false for properly aligned buffer",
858                description
859            );
860
861            // Verify capacity
862            assert!(
863                buffer.capacity() >= capacity,
864                "{}: capacity {} is less than requested {}",
865                description,
866                buffer.capacity(),
867                capacity
868            );
869
870            // Verify actual alignment
871            assert!(
872                buffer.actual_alignment() >= alignment,
873                "{}: actual alignment {} is less than requested {}",
874                description,
875                buffer.actual_alignment(),
876                alignment
877            );
878        }
879    }
880
881    #[test]
882    fn test_buffer_operations() {
883        let mut buffer = AlignedBuffer::new(1024, 32).unwrap();
884
885        // Test initial state
886        assert_eq!(buffer.len(), 0);
887        assert!(buffer.is_empty());
888        assert_eq!(buffer.capacity(), 1024);
889
890        // Test extend_from_slice
891        let data = b"Hello, SIMD World!";
892        buffer.extend_from_slice(data).unwrap();
893        assert_eq!(buffer.len(), data.len());
894        assert_eq!(buffer.as_slice(), data);
895
896        // Test clear
897        buffer.clear();
898        assert_eq!(buffer.len(), 0);
899        assert!(buffer.is_empty());
900        assert_eq!(buffer.capacity(), 1024); // Capacity should remain
901
902        // Test unsafe set_len
903        unsafe {
904            // Write some data directly
905            let slice = buffer.as_mut_capacity_slice();
906            slice[0..5].copy_from_slice(b"SIMD!");
907            buffer.set_len(5);
908        }
909        assert_eq!(buffer.len(), 5);
910        assert_eq!(&buffer.as_slice()[0..5], b"SIMD!");
911    }
912
913    #[test]
914    fn test_buffer_reserve() {
915        let mut buffer = AlignedBuffer::new(64, 32).unwrap();
916        let _initial_alignment = buffer.actual_alignment();
917
918        // Set some length first
919        unsafe {
920            buffer.set_len(32);
921        }
922
923        // Reserve additional space - should need capacity for len + additional
924        buffer.reserve(256).unwrap();
925        assert!(
926            buffer.capacity() >= 32 + 256,
927            "Expected capacity >= {}, got {}",
928            32 + 256,
929            buffer.capacity()
930        );
931
932        // Alignment should be preserved after reallocation
933        assert!(
934            buffer.actual_alignment() >= 32,
935            "Alignment not preserved after reserve"
936        );
937        assert!(buffer.is_aligned());
938
939        // Test that data is preserved during reallocation
940        buffer.extend_from_slice(b"test data").unwrap();
941        let old_data = buffer.as_slice().to_vec();
942
943        buffer.reserve(1024).unwrap();
944        assert_eq!(buffer.as_slice(), &old_data[..]);
945    }
946
947    #[test]
948    fn test_buffer_clone() {
949        let mut original = AlignedBuffer::new(512, 64).unwrap();
950        original.extend_from_slice(b"Original data").unwrap();
951
952        let cloned = original.clone();
953
954        // Verify clone has same properties
955        assert_eq!(cloned.len(), original.len());
956        assert_eq!(cloned.capacity(), original.capacity());
957        assert_eq!(cloned.alignment, original.alignment);
958        assert_eq!(cloned.as_slice(), original.as_slice());
959
960        // Verify clone has different memory location
961        assert_ne!(cloned.as_ptr(), original.as_ptr());
962
963        // Verify clone is also properly aligned
964        assert!(cloned.is_aligned());
965        assert!(cloned.actual_alignment() >= 64);
966    }
967
968    #[test]
969    fn test_alignment_validation() {
970        // Test valid power-of-2 alignments
971        let valid_alignments = [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096];
972
973        for &alignment in &valid_alignments {
974            let result = AlignedBuffer::new(1024, alignment);
975            assert!(result.is_ok(), "Alignment {} should be valid", alignment);
976
977            let buffer = result.unwrap();
978            assert!(
979                buffer.is_aligned(),
980                "Buffer with alignment {} should be aligned",
981                alignment
982            );
983        }
984
985        // Test invalid non-power-of-2 alignments
986        let invalid_alignments = [3, 5, 6, 7, 9, 10, 11, 12, 13, 14, 15, 17, 31, 33, 63, 65];
987
988        for &alignment in &invalid_alignments {
989            let result = AlignedBuffer::new(1024, alignment);
990            assert!(result.is_err(), "Alignment {} should be invalid", alignment);
991        }
992
993        // Test too large alignment
994        assert!(AlignedBuffer::new(1024, 8192).is_err());
995    }
996
997    #[test]
998    fn test_actual_alignment_calculation() {
999        // Create buffers with different alignments and verify actual_alignment()
1000        for &requested_align in &[16, 32, 64] {
1001            let buffer = AlignedBuffer::new(1024, requested_align).unwrap();
1002            let actual = buffer.actual_alignment();
1003
1004            assert!(
1005                actual >= requested_align,
1006                "Actual alignment {} is less than requested {}",
1007                actual,
1008                requested_align
1009            );
1010
1011            // actual_alignment should be a power of 2
1012            assert!(
1013                actual.is_power_of_two(),
1014                "Actual alignment {} is not a power of 2",
1015                actual
1016            );
1017        }
1018    }
1019
1020    #[test]
1021    fn test_simd_compatibility_check() {
1022        // SSE buffer should be compatible with SSE but might not be with AVX-512
1023        let sse_buffer = AlignedBuffer::new_sse(1024).unwrap();
1024        assert!(sse_buffer.is_simd_compatible(SimdType::Sse));
1025        assert!(sse_buffer.is_simd_compatible(SimdType::Neon)); // Same alignment as SSE
1026
1027        // AVX-512 buffer should be compatible with all instruction sets
1028        let avx512_buffer = AlignedBuffer::new_avx512(1024).unwrap();
1029        assert!(avx512_buffer.is_simd_compatible(SimdType::Sse));
1030        assert!(avx512_buffer.is_simd_compatible(SimdType::Avx2));
1031        assert!(avx512_buffer.is_simd_compatible(SimdType::Avx512));
1032        assert!(avx512_buffer.is_simd_compatible(SimdType::Neon));
1033    }
1034
1035    #[test]
1036    fn test_zero_copy_verification() {
1037        let mut buffer = AlignedBuffer::new(1024, 32).unwrap();
1038
1039        // Get raw pointer before modification
1040        let ptr_before = buffer.as_ptr();
1041
1042        // Perform various operations that should NOT move the buffer
1043        buffer.clear();
1044        buffer.extend_from_slice(b"test").unwrap();
1045        unsafe {
1046            buffer.set_len(2);
1047        }
1048
1049        // Pointer should remain the same (zero-copy)
1050        assert_eq!(
1051            ptr_before,
1052            buffer.as_ptr(),
1053            "Buffer was moved during operations (not zero-copy)"
1054        );
1055
1056        // Only reserve should potentially change the pointer
1057        buffer.reserve(2048).unwrap();
1058        // After reserve, pointer might change but should still be aligned
1059        assert!(buffer.is_aligned());
1060    }
1061
1062    #[test]
1063    fn test_pool_cleanup() {
1064        let config = PoolConfig {
1065            buffer_ttl: Duration::from_millis(1),
1066            ..Default::default()
1067        };
1068        let pool = BufferPool::with_config(config);
1069
1070        // Allocate and drop buffer
1071        {
1072            let _buffer = pool.get_buffer(BufferSize::Small).unwrap();
1073        }
1074
1075        // Wait for TTL
1076        std::thread::sleep(Duration::from_millis(10));
1077
1078        // Cleanup should free the buffer
1079        let cleanup_stats = pool.cleanup().unwrap();
1080        assert!(cleanup_stats.freed_buffers > 0);
1081    }
1082
1083    #[test]
1084    fn test_global_buffer_pool() {
1085        let pool = global_buffer_pool();
1086        let buffer = pool.get_buffer(BufferSize::Medium);
1087        assert!(buffer.is_ok());
1088    }
1089
1090    #[test]
1091    fn test_memory_limit_enforcement() {
1092        let config = PoolConfig {
1093            max_total_memory: 1024, // Very small limit
1094            max_buffers_per_bucket: 10,
1095            ..Default::default()
1096        };
1097        let pool = BufferPool::with_config(config);
1098
1099        // Create a buffer that exceeds the memory limit
1100        let result = pool.get_buffer(BufferSize::Medium); // 8KB > 1KB limit
1101
1102        assert!(result.is_err());
1103
1104        if let Err(e) = result {
1105            assert!(e.to_string().contains("memory limit"));
1106        }
1107    }
1108
1109    #[test]
1110    fn test_per_bucket_limit_enforcement() {
1111        let config = PoolConfig {
1112            max_buffers_per_bucket: 2,          // Very small limit
1113            max_total_memory: 10 * 1024 * 1024, // Generous memory limit
1114            ..Default::default()
1115        };
1116        let pool = BufferPool::with_config(config);
1117
1118        // Allocate and drop buffers to fill the bucket
1119        for _ in 0..3 {
1120            let _buffer = pool.get_buffer(BufferSize::Small).unwrap();
1121            // Buffer goes back to pool on drop
1122        }
1123
1124        // Only 2 buffers should be retained in the pool
1125        let stats = pool.stats().unwrap();
1126        assert!(stats.cache_hits <= 2, "Too many buffers retained in bucket");
1127    }
1128
1129    #[test]
1130    fn test_buffer_size_validation() {
1131        let pool = BufferPool::new();
1132
1133        // All standard buffer sizes should be valid
1134        for size in BufferSize::all_sizes() {
1135            let result = pool.get_buffer(*size);
1136            assert!(result.is_ok(), "Buffer size {:?} should be valid", size);
1137        }
1138    }
1139
1140    #[test]
1141    fn test_memory_safety() {
1142        // Test that dropping a buffer properly deallocates memory
1143        // This test would fail under valgrind/ASAN if there's a memory leak
1144        for _ in 0..100 {
1145            let buffer = AlignedBuffer::new(1024, 64).unwrap();
1146            drop(buffer);
1147        }
1148
1149        // Test clone and drop
1150        for _ in 0..100 {
1151            let buffer = AlignedBuffer::new(512, 32).unwrap();
1152            let cloned = buffer.clone();
1153            drop(buffer);
1154            drop(cloned);
1155        }
1156    }
1157
1158    #[test]
1159    fn test_simd_specific_constructors() {
1160        // Test SSE alignment (16 bytes)
1161        let sse_buffer = AlignedBuffer::new_sse(1024).unwrap();
1162        assert!(sse_buffer.is_aligned());
1163        assert!(sse_buffer.is_simd_compatible(SimdType::Sse));
1164        assert_eq!(sse_buffer.alignment, 16);
1165
1166        // Test AVX2 alignment (32 bytes)
1167        let avx2_buffer = AlignedBuffer::new_avx2(1024).unwrap();
1168        assert!(avx2_buffer.is_aligned());
1169        assert!(avx2_buffer.is_simd_compatible(SimdType::Avx2));
1170        assert_eq!(avx2_buffer.alignment, 32);
1171
1172        // Test AVX-512 alignment (64 bytes)
1173        let avx512_buffer = AlignedBuffer::new_avx512(1024).unwrap();
1174        assert!(avx512_buffer.is_aligned());
1175        assert!(avx512_buffer.is_simd_compatible(SimdType::Avx512));
1176        assert_eq!(avx512_buffer.alignment, 64);
1177    }
1178
1179    #[test]
1180    fn test_simd_alignment_compatibility() {
1181        let buffer_64 = AlignedBuffer::new(1024, 64).unwrap();
1182
1183        // 64-byte aligned buffer should be compatible with all SIMD types
1184        assert!(buffer_64.is_simd_compatible(SimdType::Sse)); // 16-byte requirement
1185        assert!(buffer_64.is_simd_compatible(SimdType::Avx2)); // 32-byte requirement
1186        assert!(buffer_64.is_simd_compatible(SimdType::Avx512)); // 64-byte requirement
1187        assert!(buffer_64.is_simd_compatible(SimdType::Neon)); // 16-byte requirement
1188
1189        // Note: We can't easily test incompatible alignments since the allocator
1190        // might provide better alignment than requested for performance reasons.
1191        // Instead, test the requested alignment vs required alignment directly.
1192        #[allow(clippy::assertions_on_constants)]
1193        {
1194            assert!(64 >= 16); // SSE compatible
1195            assert!(64 >= 32); // AVX2 compatible
1196            assert!(64 >= 64); // AVX512 compatible
1197            assert!(64 >= 16); // NEON compatible
1198        }
1199
1200        let buffer_16 = AlignedBuffer::new(1024, 16).unwrap();
1201
1202        // Test that buffer reports correct requested alignment
1203        assert_eq!(buffer_16.alignment, 16);
1204
1205        // 16-byte aligned buffer should be compatible with SSE and NEON
1206        assert!(buffer_16.is_simd_compatible(SimdType::Sse));
1207        assert!(buffer_16.is_simd_compatible(SimdType::Neon));
1208
1209        // Note: actual_alignment() might be higher than 16 due to allocator behavior
1210        // so we can't reliably test incompatibility. Instead verify logic:
1211        #[allow(clippy::assertions_on_constants)]
1212        {
1213            assert!(16 >= 16); // SSE requirement met
1214            assert!(16 < 32); // AVX2 requirement NOT met by requested alignment
1215            assert!(16 < 64); // AVX512 requirement NOT met by requested alignment
1216        }
1217    }
1218
1219    #[test]
1220    fn test_actual_alignment_detection() {
1221        let buffer = AlignedBuffer::new(1024, 64).unwrap();
1222
1223        let actual_alignment = buffer.actual_alignment();
1224        assert!(
1225            actual_alignment >= 64,
1226            "Buffer has actual alignment of {}, expected at least 64",
1227            actual_alignment
1228        );
1229
1230        // The actual alignment should be a power of 2 and >= requested alignment
1231        assert!(actual_alignment.is_power_of_two());
1232        assert!(actual_alignment >= buffer.alignment);
1233    }
1234
1235    #[test]
1236    fn test_simd_pool_configuration() {
1237        // Test pool with high SIMD alignment requirement
1238        let config = PoolConfig {
1239            simd_alignment: 64, // AVX-512 alignment
1240            ..Default::default()
1241        };
1242        let pool = BufferPool::with_config(config);
1243
1244        let buffer = pool.get_buffer(BufferSize::Medium).unwrap();
1245        assert!(buffer.buffer().unwrap().is_aligned());
1246        assert!(
1247            buffer
1248                .buffer()
1249                .unwrap()
1250                .is_simd_compatible(SimdType::Avx512)
1251        );
1252    }
1253
1254    #[test]
1255    fn test_alignment_edge_cases() {
1256        // Test minimum alignment
1257        let buffer_min = AlignedBuffer::new(64, 1).unwrap();
1258        assert!(buffer_min.is_aligned());
1259        assert!(buffer_min.alignment >= mem::align_of::<usize>());
1260
1261        // Test power-of-2 validation
1262        assert!(AlignedBuffer::new(1024, 3).is_err());
1263        assert!(AlignedBuffer::new(1024, 17).is_err());
1264        assert!(AlignedBuffer::new(1024, 33).is_err());
1265
1266        // Test maximum alignment limit
1267        assert!(AlignedBuffer::new(1024, 8192).is_err());
1268    }
1269
1270    #[test]
1271    fn test_simd_performance_oriented_allocation() {
1272        // Test that allocation pattern is suitable for high-performance SIMD
1273        let buffer = AlignedBuffer::new_avx512(4096).unwrap();
1274
1275        // Verify the buffer can be used for actual SIMD-like operations
1276        let slice = unsafe { std::slice::from_raw_parts_mut(buffer.ptr.as_ptr(), buffer.capacity) };
1277
1278        // Fill with test pattern
1279        for (i, byte) in slice.iter_mut().enumerate() {
1280            *byte = (i % 256) as u8;
1281        }
1282
1283        // Verify alignment is maintained through operations
1284        assert!(buffer.is_aligned());
1285        assert_eq!(slice[0], 0);
1286        assert_eq!(slice[255], 255);
1287        assert_eq!(slice[256], 0);
1288    }
1289}