santh-bufpool 0.1.0

Typed buffer recycling with fixed size classes and lock-free checkout/return.
use std::ptr::NonNull;
use std::slice;

use crossbeam_queue::ArrayQueue;

/// Lock-free pool for a single buffer capacity.
#[derive(Debug)]
pub(crate) struct SizeClassPool {
    pub(crate) capacity: usize,
    pub(crate) queue: Option<ArrayQueue<BufferAllocation>>,
    pub(crate) numa_node: Option<u32>,
}

impl SizeClassPool {
    pub(crate) fn pooled(capacity: usize, count: usize, numa_node: Option<u32>) -> Self {
        let queue = if count > 0 {
            let queue = ArrayQueue::new(count);
            for _ in 0..count {
                // The queue was created with room for exactly `count`
                // buffers, so these pushes cannot fail. (A failed push would
                // otherwise leak the allocation, since `BufferAllocation`
                // has no `Drop` impl.)
                let _ = queue.push(allocate_buffer(capacity, numa_node));
            }
            Some(queue)
        } else {
            None
        };

        Self {
            capacity,
            queue,
            numa_node,
        }
    }

    pub(crate) fn pop(&self) -> Option<BufferAllocation> {
        self.queue
            .as_ref()
            .and_then(crossbeam_queue::ArrayQueue::pop)
    }

    pub(crate) fn allocate_fallback(&self) -> BufferAllocation {
        allocate_buffer(self.capacity, self.numa_node)
    }

    pub(crate) fn recycle_or_free(&self, allocation: BufferAllocation) {
        if let Some(queue) = &self.queue {
            if let Err(returned_alloc) = queue.push(allocation) {
                free_buffer(returned_alloc.ptr, self.capacity);
            }
        } else {
            free_buffer(allocation.ptr, self.capacity);
        }
    }
}
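
// Illustrative checkout/return flow (a sketch, not part of the crate API;
// the sizes below are arbitrary): callers try to pop a recycled buffer,
// fall back to a fresh allocation when the queue is empty, and hand the
// buffer back afterwards so it can be reused or freed.
//
//     let pool = SizeClassPool::pooled(4096, 8, None);
//     let buf = pool.pop().unwrap_or_else(|| pool.allocate_fallback());
//     // ... fill and drain the 4096-byte buffer ...
//     pool.recycle_or_free(buf);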

impl Drop for SizeClassPool {
    fn drop(&mut self) {
        if let Some(queue) = &self.queue {
            while let Some(allocation) = queue.pop() {
                free_buffer(allocation.ptr, self.capacity);
            }
        }
    }
}

/// Owned handle to a pooled buffer allocation.
pub(crate) struct BufferAllocation {
    pub(crate) ptr: NonNull<u8>,
}

// SAFETY: `BufferAllocation` is just ownership of an allocated byte region.
// Moving it across threads does not create aliasing; synchronization is handled
// by `ArrayQueue` and mutable access only occurs after exclusive checkout.
unsafe impl Send for BufferAllocation {}
// SAFETY: shared references do not permit mutation of the pointed-to memory.
// Actual writes only happen after a buffer is popped or checked out exclusively.
unsafe impl Sync for BufferAllocation {}
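
// Hypothetical helper (a sketch, not part of this crate): how an exclusive
// owner could view a checked-out allocation as bytes. `BufferAllocation`
// does not record its own length, so the pool's `capacity` must be supplied
// by the caller.
//
// SAFETY: callers must pass the exact capacity the buffer was allocated with
// and guarantee no other reference to the bytes exists while the returned
// slice is live.
#[allow(dead_code)]
pub(crate) unsafe fn buffer_as_mut_slice(
    allocation: &mut BufferAllocation,
    capacity: usize,
) -> &mut [u8] {
    // SAFETY: upheld by the caller per the contract above.
    unsafe { slice::from_raw_parts_mut(allocation.ptr.as_ptr(), capacity) }
}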

fn allocate_buffer(capacity: usize, numa_node: Option<u32>) -> BufferAllocation {
    #[cfg(feature = "numa")]
    let ptr = if let Some(node) = numa_node {
        match kernelkit::numa::alloc_on_node::<u8>(capacity, node) {
            Ok(values) => leak_vec(values),
            Err(_) => allocate_heap_buffer(capacity),
        }
    } else {
        allocate_heap_buffer(capacity)
    };

    #[cfg(not(feature = "numa"))]
    let ptr = {
        let _ = numa_node;
        allocate_heap_buffer(capacity)
    };

    BufferAllocation { ptr }
}

fn allocate_heap_buffer(capacity: usize) -> NonNull<u8> {
    let mut values = Vec::with_capacity(capacity);
    // SAFETY: the vector reserved at least `capacity` bytes, and zeroing
    // fully initializes them, since every bit pattern is a valid `u8`.
    // `write_bytes` runs before `set_len` exposes the bytes as live elements.
    unsafe {
        std::ptr::write_bytes(values.as_mut_ptr(), 0, capacity);
        values.set_len(capacity);
    }
    leak_vec(values)
}

fn leak_vec(mut values: Vec<u8>) -> NonNull<u8> {
    if values.capacity() == 0 {
        return NonNull::dangling();
    }
    let ptr = values.as_mut_ptr();
    std::mem::forget(values);
    // SAFETY: the pointer comes from a leaked Vec with nonzero capacity, so
    // it is non-null and stays valid until `free_buffer` reclaims it.
    unsafe { NonNull::new_unchecked(ptr) }
}

#[allow(clippy::same_length_and_capacity)]
fn free_buffer(ptr: NonNull<u8>, capacity: usize) {
    if capacity == 0 {
        return;
    }
    // SAFETY: `ptr` came from a leaked Vec whose length and capacity both
    // equal `capacity`, exactly as `Vec::from_raw_parts` requires. Rebuilding
    // the Vec takes ownership, and dropping it frees the allocation.
    unsafe {
        let _ = Vec::from_raw_parts(ptr.as_ptr(), capacity, capacity);
    }
}
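
// Pairing sketch (illustrative): every pointer minted by `allocate_buffer`
// must eventually return through `free_buffer` with the same capacity, since
// the Vec is rebuilt from exactly those parts. The zero-capacity case is
// consistent on both sides: `leak_vec` hands out a dangling pointer and
// `free_buffer` returns without reconstructing anything.
//
//     let alloc = allocate_buffer(128, None);
//     free_buffer(alloc.ptr, 128); // must match the allocation's capacity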

/// Zero `capacity` bytes starting at `ptr`.
pub(crate) fn zero_buffer(ptr: NonNull<u8>, capacity: usize) {
    if capacity == 0 {
        return;
    }

    // SAFETY: `ptr` references a valid allocation of exactly `capacity` bytes
    // owned by the pool at this point, so it is safe to overwrite in place.
    unsafe {
        slice::from_raw_parts_mut(ptr.as_ptr(), capacity).fill(0);
    }
}
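
// A test sketch (illustrative, assuming a `cargo test` build of this
// module): exercises the checkout/return cycle and `zero_buffer` against a
// one-slot pool.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn checkout_recycle_and_zero() {
        let capacity = 64;
        let pool = SizeClassPool::pooled(capacity, 1, None);

        // The queue was prefilled with exactly one buffer.
        let alloc = pool.pop().expect("prefilled buffer");
        assert!(pool.pop().is_none());

        // Dirty the buffer, then zero it the way a checkout path might.
        // SAFETY: we hold the only handle to this allocation.
        unsafe {
            std::slice::from_raw_parts_mut(alloc.ptr.as_ptr(), capacity).fill(0xAB);
        }
        zero_buffer(alloc.ptr, capacity);
        // SAFETY: same exclusive handle as above.
        let bytes = unsafe { std::slice::from_raw_parts(alloc.ptr.as_ptr(), capacity) };
        assert!(bytes.iter().all(|&b| b == 0));

        // Returning two buffers to a one-slot queue recycles the first and
        // frees the second.
        let extra = pool.allocate_fallback();
        pool.recycle_or_free(alloc);
        pool.recycle_or_free(extra);
    }
}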