// nntp_proxy/pool/buffer.rs
1use crate::types::BufferSize;
2use crossbeam::queue::SegQueue;
3use std::ops::Deref;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicUsize, Ordering};
6use tracing::info;
7
/// A pooled buffer that automatically returns to the pool when dropped
///
/// # Safety and Uninitialized Memory
///
/// Buffers contain **uninitialized memory** for performance (no zeroing overhead).
/// The type tracks initialized bytes and only exposes that portion, preventing UB.
///
/// ## Usage
/// ```ignore
/// let mut buffer = pool.acquire().await;
/// let n = buffer.read_from(&mut stream).await?;  // Automatic tracking
/// process(&*buffer);  // Deref returns only &buffer[..n]
/// ```
pub struct PooledBuffer {
    // Backing storage; its len() is the full capacity and may include
    // uninitialized bytes beyond `initialized`.
    buffer: Vec<u8>,
    // Count of leading bytes known to be written; Deref/AsRef expose only
    // this prefix.
    initialized: usize,
    // Shared lock-free queue the Vec is pushed back onto in Drop.
    pool: Arc<SegQueue<Vec<u8>>>,
    // Shared count of buffers currently resting in `pool` (may transiently
    // disagree with the queue length under concurrent drops/acquires).
    pool_size: Arc<AtomicUsize>,
    // Cap on pooled buffers; Drop frees the Vec instead of pooling it when
    // this limit is reached.
    max_pool_size: usize,
}
28
29impl PooledBuffer {
30    /// Get the full capacity of the buffer
31    #[must_use]
32    #[inline]
33    pub fn capacity(&self) -> usize {
34        self.buffer.len()
35    }
36
37    /// Get the number of initialized bytes
38    #[must_use]
39    #[inline]
40    pub fn initialized(&self) -> usize {
41        self.initialized
42    }
43
44    /// Read from an AsyncRead source, automatically tracking initialized bytes
45    pub async fn read_from<R>(&mut self, reader: &mut R) -> std::io::Result<usize>
46    where
47        R: tokio::io::AsyncReadExt + Unpin,
48    {
49        let n = reader.read(&mut self.buffer[..]).await?;
50        self.initialized = n;
51        Ok(n)
52    }
53
54    /// Copy data into buffer and mark as initialized
55    ///
56    /// # Panics
57    /// Panics if data.len() > capacity
58    #[inline]
59    pub fn copy_from_slice(&mut self, data: &[u8]) {
60        assert!(
61            data.len() <= self.buffer.len(),
62            "data exceeds buffer capacity"
63        );
64        self.buffer[..data.len()].copy_from_slice(data);
65        self.initialized = data.len();
66    }
67
68    /// Get mutable access to the full buffer capacity
69    ///
70    /// Returns a mutable slice of the entire buffer. After writing to this slice,
71    /// you must manually track how many bytes were initialized (e.g., using the
72    /// return value from `read()` and accessing via `&buffer[..n]`).
73    ///
74    /// # Safety Note
75    /// The returned slice contains **uninitialized memory**. Only access bytes
76    /// that you've written to. This method is primarily for use with I/O
77    /// operations like `AsyncRead::read()`.
78    ///
79    /// # Examples
80    /// ```ignore
81    /// let mut buffer = pool.acquire().await;
82    /// let n = stream.read(buffer.as_mut_slice()).await?;
83    /// let initialized_data = &buffer.as_mut_slice()[..n];
84    /// ```
85    #[must_use]
86    #[inline]
87    pub fn as_mut_slice(&mut self) -> &mut [u8] {
88        &mut self.buffer[..]
89    }
90}
91
92impl Deref for PooledBuffer {
93    type Target = [u8];
94
95    #[inline]
96    fn deref(&self) -> &Self::Target {
97        // Immutable access only returns the initialized portion
98        &self.buffer[..self.initialized]
99    }
100}
101
102// Intentionally NO DerefMut or AsMut - forces explicit use of read_from() or as_mut_slice()
103
104impl AsRef<[u8]> for PooledBuffer {
105    #[inline]
106    fn as_ref(&self) -> &[u8] {
107        // Only return initialized portion
108        &self.buffer[..self.initialized]
109    }
110}
111
impl Drop for PooledBuffer {
    /// Return the buffer's storage to the pool, or free it if the pool is full.
    fn drop(&mut self) {
        // Atomically return buffer to pool if pool is not full.
        // A CAS loop reserves a slot in the counter *before* pushing, so the
        // pool never retains more than `max_pool_size` buffers. The counter is
        // an optimistic gate: under races it may transiently disagree with the
        // queue's true length, but it bounds retention.
        let mut current_size = self.pool_size.load(Ordering::Relaxed);
        while current_size < self.max_pool_size {
            match self.pool_size.compare_exchange_weak(
                current_size,
                current_size + 1,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => {
                    // Slot reserved: move the Vec out (leaving a cheap empty
                    // Vec behind, which is fine since self is being dropped)
                    // and push it onto the lock-free queue.
                    let buffer = std::mem::take(&mut self.buffer);
                    self.pool.push(buffer);
                    return;
                }
                Err(new_size) => {
                    // Lost the race (or spurious weak-CAS failure); retry with
                    // the freshly observed count.
                    current_size = new_size;
                }
            }
        }
        // If pool is full, buffer is dropped (memory is simply freed)
    }
}
136
/// Lock-free buffer pool for reusing large I/O buffers
/// Uses crossbeam's SegQueue for lock-free operations
///
/// Cloning is cheap: all state is behind `Arc`s, so clones share the same pool.
#[derive(Debug, Clone)]
pub struct BufferPool {
    // Shared queue of idle buffers; each Vec's len() equals `buffer_size`.
    pool: Arc<SegQueue<Vec<u8>>>,
    // Logical size of every buffer handed out.
    buffer_size: BufferSize,
    // Maximum number of idle buffers retained (enforced in PooledBuffer::drop).
    max_pool_size: usize,
    // Shared counter approximating how many buffers are idle in `pool`.
    pool_size: Arc<AtomicUsize>,
}
146
impl BufferPool {
    /// Create a buffer whose capacity is rounded up to a page-size multiple
    ///
    /// Returns a raw Vec<u8> that will be wrapped in PooledBuffer by acquire().
    /// The buffer is NOT zero-initialized for performance.
    ///
    /// NOTE(review): despite earlier wording, `Vec::with_capacity` only
    /// guarantees `u8` alignment (1 byte) for the allocation's *address*; only
    /// the requested *capacity* is rounded up to a 4KB multiple here. The
    /// returned Vec's len() is `size` (the logical buffer size), while its
    /// capacity is the rounded-up amount.
    ///
    /// # Safety
    ///
    /// **INTERNAL USE ONLY.** This function is not exposed publicly and is only used
    /// within the buffer pool implementation where the safety contract is guaranteed.
    ///
    /// The returned buffer contains uninitialized memory. Callers must ensure that the buffer
    /// is only used with `AsyncRead`/`AsyncWrite` operations that fully initialize the bytes
    /// before they are read. Only the initialized portion of the buffer (`&buf[..n]`, where `n`
    /// is the number of bytes read or written) may be accessed. Accessing uninitialized bytes
    /// is undefined behavior.
    ///
    /// The public API (`acquire()`) returns a `PooledBuffer` which is a safe wrapper that
    /// enforces this contract through the type system and usage patterns.
    #[allow(clippy::uninit_vec)]
    fn create_aligned_buffer(size: usize) -> Vec<u8> {
        // Round the capacity up to a page-size (4KB) multiple.
        let page_size = 4096;
        let aligned_size = size.div_ceil(page_size) * page_size;

        // Allocate the rounded capacity; len is set to `size` below.
        let mut buffer = Vec::with_capacity(aligned_size);
        // SAFETY: We're setting the length without initializing the data.
        // This is safe because:
        // 1. The Vec is wrapped in PooledBuffer which derefs to &[u8]
        // 2. PooledBuffer is immediately used with AsyncRead which writes into it
        // 3. Callers only access &buffer[..n] where n is the bytes actually read
        // 4. Unwritten/uninitialized bytes are never accessed
        unsafe {
            buffer.set_len(size);
        }
        buffer
    }

