zeropool 0.3.1

High-performance buffer pool with constant-time allocation, thread-safe operations, and 5x speedup over bytes crate
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
use parking_lot::Mutex;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

use crate::EvictionPolicy;
use crate::config::PoolConfig;
use crate::tls::{SHARD_AFFINITY, TLS_CACHE, TLS_LIMIT};
use crate::utils::pin_buffer;

/// Metadata for buffer eviction policy
///
/// Uses a saturating access counter to track buffer "hotness"
/// without the overhead of timestamps or LRU lists.
/// Metadata for buffer eviction policy
///
/// Uses a saturating access counter to track buffer "hotness"
/// without the overhead of timestamps or LRU lists.
#[derive(Debug)]
pub(crate) struct BufferEntry {
    pub(crate) buffer: Vec<u8>,
    /// Saturating counter (0-255) tracking recent accesses.
    /// Higher values mean "hotter" buffers that eviction should retain.
    pub(crate) access_count: u8,
}

impl BufferEntry {
    /// Wrap a buffer, seeding the counter at 1 to mark it recently used.
    #[inline]
    fn new(buffer: Vec<u8>) -> Self {
        BufferEntry { access_count: 1, buffer }
    }

    /// Record an access; the counter sticks at 255 instead of wrapping.
    #[inline]
    fn mark_accessed(&mut self) {
        self.access_count = self.access_count.checked_add(1).unwrap_or(u8::MAX);
    }

    /// Age the entry by one step (used during eviction); bottoms out at 0.
    #[inline]
    fn decay(&mut self) {
        self.access_count = self.access_count.checked_sub(1).unwrap_or(0);
    }

    /// Capacity of the wrapped allocation, in bytes.
    #[inline]
    fn capacity(&self) -> usize {
        self.buffer.capacity()
    }
}

/// A single shard containing buffers and a local counter
///
/// Each shard carries its own atomic counter so that bookkeeping
/// contends per shard instead of on one counter shared by all shards.
#[derive(Debug)]
pub(crate) struct Shard {
    pub(crate) buffers: Mutex<Vec<BufferEntry>>,
    pub(crate) count: AtomicUsize,
}

impl Shard {
    /// Build an empty shard: no cached buffers, counter at zero.
    fn new() -> Self {
        Shard {
            count: AtomicUsize::new(0),
            buffers: Mutex::new(Vec::new()),
        }
    }
}

/// A high-performance, thread-safe buffer pool optimized for I/O workloads
///
/// # Performance Characteristics
///
/// - **O(1)** buffer allocation from pool (LIFO + first-fit fallback)
/// - **Lock-free** for single-threaded access via thread-local storage
/// - **Sharded** global pool to reduce multi-threaded contention
/// - **System-aware** defaults based on CPU count
/// - **Zero-copy** buffer reuse (no unnecessary allocations)
///
/// # Thread Safety
///
/// `BufferPool` is `Clone` and can be shared across threads. Each thread maintains
/// its own thread-local cache for maximum performance, with a sharded global pool
/// as fallback to minimize lock contention.
///
/// # Architecture
///
/// ```text
/// Thread 1                Thread 2                Thread N
/// ┌──────────┐           ┌──────────┐           ┌──────────┐
/// │TLS Cache │           │TLS Cache │           │TLS Cache │
/// │(2-8 bufs)│           │(2-8 bufs)│           │(2-8 bufs)│
/// └────┬─────┘           └────┬─────┘           └────┬─────┘
///      │(affinity)            │(affinity)            │(affinity)
///      │                      │                      │
///      └──────────┬───────────┴──────────────────────┘
///         ┌───────▼────────┐
///         │  Shared Pool   │
///         │  (N shards)    │  Thread-local affinity:
///         │                │  Each thread uses same
///         │ [Shard 0] ...  │  shard for cache locality
///         │ [Shard 1]      │
///         │ [Shard 2]      │  N = next_pow2(num_cpus/2)
///         │    ...         │  clamped to [4, 128]
///         └────────────────┘
/// ```
#[derive(Clone, Debug)]
pub struct BufferPool {
    // Shared, sharded storage: cloning the pool shares these shards.
    pub(crate) shards: Arc<Vec<Shard>>, // Changed to use Shard struct
    pub(crate) config: PoolConfig,
    /// Cached mask for fast shard selection (num_shards - 1)
    /// NOTE(review): correct only when num_shards is a power of two —
    /// presumably enforced by the builder; confirm there.
    shard_mask: usize,
    // Removed: total_buffers - replaced with per-shard counters
}

