//! santh-bufpool 0.1.0
//!
//! Typed buffer recycling with fixed size classes and lock-free
//! checkout/return. See the crate documentation for details.
use std::sync::atomic::Ordering;
use std::sync::Arc;

use crate::buffer::PoolBuffer;
use crate::config::PoolConfig;
use crate::error::{Error, Result, MAX_REQUEST_BYTES};
use crate::size_class::SizeClassPool;
use crate::stats::PoolStats;
use crate::tls::take_tls_buffer;

// Capacities (in bytes) of the four fixed size classes served by the pool.
// `select_pool` rounds a request up to the smallest class that fits; requests
// larger than `ONE_MIB` bypass the classes and use a fallback allocation.
pub(crate) const FOUR_KIB: usize = 4 * 1024;
pub(crate) const SIXTY_FOUR_KIB: usize = 64 * 1024;
pub(crate) const TWO_FIFTY_SIX_KIB: usize = 256 * 1024;
pub(crate) const ONE_MIB: usize = 1024 * 1024;

/// Fixed-size recyclable buffer pool.
///
/// Maintains four size classes (4 KiB, 64 KiB, 256 KiB, 1 MiB). Checkout
/// requests are served from the smallest class that fits; larger requests
/// fall back to one-off allocations.
///
/// # Examples
///
/// ```rust
/// use santh_bufpool::{BufferPool, PoolConfig};
///
/// let pool = BufferPool::new(PoolConfig {
///     four_kib_count: 1,
///     ..PoolConfig::default()
/// });
/// let buffer = pool.checkout(32).unwrap();
/// assert_eq!(buffer.len(), 32);
/// ```
#[derive(Debug)]
pub struct BufferPool {
    // One sub-pool per size class. Each is behind an `Arc` so a checked-out
    // `PoolBuffer` can hold its owning pool and recycle into it on drop.
    pub(crate) four_kib: Arc<SizeClassPool>,
    pub(crate) sixty_four_kib: Arc<SizeClassPool>,
    pub(crate) two_fifty_six_kib: Arc<SizeClassPool>,
    pub(crate) one_mib: Arc<SizeClassPool>,
    // Preferred NUMA node for fallback allocations; placement is best-effort.
    pub(crate) numa_node: Option<u32>,
    // Shared counters (hits, misses, peaks). Also cloned into every
    // `PoolBuffer` so returns can decrement the checked-out gauges.
    pub(crate) stats: Arc<PoolStats>,
}

impl BufferPool {
    /// Create a new buffer pool from the supplied configuration.
    ///
    /// NUMA placement is best-effort. If `kernelkit` cannot place pages on the
    /// requested node, allocation falls back to a standard heap buffer.
    #[must_use]
    pub fn new(config: PoolConfig) -> Self {
        Self {
            four_kib: Arc::new(SizeClassPool::pooled(
                FOUR_KIB,
                config.four_kib_count,
                config.numa_node,
            )),
            sixty_four_kib: Arc::new(SizeClassPool::pooled(
                SIXTY_FOUR_KIB,
                config.sixty_four_kib_count,
                config.numa_node,
            )),
            two_fifty_six_kib: Arc::new(SizeClassPool::pooled(
                TWO_FIFTY_SIX_KIB,
                config.two_fifty_six_kib_count,
                config.numa_node,
            )),
            one_mib: Arc::new(SizeClassPool::pooled(
                ONE_MIB,
                config.one_mib_count,
                config.numa_node,
            )),
            numa_node: config.numa_node,
            stats: Arc::new(PoolStats::default()),
        }
    }

    /// Access pool statistics.
    #[must_use]
    pub fn stats(&self) -> &PoolStats {
        &self.stats
    }

    /// Record one successful checkout of `min_bytes` in the shared counters.
    ///
    /// Bumps the running checked-out buffer and byte counts and folds each
    /// into its corresponding peak via `fetch_max`. All accesses are
    /// `Relaxed`: these counters are monitoring data, not synchronization, so
    /// peaks may lag momentarily under contention.
    fn record_checkout(&self, min_bytes: usize) {
        let out = self.stats.checked_out.fetch_add(1, Ordering::Relaxed) + 1;
        self.stats
            .peak_checked_out
            .fetch_max(out, Ordering::Relaxed);
        let bytes_out = self
            .stats
            .bytes_checked_out
            .fetch_add(min_bytes, Ordering::Relaxed)
            + min_bytes;
        self.stats
            .peak_bytes_checked_out
            .fetch_max(bytes_out, Ordering::Relaxed);
    }