    /// Create a new buffer pool with pre-allocated buffers
    ///
    /// # Arguments
    /// * `buffer_size` - Size of each buffer in bytes (must be non-zero)
    /// * `max_pool_size` - Maximum number of buffers to pool
    ///
    /// # Design Philosophy
    ///
    /// **Buffer pools are allocated ONCE at application boot**, providing a
    /// zero-allocation hot path for I/O operations. This is a critical performance
    /// optimization:
    ///
    /// - **Boot time**: All buffers pre-allocated (one-time cost)
    /// - **Runtime**: Zero allocations in hot path (acquire/release from pool)
    /// - **Per-connection**: Buffers are borrowed and returned, never owned
    ///
    /// **IMPORTANT**: Do NOT create a BufferPool per-client, per-connection, or
    /// per-request. Create ONE pool at application startup and share it across
    /// all operations via Arc or static reference.
    ///
    /// All buffers are pre-allocated at creation time for optimal performance.
    #[must_use]
    pub fn new(buffer_size: BufferSize, max_pool_size: usize) -> Self {
        let pool = Arc::new(SegQueue::new());
        let pool_size = Arc::new(AtomicUsize::new(0));

        // Pre-allocate all buffers at startup to eliminate allocation overhead
        info!(
            "Pre-allocating {} buffers of {}KB each ({}MB total)",
            max_pool_size,
            buffer_size.get() / 1024,
            (max_pool_size * buffer_size.get()) / (1024 * 1024)
        );

        for _ in 0..max_pool_size {
            let buffer = Self::create_aligned_buffer(buffer_size.get());
            pool.push(buffer);
            // Relaxed is sufficient: nothing else observes the counter until
            // new() returns and the pool is shared.
            pool_size.fetch_add(1, Ordering::Relaxed);
        }

        info!("Buffer pool pre-allocation complete");

        Self {
            pool,
            buffer_size,
            max_pool_size,
            pool_size,
        }
    }