impl BufferPool {
    /// Create a new buffer pool with system-aware defaults
    ///
    /// The pool automatically configures itself based on CPU count:
    /// - Number of shards (4-128, reduces lock contention)
    /// - Thread-local cache size (2-8 buffers per thread)
    /// - Max buffers per shard (16-64)
    /// - 1MB minimum buffer size
    ///
    /// # Example
    /// ```
    /// use zeropool::BufferPool;
    ///
    /// let pool = BufferPool::new();
    /// let buffer = pool.get(1024 * 1024);
    /// // ... use buffer ...
    /// // Buffer automatically returned when it goes out of scope
    /// ```
    #[inline]
    pub fn new() -> Self {
        crate::Builder::default().build()
    }

    /// Create a builder for custom configuration
    ///
    /// Use this when you need to customize the pool behavior.
    ///
    /// # Example
    /// ```
    /// use zeropool::BufferPool;
    ///
    /// let pool = BufferPool::builder()
    ///     .min_buffer_size(512 * 1024)  // 512KB minimum
    ///     .num_shards(16)
    ///     .build();
    /// ```
    #[inline]
    pub fn builder() -> crate::Builder {
        crate::Builder::default()
    }

    /// Create a pool with explicit configuration (internal use)
    pub(crate) fn with_config(config: PoolConfig) -> Self {
        // Seed this thread's TLS cache limit so the first put() on the
        // creating thread can skip the cold-path initialization.
        TLS_LIMIT.with(|limit| limit.set(config.tls_cache_size));

        // One empty shard per configured slot.
        let mut shards = Vec::with_capacity(config.num_shards);
        for _ in 0..config.num_shards {
            shards.push(Shard::new());
        }

        Self {
            shards: Arc::new(shards),
            // Valid mask because num_shards is a power of two.
            shard_mask: config.num_shards - 1,
            config,
        }
    }

    /// Get the shard index for the current thread using thread-local affinity
    ///
    /// Each thread gets a consistent shard assignment based on its thread ID hash.
    /// This improves cache locality by having each thread reuse the same shard's
    /// cached data structures.
    #[inline(always)]
    fn get_shard_index(&self) -> usize {
        SHARD_AFFINITY.with(|affinity| match affinity.get() {
            // Fast path: a previously computed affinity that is still in
            // bounds for this pool's shard count.
            Some(idx) if idx < self.shards.len() => idx,
            // Cold path: first use on this thread, or a stale affinity
            // cached by a pool with more shards. Hash the thread id and
            // mask it down (num_shards is a power of two), then cache it.
            _ => {
                let mut hasher = std::collections::hash_map::DefaultHasher::new();
                thread::current().id().hash(&mut hasher);
                let idx = (hasher.finish() as usize) & self.shard_mask;
                affinity.set(Some(idx));
                idx
            }
        })
    }

