use super::*;
use std::sync::{
Mutex,
atomic::{AtomicU64, Ordering},
};
pub struct CacheBuffer {
buffer_type: BufferType,
cache: Option<*const SingleLruPageCache>,
node_indices: Vec<NodeIndex>,
data_buffer: Vec<u8>,
data_slice: Option<&'static [u8]>,
hit_type: CacheHitType,
}
#[derive(Debug, Clone, Copy, PartialEq)]
enum BufferType {
#[allow(dead_code)]
SinglePage,
#[allow(dead_code)]
MultiPage,
Copied,
Empty,
}
impl CacheBuffer {
pub fn new() -> Self {
Self {
buffer_type: BufferType::Empty,
cache: None,
node_indices: Vec::new(),
data_buffer: Vec::new(),
data_slice: None,
hit_type: CacheHitType::Hit,
}
}
#[allow(dead_code)]
pub(crate) fn set_node(&mut self, cache: &SingleLruPageCache, node_idx: NodeIndex) {
self.cleanup();
self.buffer_type = BufferType::SinglePage;
self.cache = Some(cache as *const _);
self.node_indices.push(node_idx);
self.hit_type = CacheHitType::Hit;
}
#[allow(dead_code)]
pub(crate) fn setup_multi_page(
&mut self,
cache: &SingleLruPageCache,
node_indices: Vec<NodeIndex>,
offset: u64,
length: usize,
) {
self.cleanup();
self.buffer_type = BufferType::MultiPage;
self.cache = Some(cache as *const _);
self.data_buffer.clear();
self.data_buffer.reserve(length);
let _start_page = (offset / PAGE_SIZE as u64) as PageId;
let page_offset = (offset % PAGE_SIZE as u64) as usize;
let _remaining = length;
let _current_offset = page_offset;
self.data_buffer.resize(length, 0);
self.node_indices = node_indices;
self.hit_type = CacheHitType::Mix;
let data_ptr = self.data_buffer.as_ptr();
self.data_slice = Some(unsafe { std::slice::from_raw_parts(data_ptr, length) });
}
pub fn copy_from_slice(&mut self, data: &[u8]) {
self.cleanup();
self.buffer_type = BufferType::Copied;
self.data_buffer.clear();
self.data_buffer.extend_from_slice(data);
let data_ptr = self.data_buffer.as_ptr();
self.data_slice = Some(unsafe { std::slice::from_raw_parts(data_ptr, data.len()) });
self.hit_type = CacheHitType::Hit;
}
pub fn extend_from_slice(&mut self, data: &[u8]) {
if data.is_empty() {
return;
}
if !matches!(self.buffer_type, BufferType::Copied) {
let existing_data = self.data().to_vec();
self.cleanup();
self.buffer_type = BufferType::Copied;
self.data_buffer = existing_data;
}
self.data_buffer.extend_from_slice(data);
let data_ptr = self.data_buffer.as_ptr();
self.data_slice =
Some(unsafe { std::slice::from_raw_parts(data_ptr, self.data_buffer.len()) });
}
pub fn data(&self) -> &[u8] {
match self.buffer_type {
BufferType::SinglePage => {
&self.data_buffer
}
BufferType::MultiPage | BufferType::Copied => self.data_slice.unwrap_or(&[]),
BufferType::Empty => &[],
}
}
#[inline]
pub fn len(&self) -> usize {
self.data().len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn hit_type(&self) -> CacheHitType {
self.hit_type
}
pub fn has_data(&self) -> bool {
!matches!(self.buffer_type, BufferType::Empty)
}
pub fn clear(&mut self) {
self.cleanup();
self.buffer_type = BufferType::Empty;
self.data_buffer.clear();
self.data_slice = None;
}
fn cleanup(&mut self) {
self.cache = None;
self.node_indices.clear();
}
pub fn from_data(data: Vec<u8>) -> Self {
let len = data.len();
let mut buffer = Self::new();
buffer.buffer_type = BufferType::Copied;
buffer.data_buffer = data;
let data_ptr = buffer.data_buffer.as_ptr();
buffer.data_slice = Some(unsafe { std::slice::from_raw_parts(data_ptr, len) });
buffer
}
pub fn reserve(&mut self, capacity: usize) {
self.data_buffer.reserve(capacity);
}
#[inline]
pub fn capacity(&self) -> usize {
self.data_buffer.capacity()
}
}
impl Default for CacheBuffer {
fn default() -> Self {
Self::new()
}
}
impl Drop for CacheBuffer {
fn drop(&mut self) {
self.cleanup();
}
}
unsafe impl Send for CacheBuffer {}
pub struct BufferPool {
available: Mutex<Vec<CacheBuffer>>,
max_size: usize,
allocations: AtomicU64,
reuses: AtomicU64,
}
impl BufferPool {
pub fn new(max_size: usize) -> Self {
Self {
available: Mutex::new(Vec::new()),
max_size,
allocations: AtomicU64::new(0),
reuses: AtomicU64::new(0),
}
}
pub fn get(&self) -> CacheBuffer {
if let Ok(mut buffers) = self.available.lock()
&& let Some(mut buffer) = buffers.pop()
{
buffer.clear();
self.reuses.fetch_add(1, Ordering::Relaxed);
return buffer;
}
self.allocations.fetch_add(1, Ordering::Relaxed);
CacheBuffer::new()
}
pub fn put(&self, buffer: CacheBuffer) {
if let Ok(mut buffers) = self.available.lock()
&& buffers.len() < self.max_size
{
buffers.push(buffer);
}
}
pub fn stats(&self) -> BufferPoolStats {
let available_count = self
.available
.lock()
.map(|buffers| buffers.len())
.unwrap_or(0);
BufferPoolStats {
allocations: self.allocations.load(Ordering::Relaxed),
reuses: self.reuses.load(Ordering::Relaxed),
available_count,
max_size: self.max_size,
}
}
}
#[derive(Debug, Clone)]
pub struct BufferPoolStats {
pub allocations: u64,
pub reuses: u64,
pub available_count: usize,
pub max_size: usize,
}
impl BufferPoolStats {
pub fn reuse_ratio(&self) -> f64 {
if self.allocations + self.reuses == 0 {
0.0
} else {
self.reuses as f64 / (self.allocations + self.reuses) as f64
}
}
pub fn pool_utilization(&self) -> f64 {
self.available_count as f64 / self.max_size as f64
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_buffer_pool_basic() {
let pool = BufferPool::new(4);
assert_eq!(pool.stats().max_size, 4);
let buf = pool.get();
assert_eq!(buf.len(), 0);
let stats = pool.stats();
assert_eq!(stats.allocations, 1);
pool.put(buf);
let stats = pool.stats();
assert_eq!(stats.available_count, 1);
}
#[test]
fn test_buffer_new_is_empty() {
let buffer = CacheBuffer::new();
assert_eq!(buffer.len(), 0);
assert!(buffer.is_empty());
let empty: &[u8] = &[];
assert_eq!(buffer.data(), empty);
}
#[test]
fn test_buffer_copy_from_slice() {
let mut buffer = CacheBuffer::new();
let test_data = b"Hello, Cache Buffer!";
buffer.copy_from_slice(test_data);
assert_eq!(buffer.len(), test_data.len());
assert!(!buffer.is_empty());
assert_eq!(buffer.data(), test_data);
assert_eq!(buffer.hit_type(), CacheHitType::Hit);
}
#[test]
fn test_buffer_extend_multiple() {
let mut buffer = CacheBuffer::new();
let data1 = b"First ";
let data2 = b"Second";
buffer.copy_from_slice(data1);
buffer.extend_from_slice(data2);
let expected = b"First Second";
assert_eq!(buffer.data(), expected);
assert_eq!(buffer.len(), expected.len());
}
#[test]
fn test_buffer_clear_resets() {
let mut buffer = CacheBuffer::new();
buffer.copy_from_slice(b"Some data");
assert!(!buffer.is_empty());
buffer.clear();
assert!(buffer.is_empty());
assert_eq!(buffer.len(), 0);
let empty: &[u8] = &[];
assert_eq!(buffer.data(), empty);
}
#[test]
fn test_buffer_from_data_vec() {
let test_data = vec![1, 2, 3, 4, 5];
let buffer = CacheBuffer::from_data(test_data.clone());
assert_eq!(buffer.data(), &test_data[..]);
assert_eq!(buffer.len(), test_data.len());
assert!(!buffer.is_empty());
}
#[test]
fn test_buffer_reserve_capacity() {
let mut buffer = CacheBuffer::new();
assert!(buffer.capacity() < 1024);
buffer.reserve(1024);
assert!(buffer.capacity() >= 1024);
}
#[test]
fn test_buffer_send_across_thread() {
use std::sync::mpsc;
use std::thread;
let mut buffer = CacheBuffer::new();
let test_data = b"Thread-safe data";
buffer.copy_from_slice(test_data);
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
tx.send(buffer).expect("Failed to send buffer");
});
let received_buffer = rx.recv().expect("Failed to receive buffer");
assert_eq!(received_buffer.data(), test_data);
}
#[test]
fn test_buffer_pool_reuse_stats() {
let pool = BufferPool::new(10);
let mut buf1 = pool.get();
buf1.copy_from_slice(b"test");
let stats1 = pool.stats();
assert_eq!(stats1.allocations, 1);
assert_eq!(stats1.reuses, 0);
pool.put(buf1);
let buf2 = pool.get();
let stats2 = pool.stats();
assert_eq!(stats2.allocations, 1);
assert_eq!(stats2.reuses, 1);
assert!(buf2.is_empty()); assert!(stats2.reuse_ratio() > 0.0);
}
#[test]
fn test_buffer_pool_max_size() {
let pool = BufferPool::new(2);
let buf1 = pool.get();
let buf2 = pool.get();
let buf3 = pool.get();
pool.put(buf1);
pool.put(buf2);
pool.put(buf3);
let stats = pool.stats();
assert_eq!(stats.available_count, 2); assert_eq!(stats.max_size, 2);
assert!(stats.pool_utilization() == 1.0); }
#[test]
fn test_buffer_hit_type_tracking() {
let mut buffer = CacheBuffer::new();
assert_eq!(buffer.hit_type(), CacheHitType::Hit);
buffer.copy_from_slice(b"test");
assert_eq!(buffer.hit_type(), CacheHitType::Hit);
let config = crate::cache::PageCacheConfig::balanced();
let cache = crate::cache::SingleLruPageCache::new(config).unwrap();
buffer.setup_multi_page(&cache, vec![1, 2], 0, 1024);
assert_eq!(buffer.hit_type(), CacheHitType::Mix);
}
}