    /// Check out a buffer with at least `min_bytes` of capacity.
    ///
    /// The returned slice length matches `min_bytes`. Its backing allocation is
    /// guaranteed to be at least `min_bytes` but may be up to 1 MiB.
    ///
    /// # Errors
    ///
    /// Returns [`Error::RequestedLengthTooLarge`] if `min_bytes` exceeds
    /// `MAX_REQUEST_BYTES`.
    pub fn checkout(&self, min_bytes: usize) -> Result<PoolBuffer> {
        if min_bytes > MAX_REQUEST_BYTES {
            return Err(Error::RequestedLengthTooLarge {
                requested: min_bytes,
            });
        }

        // Fast path: a thread-local cached buffer with sufficient capacity.
        // NOTE: this path intentionally records neither a hit nor a miss;
        // hits/misses track the shared per-class queues only.
        if let Some((capacity, ptr, owner)) = take_tls_buffer(min_bytes) {
            self.record_checkout(min_bytes);
            return Ok(PoolBuffer {
                ptr,
                len: min_bytes,
                capacity,
                owner,
                stats: Some(Arc::clone(&self.stats)),
            });
        }

        let selected = self.select_pool(min_bytes);
        let fallback_capacity = selected.map_or(min_bytes, |pool| pool.capacity);

        // Pooled path: pop a pre-allocated buffer from the matching size class.
        if let Some(pool) = selected {
            if let Some(allocation) = pool.pop() {
                self.stats.hits.fetch_add(1, Ordering::Relaxed);
                self.record_checkout(min_bytes);
                return Ok(PoolBuffer {
                    ptr: allocation.ptr,
                    len: min_bytes,
                    capacity: pool.capacity,
                    owner: Arc::clone(pool),
                    stats: Some(Arc::clone(&self.stats)),
                });
            }
        }

        // Miss path: the size class is exhausted, or the request is larger
        // than 1 MiB. Allocate a fresh fallback buffer. Oversized requests get
        // a dedicated zero-count owner pool so the buffer still has an owner
        // to recycle into on drop.
        self.stats.misses.fetch_add(1, Ordering::Relaxed);
        self.record_checkout(min_bytes);

        let owner = selected.map_or_else(
            || Arc::new(SizeClassPool::pooled(fallback_capacity, 0, self.numa_node)),
            Arc::clone,
        );

        let allocation = owner.allocate_fallback();
        Ok(PoolBuffer {
            ptr: allocation.ptr,
            len: min_bytes,
            capacity: fallback_capacity,
            owner,
            stats: Some(Arc::clone(&self.stats)),
        })
    }

    /// Check out a buffer and explicitly zero its visible contents.
    ///
    /// Recycled buffers are zeroed before being returned to the pool, so this
    /// is mostly useful if you need to ensure the buffer is zeroed immediately
    /// prior to use (e.g. for cryptographic materials).
    ///
    /// # Errors
    ///
    /// Returns an error if `min_bytes` exceeds `MAX_REQUEST_BYTES`.
    pub fn checkout_zeroed(&self, min_bytes: usize) -> Result<PoolBuffer> {
        let mut buffer = self.checkout(min_bytes)?;
        buffer.fill(0);
        Ok(buffer)
    }

    /// Check out a buffer with best-effort alignment.
    ///
    /// The pool will attempt to provide a buffer aligned to `alignment` bytes,
    /// but this is not guaranteed for alignments larger than the platform's
    /// default heap alignment (typically 8 or 16 bytes). For SIMD (32-byte),
    /// this typically works. For page alignment (4096), use OS-specific allocators.
    ///
    /// # Errors
    ///
    /// Returns an error if `min_bytes` exceeds platform limits or `alignment`
    /// is not a power of two.
    pub fn checkout_aligned(&self, min_bytes: usize, alignment: usize) -> Result<PoolBuffer> {
        // `usize::is_power_of_two()` is false for zero, so this single check
        // also rejects `alignment == 0`.
        if !alignment.is_power_of_two() {
            return Err(Error::InvalidAlignment { alignment });
        }
        // Best-effort: the underlying allocator (heap via Vec or mmap) typically
        // provides at least 8- or 16-byte alignment. Pooled buffers from mmap
        // are page-aligned (4096). We cannot adjust the base pointer because
        // Drop recycles from `ptr` directly. For guaranteed SIMD alignment,
        // callers should use arenakit or a dedicated aligned allocator.
        self.checkout(min_bytes)
    }

    /// Number of buffers currently available in the size class identified by
    /// its capacity in bytes (test helper; unknown classes report 0).
    #[doc(hidden)]
    pub fn available_for_test(&self, class: usize) -> usize {
        let pool = match class {
            FOUR_KIB => &self.four_kib,
            SIXTY_FOUR_KIB => &self.sixty_four_kib,
            TWO_FIFTY_SIX_KIB => &self.two_fifty_six_kib,
            ONE_MIB => &self.one_mib,
            _ => return 0,
        };
        pool.queue
            .as_ref()
            .map_or(0, crossbeam_queue::ArrayQueue::len)
    }

    /// Map `min_bytes` to the smallest size class that can hold it, or `None`
    /// for requests larger than 1 MiB (which take the fallback path).
    fn select_pool(&self, min_bytes: usize) -> Option<&Arc<SizeClassPool>> {
        if min_bytes <= FOUR_KIB {
            Some(&self.four_kib)
        } else if min_bytes <= SIXTY_FOUR_KIB {
            Some(&self.sixty_four_kib)
        } else if min_bytes <= TWO_FIFTY_SIX_KIB {
            Some(&self.two_fifty_six_kib)
        } else if min_bytes <= ONE_MIB {
            Some(&self.one_mib)
        } else {
            None
        }
    }
}