    /// Get a buffer of at least the specified size from the pool
    ///
    /// Returns a `PooledBuffer` that automatically returns to the pool when dropped.
    /// The buffer may have larger capacity than requested for reuse efficiency.
    ///
    /// NOTE(review): on the fresh-allocation path the returned bytes are
    /// uninitialized (`set_len` without writing) — callers presumably must
    /// write before reading; confirm `PooledBuffer`'s documented contract.
    ///
    /// # Performance
    ///
    /// 1. **Fastest**: Thread-local cache LIFO (lock-free, cache-hot)
    /// 2. **Fast**: Shared pool shard LIFO + first-fit fallback
    /// 3. **Fallback**: New allocation
    ///
    /// # Examples
    ///
    /// ```
    /// use zeropool::BufferPool;
    ///
    /// let pool = BufferPool::new();
    /// {
    ///     let mut buffer = pool.get(1024);
    ///     buffer[0] = 42;
    ///     // Buffer automatically returned here
    /// }
    /// ```
    ///
    /// # Panics
    ///
    /// This method may panic if internal invariants are violated (buffer found but cannot be removed).
    /// In practice, this should never occur under normal usage.
    #[inline]
    #[must_use]
    pub fn get(&self, size: usize) -> crate::buffer::PooledBuffer {
        // Fastest path: thread-local cache (lock-free)
        let tls_hit = TLS_CACHE.with(|tls| {
            let mut cache = tls.borrow_mut();

            // LIFO: Check most recently used buffer first (better cache locality)
            if let Some(entry) = cache.buffers.last()
                && entry.capacity() >= size
            {
                // `last()` just confirmed the cache is non-empty, so pop() is Some.
                let mut entry = cache.buffers.pop().unwrap();
                entry.mark_accessed(); // NEW: Track access
                let mut buf = entry.buffer;
                // Resize buffer to requested size, without zeroing
                // SAFETY: Buffer was previously allocated with at least this capacity
                unsafe {
                    buf.set_len(size);
                }
                return Some(buf);
            }

            // Fallback: First-fit search for compatible buffer
            if let Some(idx) = cache.buffers.iter().position(|e| e.capacity() >= size) {
                // swap_remove is O(1); TLS cache ordering beyond the LIFO tail
                // is not significant.
                let mut entry = cache.buffers.swap_remove(idx);
                entry.mark_accessed(); // NEW: Track access
                let mut buf = entry.buffer;
                // Resize buffer to requested size, without zeroing
                // SAFETY: Buffer was previously allocated with at least this capacity
                unsafe {
                    buf.set_len(size);
                }
                return Some(buf);
            }
            None
        });

        if let Some(buf) = tls_hit {
            return crate::buffer::PooledBuffer::new(buf, self.clone());
        }

        // Fast path: try shared pool (sharded to reduce contention)
        let shard_idx = self.get_shard_index();
        // SAFETY: get_shard_index() uses bitmask (shard_mask = num_shards - 1) where
        // num_shards is power of 2, guaranteeing shard_idx < shards.len()
        debug_assert!(shard_idx < self.shards.len(), "Shard index out of bounds");
        let shard = &self.shards[shard_idx];
        let mut buffers = shard.buffers.lock();

        // LIFO: Try most recently returned buffer first (cache-hot)
        if let Some(entry) = buffers.last()
            && entry.capacity() >= size
        {
            let mut entry = buffers.pop().unwrap();
            // Counter updated while still holding the shard lock, keeping
            // count consistent with buffers.len() for clear()/len().
            shard.count.fetch_sub(1, Ordering::Relaxed);
            entry.mark_accessed(); // NEW: Track access
            let mut buffer = entry.buffer;
            // Resize buffer to requested size, without zeroing
            // SAFETY: Buffer was previously allocated with at least this capacity
            unsafe {
                buffer.set_len(size);
            }
            return crate::buffer::PooledBuffer::new(buffer, self.clone());
        }

        // First-fit fallback: scan for compatible buffer
        let vec = if let Some(idx) = buffers.iter().position(|e| e.capacity() >= size) {
            let mut entry = buffers.swap_remove(idx);
            shard.count.fetch_sub(1, Ordering::Relaxed);
            entry.mark_accessed(); // NEW: Track access
            let mut buffer = entry.buffer;
            // Resize buffer to requested size, without zeroing
            // SAFETY: Buffer was previously allocated with at least this capacity
            unsafe {
                buffer.set_len(size);
            }
            buffer
        } else {
            // Release the shard lock before allocating — allocation can be
            // slow and must not block other threads on this shard.
            drop(buffers);
            let mut v = Vec::with_capacity(size);
            // SAFETY: Setting length on capacity-allocated vector creates uninitialized but valid memory
            #[allow(clippy::uninit_vec)]
            unsafe {
                v.set_len(size);
            }
            v
        };

        crate::buffer::PooledBuffer::new(vec, self.clone())
    }

