use super::*;
use std::sync::{Mutex, atomic::{AtomicU64, Ordering}};
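
/// A reusable buffer for data read through the page cache.
///
/// A buffer is in one of four states (`BufferType`): tied to a single
/// cache page, assembled from multiple pages, an owned copy, or empty.
/// `data_slice` is a self-referential view into `data_buffer`; its
/// `'static` lifetime is an internal fiction, and the slice is re-derived
/// after every mutation that could move or reallocate the buffer.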
pub struct CacheBuffer {
    buffer_type: BufferType,
    /// Back-pointer to the owning cache, held while `node_indices` refers
    /// to pages in it; stored as a raw pointer and never dereferenced here.
    cache: Option<*const SingleLruPageCache>,
    /// Cache nodes this buffer currently refers to.
    node_indices: Vec<NodeIndex>,
    /// Owned storage for the multi-page and copied variants.
    data_buffer: Vec<u8>,
    /// Self-referential view into `data_buffer`; see the type-level docs.
    data_slice: Option<&'static [u8]>,
    hit_type: CacheHitType,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum BufferType {
    /// Backed by a single cache page.
    SinglePage,
    /// Assembled from several cache pages into `data_buffer`.
    MultiPage,
    /// An owned copy of caller-supplied bytes in `data_buffer`.
    Copied,
    /// Holds no data.
    Empty,
}

impl CacheBuffer {
    /// Create an empty buffer.
    pub fn new() -> Self {
        Self {
            buffer_type: BufferType::Empty,
            cache: None,
            node_indices: Vec::new(),
            data_buffer: Vec::new(),
            data_slice: None,
            hit_type: CacheHitType::Hit,
        }
    }

    /// Point this buffer at a single cache page node. The page contents
    /// are served from `data_buffer`, which the caller is expected to fill.
    pub(crate) fn set_node(&mut self, cache: &SingleLruPageCache, node_idx: NodeIndex) {
        self.cleanup();
        self.buffer_type = BufferType::SinglePage;
        self.cache = Some(cache as *const _);
        self.node_indices.push(node_idx);
        self.hit_type = CacheHitType::Hit;
    }

    /// Prepare an owned, zero-filled buffer spanning several cache pages.
    /// The page contents themselves are expected to be copied in by the
    /// caller once the pages are pinned; `_offset` is accepted for the
    /// caller's bookkeeping but is not used here.
    pub(crate) fn setup_multi_page(
        &mut self,
        cache: &SingleLruPageCache,
        node_indices: Vec<NodeIndex>,
        _offset: u64,
        length: usize,
    ) {
        self.cleanup();
        self.buffer_type = BufferType::MultiPage;
        self.cache = Some(cache as *const _);
        self.data_buffer.clear();
        self.data_buffer.resize(length, 0);
        self.node_indices = node_indices;
        self.hit_type = CacheHitType::Mix;
        // Re-derive the self-referential view now that the buffer may have
        // reallocated.
        let data_ptr = self.data_buffer.as_ptr();
        self.data_slice = Some(unsafe { std::slice::from_raw_parts(data_ptr, length) });
    }

    /// Replace the buffer contents with an owned copy of `data`.
    pub fn copy_from_slice(&mut self, data: &[u8]) {
        self.cleanup();
        self.buffer_type = BufferType::Copied;
        self.data_buffer.clear();
        self.data_buffer.extend_from_slice(data);
        // Re-derive the self-referential view after the copy.
        let data_ptr = self.data_buffer.as_ptr();
        self.data_slice = Some(unsafe { std::slice::from_raw_parts(data_ptr, data.len()) });
        self.hit_type = CacheHitType::Hit;
    }

    /// Append `data`, converting the buffer into an owned copy first if it
    /// is not one already.
    pub fn extend_from_slice(&mut self, data: &[u8]) {
        if data.is_empty() {
            return;
        }
        if !matches!(self.buffer_type, BufferType::Copied) {
            // Snapshot the current contents before `cleanup` invalidates the
            // existing view, then switch to the owned-copy state.
            let existing_data = self.data().to_vec();
            self.cleanup();
            self.buffer_type = BufferType::Copied;
            self.data_buffer = existing_data;
        }
        self.data_buffer.extend_from_slice(data);
        // Re-derive the self-referential view; the append may have
        // reallocated the buffer.
        let data_ptr = self.data_buffer.as_ptr();
        self.data_slice =
            Some(unsafe { std::slice::from_raw_parts(data_ptr, self.data_buffer.len()) });
    }

    /// Current contents, or an empty slice if the buffer holds nothing.
    pub fn data(&self) -> &[u8] {
        match self.buffer_type {
            // Single-page data is served from the owned buffer.
            BufferType::SinglePage => &self.data_buffer,
            BufferType::MultiPage | BufferType::Copied => self.data_slice.unwrap_or(&[]),
            BufferType::Empty => &[],
        }
    }

    #[inline]
    pub fn len(&self) -> usize {
        self.data().len()
    }

    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    pub fn hit_type(&self) -> CacheHitType {
        self.hit_type
    }

    pub fn has_data(&self) -> bool {
        !matches!(self.buffer_type, BufferType::Empty)
    }

    pub fn clear(&mut self) {
        self.cleanup();
        self.buffer_type = BufferType::Empty;
        self.data_buffer.clear();
    }

    /// Drop the cache back-pointer, the recorded page nodes, and the
    /// self-referential slice. Resetting the slice here (rather than only
    /// in `clear`) ensures no stale view survives a state change.
    fn cleanup(&mut self) {
        self.cache = None;
        self.node_indices.clear();
        self.data_slice = None;
    }

    /// Build a buffer that takes ownership of `data` without copying it.
    pub fn from_data(data: Vec<u8>) -> Self {
        let len = data.len();
        let mut buffer = Self::new();
        buffer.buffer_type = BufferType::Copied;
        buffer.data_buffer = data;
        // Derive the self-referential view over the adopted allocation.
        let data_ptr = buffer.data_buffer.as_ptr();
        buffer.data_slice = Some(unsafe { std::slice::from_raw_parts(data_ptr, len) });
        buffer
    }

    pub fn reserve(&mut self, capacity: usize) {
        self.data_buffer.reserve(capacity);
    }

    #[inline]
    pub fn capacity(&self) -> usize {
        self.data_buffer.capacity()
    }
}

impl Default for CacheBuffer {
    fn default() -> Self {
        Self::new()
    }
}

impl Drop for CacheBuffer {
    fn drop(&mut self) {
        // Release the cache back-pointer and node list before the owned
        // buffers are freed.
        self.cleanup();
    }
}
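
// SAFETY: the raw `*const SingleLruPageCache` is stored but never
// dereferenced by this type, and `data_slice` always points into the
// buffer's own heap allocation, which stays valid when the buffer moves
// to another thread.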
unsafe impl Send for CacheBuffer {}
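
/// A simple pool of reusable `CacheBuffer`s guarded by a `Mutex`, with
/// atomic counters distinguishing fresh allocations from reuses.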
pub struct BufferPool {
    /// Buffers ready for reuse.
    available: Mutex<Vec<CacheBuffer>>,
    /// Maximum number of buffers retained in the pool.
    max_size: usize,
    /// Count of buffers newly allocated because the pool was empty.
    allocations: AtomicU64,
    /// Count of buffers served from the pool.
    reuses: AtomicU64,
}

impl BufferPool {
    pub fn new(max_size: usize) -> Self {
        Self {
            available: Mutex::new(Vec::new()),
            max_size,
            allocations: AtomicU64::new(0),
            reuses: AtomicU64::new(0),
        }
    }

    /// Take a cleared buffer from the pool, or allocate a fresh one if the
    /// pool is empty (or its lock is poisoned).
    pub fn get(&self) -> CacheBuffer {
        if let Ok(mut buffers) = self.available.lock() {
            if let Some(mut buffer) = buffers.pop() {
                buffer.clear();
                self.reuses.fetch_add(1, Ordering::Relaxed);
                return buffer;
            }
        }
        self.allocations.fetch_add(1, Ordering::Relaxed);
        CacheBuffer::new()
    }

    /// Return a buffer to the pool. Buffers beyond `max_size` (or offered
    /// while the lock is poisoned) are simply dropped.
    pub fn put(&self, buffer: CacheBuffer) {
        if let Ok(mut buffers) = self.available.lock() {
            if buffers.len() < self.max_size {
                buffers.push(buffer);
            }
        }
    }

    pub fn stats(&self) -> BufferPoolStats {
        let available_count = self
            .available
            .lock()
            .map(|buffers| buffers.len())
            .unwrap_or(0);
        BufferPoolStats {
            allocations: self.allocations.load(Ordering::Relaxed),
            reuses: self.reuses.load(Ordering::Relaxed),
            available_count,
            max_size: self.max_size,
        }
    }
}
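
/// A point-in-time snapshot of `BufferPool` counters.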
#[derive(Debug, Clone)]
pub struct BufferPoolStats {
    pub allocations: u64,
    pub reuses: u64,
    pub available_count: usize,
    pub max_size: usize,
}

impl BufferPoolStats {
    /// Fraction of `get` calls served from the pool.
    pub fn reuse_ratio(&self) -> f64 {
        if self.allocations + self.reuses == 0 {
            0.0
        } else {
            self.reuses as f64 / (self.allocations + self.reuses) as f64
        }
    }

    /// Fraction of the pool's capacity currently occupied.
    pub fn pool_utilization(&self) -> f64 {
        if self.max_size == 0 {
            0.0
        } else {
            self.available_count as f64 / self.max_size as f64
        }
    }
}
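
// A minimal sketch of how the owned-copy paths and the pool compose. It
// deliberately avoids `set_node`/`setup_multi_page`, which need a live
// `SingleLruPageCache`; the module and test names, sizes, and contents are
// illustrative, not taken from real callers.
#[cfg(test)]
mod buffer_pool_tests {
    use super::*;

    #[test]
    fn copy_extend_roundtrip() {
        let mut buf = CacheBuffer::from_data(b"hello".to_vec());
        assert_eq!(buf.data(), b"hello");
        // Appending keeps the buffer in the owned state and refreshes the
        // internal view even if the Vec reallocates.
        buf.extend_from_slice(b", world");
        assert_eq!(buf.data(), b"hello, world");
        buf.clear();
        assert!(buf.is_empty() && !buf.has_data());
    }

    #[test]
    fn pool_reuses_returned_buffers() {
        let pool = BufferPool::new(4);
        let mut buf = pool.get();
        buf.copy_from_slice(b"payload");
        pool.put(buf);
        // The second `get` should be served from the pool, already cleared.
        let reused = pool.get();
        assert!(reused.is_empty());
        let stats = pool.stats();
        assert_eq!(stats.reuses, 1);
        assert!(stats.reuse_ratio() > 0.0);
    }
}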