    /// Create a buffer pool suitable for testing
    ///
    /// Uses sensible defaults (8KB buffers, pool of 4) that work for most tests.
    /// Prefer this over manually constructing BufferPool in tests.
    #[cfg(test)]
    #[must_use]
    pub fn for_tests() -> Self {
        Self::new(BufferSize::try_new(8192).expect("valid size"), 4)
    }

    /// Get a buffer from the pool or create a new one (lock-free)
    ///
    /// Returns a PooledBuffer that automatically returns to the pool when dropped.
    ///
    /// NOTE(review): this fn contains no `.await`; it is `async` as part of the
    /// public API shape (callers `.await` it), so it cannot be made sync without
    /// breaking callers.
    ///
    /// # Performance: Zero-Allocation Hot Path
    ///
    /// When the pool was created with sufficient `max_pool_size`, this method
    /// performs ZERO allocations - it just pops from the lock-free queue.
    /// This is the critical hot path for I/O operations.
    ///
    /// Only if the pool is exhausted (all buffers in use) will a new buffer
    /// be allocated. Size the pool appropriately to avoid this fallback.
    ///
    /// # Safety Notes
    ///
    /// The buffer may contain old data, but this is safe because:
    /// - Callers use AsyncRead which writes into the buffer
    /// - They get back `n` bytes written and access only `&buf[..n]`
    /// - Stale data beyond `n` is never accessed
    pub async fn acquire(&self) -> PooledBuffer {
        let buffer = if let Some(buffer) = self.pool.pop() {
            self.pool_size.fetch_sub(1, Ordering::Relaxed);
            // Buffer from pool is already the correct size (checked in debug
            // builds only; Drop itself does not verify size on return)
            debug_assert_eq!(buffer.len(), self.buffer_size.get());
            buffer
        } else {
            // Pool exhausted: allocate a fresh buffer (size rounded to a page
            // multiple; see create_aligned_buffer)
            Self::create_aligned_buffer(self.buffer_size.get())
        };

        PooledBuffer {
            buffer,
            initialized: 0, // Start with 0 bytes safe to read
            pool: Arc::clone(&self.pool),
            pool_size: Arc::clone(&self.pool_size),
            max_pool_size: self.max_pool_size,
        }
    }
}
285
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_buffer_pool_creation() {
        let pool = BufferPool::new(BufferSize::try_new(8192).unwrap(), 10);

        // Pool should pre-allocate buffers
        let buffer1 = pool.acquire().await;
        assert_eq!(buffer1.capacity(), 8192);
        assert_eq!(buffer1.initialized(), 0); // No bytes initialized yet
        // Buffer automatically returned on drop
    }

    #[tokio::test]
    async fn test_buffer_pool_get_and_return() {
        let pool = BufferPool::new(BufferSize::try_new(4096).unwrap(), 5);

        // Get a buffer
        let buffer = pool.acquire().await;
        assert_eq!(buffer.capacity(), 4096);
        assert_eq!(buffer.initialized(), 0);

        // Buffer contains uninitialized data - this is intentional for performance
        // Callers will write to it via AsyncRead before accessing

        // Drop it (automatically returns to pool)
        drop(buffer);

        // Get it again - should be from pool
        let buffer2 = pool.acquire().await;
        assert_eq!(buffer2.capacity(), 4096);
    }

    #[tokio::test]
    async fn test_buffer_pool_exhaustion() {
        let pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 2);

        // Get all pre-allocated buffers
        let buf1 = pool.acquire().await;
        let buf2 = pool.acquire().await;

        // Pool is exhausted, should create new buffer
        let buf3 = pool.acquire().await;
        assert_eq!(buf3.capacity(), 1024);

        // Drop buffers (automatically returned)
        drop(buf1);
        drop(buf2);
        drop(buf3);
    }

    #[tokio::test]
    async fn test_buffer_pool_concurrent_access() {
        let pool = BufferPool::new(BufferSize::try_new(2048).unwrap(), 10);

        // Spawn multiple tasks accessing the pool concurrently
        let mut handles = vec![];

        for _ in 0..20 {
            let pool_clone = pool.clone();
            let handle = tokio::spawn(async move {
                let buffer = pool_clone.acquire().await;
                assert_eq!(buffer.capacity(), 2048);
                // Simulate some work
                tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
            });
            handles.push(handle);
        }

        // Wait for all tasks to complete
        for handle in handles {
            handle.await.unwrap();
        }
    }

    #[tokio::test]
    async fn test_buffer_alignment() {
        let pool = BufferPool::new(BufferSize::try_new(8192).unwrap(), 1);
        let buffer = pool.acquire().await;

        // NOTE(review): capacity() is the buffer's logical len (the requested
        // 8192), not the Vec's heap capacity and not an address-alignment
        // guarantee. The modulo check below passes because 8192 is itself a
        // multiple of 4096.
        assert!(buffer.capacity() >= 8192);
        assert_eq!(buffer.capacity() % 4096, 0);
    }

    #[tokio::test]
    async fn test_buffer_clear_and_resize() {
        let pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 2);

        let mut buffer = pool.acquire().await;

        // Write data using copy_from_slice
        let data = vec![42u8; 101];
        buffer.copy_from_slice(&data);
        assert_eq!(buffer.initialized(), 101);

        // Drop returns it to pool
        drop(buffer);

        // Get it again - may contain old data (performance optimization)
        let buffer2 = pool.acquire().await;
        assert_eq!(buffer2.capacity(), 1024);
        // Note: buffer may contain previous data - callers must use &buf[..n] pattern
    }

    #[tokio::test]
    async fn test_buffer_pool_max_size_enforcement() {
        let pool = BufferPool::new(BufferSize::try_new(512).unwrap(), 3);

        // Get all buffers
        let buf1 = pool.acquire().await;
        let buf2 = pool.acquire().await;
        let buf3 = pool.acquire().await;

        // Get one more (should create new)
        let buf4 = pool.acquire().await;

        // Drop all buffers (automatically returned)
        drop(buf1);
        drop(buf2);
        drop(buf3);
        drop(buf4);

        // Pool should not exceed max size (the CAS loop in PooledBuffer::drop
        // discards one of the four buffers). pool_size is private, so this
        // test only exercises the code path without asserting the count.
    }

    #[tokio::test]
    async fn test_buffer_wrong_size_not_returned() {
        let pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 2);

        let buffer = pool.acquire().await;
        assert_eq!(buffer.capacity(), 1024);

        // NOTE(review): despite the test name, Drop does NOT check buffer size
        // on return; size consistency is only asserted (debug builds) in
        // acquire(). This test exercises the drop/return path only.
        drop(buffer);
    }

    #[tokio::test]
    async fn test_buffer_pool_multiple_get_return_cycles() {
        let pool = BufferPool::new(BufferSize::try_new(4096).unwrap(), 5);

        // Do multiple get/return cycles
        for i in 0..20 {
            let mut buffer = pool.acquire().await;
            assert_eq!(buffer.capacity(), 4096);

            // Write some data using copy_from_slice
            let data = vec![i as u8; 1];
            buffer.copy_from_slice(&data);
            assert_eq!(buffer.initialized(), 1);
        }
    }

    #[test]
    fn test_buffer_pool_clone() {
        let pool1 = BufferPool::new(BufferSize::try_new(1024).unwrap(), 5);
        let _pool2 = pool1.clone();

        // Both should share the same underlying pool
        // (Arc ensures shared ownership)
    }

    #[tokio::test]
    async fn test_different_buffer_sizes() {
        let small_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 5);
        let medium_pool = BufferPool::new(BufferSize::try_new(8192).unwrap(), 5);
        let large_pool = BufferPool::new(BufferSize::try_new(65536).unwrap(), 5);

        let small_buf = small_pool.acquire().await;
        let medium_buf = medium_pool.acquire().await;
        let large_buf = large_pool.acquire().await;

        assert_eq!(small_buf.capacity(), 1024);
        assert_eq!(medium_buf.capacity(), 8192);
        assert_eq!(large_buf.capacity(), 65536);

        // Buffers auto-return on drop
    }

    #[tokio::test]
    async fn test_as_mut_slice() {
        let pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 5);
        let mut buffer = pool.acquire().await;

        // Get mutable slice and write to it
        let slice = buffer.as_mut_slice();
        assert_eq!(slice.len(), 1024);

        // Write some data manually
        slice[0] = b'H';
        slice[1] = b'i';
        slice[2] = b'!';

        // Can write to the full capacity
        for (i, byte) in slice.iter_mut().enumerate() {
            *byte = (i % 256) as u8;
        }

        // Note: initialized() doesn't track manual writes via as_mut_slice
        // This is intentional - as_mut_slice is for low-level I/O
        assert_eq!(buffer.initialized(), 0);
    }

    #[tokio::test]
    async fn test_buffer_pool_stress() {
        let pool = BufferPool::new(BufferSize::try_new(4096).unwrap(), 10);

        // Stress test with many concurrent operations
        let mut handles = vec![];

        for _ in 0..100 {
            let pool_clone = pool.clone();
            let handle = tokio::spawn(async move {
                for _ in 0..10 {
                    let buffer = pool_clone.acquire().await;
                    assert_eq!(buffer.capacity(), 4096);
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.await.unwrap();
        }
    }

    #[tokio::test]
    async fn test_pooled_buffer_deref() {
        let pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 5);
        let mut buffer = pool.acquire().await;

        // Initially no initialized bytes
        assert_eq!(buffer.len(), 0);

        // Write data
        buffer.copy_from_slice(b"Hello");

        // Deref should return only initialized portion
        assert_eq!(buffer.len(), 5);
        assert_eq!(&*buffer, b"Hello");
    }

    #[tokio::test]
    async fn test_pooled_buffer_as_ref() {
        let pool = BufferPool::new(BufferSize::try_new(512).unwrap(), 5);
        let mut buffer = pool.acquire().await;

        buffer.copy_from_slice(b"Test data");

        // AsRef should return initialized portion
        let slice: &[u8] = buffer.as_ref();
        assert_eq!(slice, b"Test data");
        assert_eq!(slice.len(), 9);
    }

    #[tokio::test]
    async fn test_copy_from_slice_updates_initialized() {
        let pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 5);
        let mut buffer = pool.acquire().await;

        assert_eq!(buffer.initialized(), 0);

        buffer.copy_from_slice(b"abc");
        assert_eq!(buffer.initialized(), 3);

        // A subsequent write replaces (not appends to) the initialized prefix
        buffer.copy_from_slice(b"longer text");
        assert_eq!(buffer.initialized(), 11);
    }

    #[tokio::test]
    #[should_panic(expected = "data exceeds buffer capacity")]
    async fn test_copy_from_slice_panic_on_overflow() {
        let pool = BufferPool::new(BufferSize::try_new(10).unwrap(), 5);
        let mut buffer = pool.acquire().await;

        let too_large = vec![0u8; 20];
        buffer.copy_from_slice(&too_large); // Should panic
    }

    #[tokio::test]
    async fn test_buffer_pool_debug() {
        let pool = BufferPool::new(BufferSize::try_new(2048).unwrap(), 5);
        let debug_str = format!("{:?}", pool);
        assert!(debug_str.contains("BufferPool"));
    }

    #[tokio::test]
    async fn test_buffer_initialized_tracking() {
        let pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 5);
        let mut buffer = pool.acquire().await;

        // Test multiple writes update initialized correctly
        buffer.copy_from_slice(b"First");
        assert_eq!(buffer.initialized(), 5);
        assert_eq!(&*buffer, b"First");

        buffer.copy_from_slice(b"Second write");
        assert_eq!(buffer.initialized(), 12);
        assert_eq!(&*buffer, b"Second write");
    }

    #[tokio::test]
    async fn test_buffer_capacity_vs_initialized() {
        let pool = BufferPool::new(BufferSize::try_new(8192).unwrap(), 5);
        let mut buffer = pool.acquire().await;

        // Capacity is full buffer size
        assert_eq!(buffer.capacity(), 8192);

        // Initialized is what we've written
        assert_eq!(buffer.initialized(), 0);

        buffer.copy_from_slice(b"Small");
        assert_eq!(buffer.capacity(), 8192);
        assert_eq!(buffer.initialized(), 5);
    }

    #[tokio::test]
    async fn test_as_mut_slice_capacity() {
        let pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 5);
        let mut buffer = pool.acquire().await;

        // as_mut_slice should return full capacity
        let slice = buffer.as_mut_slice();
        assert_eq!(slice.len(), 1024);
    }

    #[tokio::test]
    async fn test_empty_slice_copy() {
        let pool = BufferPool::new(BufferSize::try_new(512).unwrap(), 5);
        let mut buffer = pool.acquire().await;

        // Copying empty slice should work
        buffer.copy_from_slice(&[]);
        assert_eq!(buffer.initialized(), 0);
        assert_eq!(&*buffer, b"");
    }

    #[tokio::test]
    async fn test_buffer_reuse_preserves_capacity() {
        let pool = BufferPool::new(BufferSize::try_new(2048).unwrap(), 5);

        {
            let mut buffer = pool.acquire().await;
            buffer.copy_from_slice(b"test");
            assert_eq!(buffer.capacity(), 2048);
        } // Drop returns to pool

        let buffer2 = pool.acquire().await;
        // Should have same capacity when reused
        assert_eq!(buffer2.capacity(), 2048);
    }

    #[test]
    fn test_buffer_size_alignment() {
        // Test that create_aligned_buffer rounds the Vec's capacity up to a
        // page-size multiple while keeping len() at the requested size.
        let buffer = BufferPool::create_aligned_buffer(1000);
        assert_eq!(buffer.len(), 1000);
        // NOTE(review): Vec::with_capacity guarantees *at least* the requested
        // capacity; this exact-multiple assertion relies on the allocator not
        // over-allocating, which holds in practice but is not guaranteed.
        assert_eq!(buffer.capacity() % 4096, 0);

        let buffer2 = BufferPool::create_aligned_buffer(8192);
        assert_eq!(buffer2.len(), 8192);
        assert_eq!(buffer2.capacity() % 4096, 0);
    }
}