    /// Return a buffer to the pool for reuse (internal use only)
    ///
    /// This method is called automatically by `PooledBuffer::drop()`.
    /// Users should not call this directly.
    ///
    /// The buffer is cleared (but not zeroed for performance) to prepare for reuse.
    /// Capacity is preserved. Small buffers (below `min_size`)
    /// are automatically discarded.
    ///
    /// Note: the `min_buffer_size` check only guards the shared pool; a
    /// buffer of any capacity can be kept in the thread-local cache.
    #[inline]
    pub(crate) fn put(&self, mut buffer: Vec<u8>) {
        // Clear length but keep capacity (no zeroing for performance)
        buffer.clear();

        // Initialize TLS limit on first use (cold path, happens once per thread)
        // A limit of 0 is treated as "uninitialized" and replaced with this
        // pool's configured TLS cache size.
        let limit = TLS_LIMIT.with(|limit| {
            let current = limit.get();
            if current == 0 {
                limit.set(self.config.tls_cache_size);
                self.config.tls_cache_size
            } else {
                current
            }
        });

        // Fastest path: return to thread-local cache (lock-free)
        // Returns the buffer if not stored
        let buffer = TLS_CACHE.with(|tls| {
            let mut cache = tls.borrow_mut();

            if cache.buffers.len() < limit {
                cache.buffers.push(BufferEntry::new(buffer)); // Wrap in entry
                None // Successfully stored in TLS
            } else {
                Some(buffer) // TLS full, return buffer
            }
        });

        // Fast path succeeded
        let Some(buffer) = buffer else {
            return;
        };

        // Fall back to shared pool (sharded to reduce contention)
        if buffer.capacity() < self.config.min_buffer_size {
            return; // Too small, discard
        }

        // Pin buffer memory if enabled
        if self.config.pinned_memory {
            pin_buffer(&buffer);
        }

        let shard_idx = self.get_shard_index();
        // SAFETY: shard_idx guaranteed in bounds by bitmask operation in get_shard_index()
        debug_assert!(shard_idx < self.shards.len(), "Shard index out of bounds in put");
        let shard = &self.shards[shard_idx];
        let mut buffers = shard.buffers.lock();

        // Limit pool size per shard to prevent unbounded growth
        if buffers.len() < self.config.max_buffers_per_shard {
            buffers.push(BufferEntry::new(buffer)); // Wrap in entry
            // Updated under the lock so count stays in step with buffers.len().
            shard.count.fetch_add(1, Ordering::Relaxed);
        } else {
            // NEW: Eviction when shard full
            // Replaces an existing entry in place, so count is NOT incremented.
            if self.config.eviction_policy == EvictionPolicy::ClockPro {
                Self::evict_and_insert(&mut buffers, buffer);
            }
            // Otherwise drop buffer (current behavior)
        }
        // Lock released automatically
    }

    /// Evict cold buffer and insert new one using CLOCK-Pro algorithm
    ///
    /// Ages every entry by one step, then overwrites the entry with the
    /// lowest post-decay access count (first such entry on ties).
    fn evict_and_insert(shard: &mut [BufferEntry], new_buffer: Vec<u8>) {
        // Nothing to replace in an empty shard; the new buffer is dropped.
        if shard.is_empty() {
            return;
        }

        // Pass 1: decay every counter so long-idle buffers cool off.
        for entry in shard.iter_mut() {
            entry.decay();
        }

        // Pass 2: locate the coldest slot. min_by_key keeps the first
        // minimum on ties, matching a strict `<` scan.
        let coldest = shard
            .iter()
            .enumerate()
            .min_by_key(|&(_, entry)| entry.access_count)
            .map(|(idx, _)| idx)
            .unwrap_or(0);

        // Replace in place; total shard occupancy (and the shard counter)
        // is unchanged.
        shard[coldest] = BufferEntry::new(new_buffer);
    }

    /// Pre-allocate buffers in the pool
    ///
    /// Useful for warming up the pool before high-throughput operations.
    /// Distributes buffers evenly across all shards.
    /// Optimized to allocate all buffers first, then distribute with minimal locking.
    ///
    /// Note: `count` is rounded up to a multiple of `num_shards`, so slightly
    /// more than `count` buffers may be allocated.
    /// NOTE(review): this path does not enforce `max_buffers_per_shard`, so a
    /// large `count` can overfill shards — confirm this is intended.
    pub fn preallocate(&self, count: usize, size: usize) {
        let per_shard = count.div_ceil(self.config.num_shards);
        let total_to_allocate = per_shard * self.config.num_shards;

        // Allocate all buffers at once
        let mut all_buffers: Vec<BufferEntry> = Vec::with_capacity(total_to_allocate);
        for _ in 0..total_to_allocate {
            // Each buffer gets at least the pool's configured minimum capacity.
            let mut buf = Vec::with_capacity(size.max(self.config.min_buffer_size));

            // Pin buffer memory if enabled
            if self.config.pinned_memory {
                // Need to set length for pinning, without zeroing
                // SAFETY: Buffer was allocated with capacity, setting length creates uninitialized memory
                unsafe {
                    buf.set_len(buf.capacity());
                }
                pin_buffer(&buf);
                // Restore zero length; capacity (and pinning) is retained.
                buf.clear();
            }

            all_buffers.push(BufferEntry::new(buf)); // Wrap in entry
        }

        // Distribute buffers across shards with single lock per shard
        // Process shards in reverse order to avoid index shifting during drain:
        // each drain removes a tail range, leaving earlier ranges untouched.
        for shard_idx in (0..self.config.num_shards).rev() {
            // SAFETY: shard_idx < num_shards by loop bounds
            debug_assert!(
                shard_idx < self.shards.len(),
                "Shard index out of bounds in preallocate"
            );
            let shard = &self.shards[shard_idx];
            let mut buffers = shard.buffers.lock();
            buffers.reserve(per_shard);

            // This shard receives the contiguous slice [start, end) of the
            // freshly allocated entries.
            let start = shard_idx * per_shard;
            let end = start + per_shard;

            // Move buffers directly without cloning
            for entry in all_buffers.drain(start..end) {
                buffers.push(entry);
            }

            shard.count.fetch_add(per_shard, Ordering::Relaxed);
        }
    }

    /// Get the total number of buffers currently in all shards
    ///
    /// Note: Does not include thread-local cached buffers.
    /// Cost is O(num_shards): one relaxed load per shard counter.
    #[inline]
    #[must_use]
    pub fn len(&self) -> usize {
        self.shards
            .iter()
            .fold(0, |total, shard| total + shard.count.load(Ordering::Relaxed))
    }

    /// Check if all shards are empty
    ///
    /// Note: Does not check thread-local caches.
    /// Cost is O(num_shards): one relaxed load per shard counter.
    #[inline]
    #[must_use]
    pub fn is_empty(&self) -> bool {
        !self.shards.iter().any(|s| s.count.load(Ordering::Relaxed) > 0)
    }

    /// Clears all buffers from the shared pool
    ///
    /// Removes all buffers from all shards, releasing memory back to the OS.
    /// Thread-local caches are NOT cleared (they're thread-private).
    ///
    /// # Examples
    /// ```
    /// use zeropool::BufferPool;
    ///
    /// let pool = BufferPool::builder().min_buffer_size(0).build();
    ///
    /// // Get and return some buffers to the pool
    /// {
    ///     let _buf1 = pool.get(1024);
    ///     let _buf2 = pool.get(2048);
    ///     // Buffers automatically returned when dropped
    /// }
    ///
    /// // Clear all buffers from the shared pool
    /// pool.clear();
    ///
    /// // Pool is now empty (thread-local caches are not affected)
    /// ```
    ///
    /// # Thread Safety
    /// This method is safe to call concurrently. Each shard is locked
    /// individually, so other threads can continue using the pool.
    ///
    /// # Performance
    /// O(num_shards) with brief lock contention per shard
    pub fn clear(&self) {
        for shard in self.shards.iter() {
            let mut guard = shard.buffers.lock();
            // Record how many entries we remove while holding the lock,
            // then subtract exactly that many from the shard counter.
            let removed = guard.len();
            guard.clear();
            shard.count.fetch_sub(removed, Ordering::Relaxed);
        }
    }

    /// Shrinks the capacity of all shards to fit their current contents
    ///
    /// This can reduce memory usage if the pool previously held many buffers
    /// but now holds few. Similar to `Vec::shrink_to_fit()`.
    ///
    /// # Performance
    /// O(num_shards) with brief lock contention per shard
    pub fn shrink_to_fit(&self) {
        for shard in self.shards.iter() {
            // Shrinks each shard's backing Vec; the lock is held only for
            // the duration of the shrink.
            let mut guard = shard.buffers.lock();
            guard.shrink_to_fit();
        }
    }
}

impl Default for BufferPool {
    fn default() -> Self {
        Self::new()
    }
}