use std::collections::HashMap;
use reifydb_runtime::sync::mutex::Mutex;
use reifydb_type::value::r#type::Type;
use crate::value::column::buffer::ColumnBuffer;
const CAP_PER_TYPE: usize = 64;
pub struct ColumnBufferPool {
inner: Mutex<HashMap<Type, Vec<ColumnBuffer>>>,
}
impl Default for ColumnBufferPool {
fn default() -> Self {
Self::new()
}
}
impl ColumnBufferPool {
pub fn new() -> Self {
Self {
inner: Mutex::new(HashMap::new()),
}
}
pub fn acquire(&self, target: &Type, min_capacity: usize) -> ColumnBuffer {
if is_poolable(target) {
let mut pool = self.inner.lock();
if let Some(bucket) = pool.get_mut(target) {
let mut best_idx: Option<usize> = None;
let mut best_cap: usize = usize::MAX;
for (i, buf) in bucket.iter().enumerate() {
let cap = buf.capacity();
if cap >= min_capacity && cap < best_cap {
best_cap = cap;
best_idx = Some(i);
}
}
if let Some(i) = best_idx {
return bucket.swap_remove(i);
}
}
}
ColumnBuffer::with_capacity(target.clone(), min_capacity)
}
pub fn release(&self, mut buffer: ColumnBuffer) {
let buffer_type = buffer.get_type();
if !is_poolable(&buffer_type) {
return;
}
buffer.clear();
let mut pool = self.inner.lock();
let bucket = pool.entry(buffer_type).or_default();
if bucket.len() < CAP_PER_TYPE {
bucket.push(buffer);
}
}
pub fn len(&self) -> usize {
self.inner.lock().values().map(|v| v.len()).sum()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
fn is_poolable(t: &Type) -> bool {
matches!(
t,
Type::Boolean
| Type::Float4 | Type::Float8
| Type::Int1 | Type::Int2
| Type::Int4 | Type::Int8
| Type::Int16 | Type::Uint1
| Type::Uint2 | Type::Uint4
| Type::Uint8 | Type::Uint16
| Type::Utf8 | Type::Date
| Type::DateTime | Type::Time
| Type::Duration | Type::IdentityId
| Type::Uuid4 | Type::Uuid7
| Type::Blob | Type::Int
| Type::Uint | Type::Decimal
| Type::DictionaryId
)
}
#[cfg(test)]
mod tests {
use reifydb_type::value::{Value, r#type::Type};
use super::{ColumnBufferPool, is_poolable};
use crate::value::column::buffer::ColumnBuffer;
#[test]
fn acquire_from_empty_pool_allocates_fresh() {
let pool = ColumnBufferPool::new();
let buf = pool.acquire(&Type::Int8, 4);
assert_eq!(buf.get_type(), Type::Int8);
assert!(buf.capacity() >= 4);
assert!(pool.is_empty());
}
#[test]
fn release_then_acquire_reuses_same_allocation() {
let pool = ColumnBufferPool::new();
let mut buf = ColumnBuffer::with_capacity(Type::Int8, 16);
for i in 0..8i64 {
buf.push_value(Value::Int8(i));
}
let original_capacity = buf.capacity();
pool.release(buf);
assert_eq!(pool.len(), 1);
let reused = pool.acquire(&Type::Int8, 1);
assert_eq!(reused.get_type(), Type::Int8);
assert_eq!(reused.capacity(), original_capacity);
assert_eq!(reused.len(), 0);
assert!(pool.is_empty());
}
#[test]
fn best_fit_prefers_smallest_qualifying_buffer() {
let pool = ColumnBufferPool::new();
pool.release(ColumnBuffer::with_capacity(Type::Int8, 4));
pool.release(ColumnBuffer::with_capacity(Type::Int8, 32));
pool.release(ColumnBuffer::with_capacity(Type::Int8, 16));
pool.release(ColumnBuffer::with_capacity(Type::Int8, 64));
assert_eq!(pool.len(), 4);
let pick = pool.acquire(&Type::Int8, 10);
assert!(pick.capacity() >= 10);
assert!(pick.capacity() < 32);
assert_eq!(pool.len(), 3);
}
#[test]
fn release_at_cap_drops_overflow() {
let pool = ColumnBufferPool::new();
for _ in 0..65 {
pool.release(ColumnBuffer::with_capacity(Type::Int8, 1));
}
assert_eq!(pool.len(), 64);
}
#[test]
fn buffers_do_not_cross_pollute_across_types() {
let pool = ColumnBufferPool::new();
pool.release(ColumnBuffer::with_capacity(Type::Int8, 16));
pool.release(ColumnBuffer::with_capacity(Type::Utf8, 16));
assert_eq!(pool.len(), 2);
let int8 = pool.acquire(&Type::Int8, 1);
assert_eq!(int8.get_type(), Type::Int8);
assert_eq!(pool.len(), 1);
let utf8 = pool.acquire(&Type::Utf8, 1);
assert_eq!(utf8.get_type(), Type::Utf8);
assert!(pool.is_empty());
}
#[test]
fn non_poolable_types_bypass_pool() {
let pool = ColumnBufferPool::new();
let opt_ty = Type::Option(Box::new(Type::Int8));
let opt_buf = ColumnBuffer::with_capacity(opt_ty.clone(), 8);
pool.release(opt_buf);
assert!(pool.is_empty(), "Option-wrapped buffers must not enter the pool");
let acquired = pool.acquire(&opt_ty, 4);
assert!(acquired.capacity() >= 4);
assert!(pool.is_empty());
}
#[test]
fn is_poolable_matrix() {
assert!(is_poolable(&Type::Boolean));
assert!(is_poolable(&Type::Float4));
assert!(is_poolable(&Type::Float8));
assert!(is_poolable(&Type::Int1));
assert!(is_poolable(&Type::Int2));
assert!(is_poolable(&Type::Int4));
assert!(is_poolable(&Type::Int8));
assert!(is_poolable(&Type::Int16));
assert!(is_poolable(&Type::Uint1));
assert!(is_poolable(&Type::Uint2));
assert!(is_poolable(&Type::Uint4));
assert!(is_poolable(&Type::Uint8));
assert!(is_poolable(&Type::Uint16));
assert!(is_poolable(&Type::Utf8));
assert!(is_poolable(&Type::Date));
assert!(is_poolable(&Type::DateTime));
assert!(is_poolable(&Type::Time));
assert!(is_poolable(&Type::Duration));
assert!(is_poolable(&Type::IdentityId));
assert!(is_poolable(&Type::Uuid4));
assert!(is_poolable(&Type::Uuid7));
assert!(is_poolable(&Type::Blob));
assert!(is_poolable(&Type::Int));
assert!(is_poolable(&Type::Uint));
assert!(is_poolable(&Type::Decimal));
assert!(is_poolable(&Type::DictionaryId));
assert!(!is_poolable(&Type::Option(Box::new(Type::Int8))));
assert!(!is_poolable(&Type::Any));
assert!(!is_poolable(&Type::List(Box::new(Type::Int8))));
assert!(!is_poolable(&Type::Record(vec![])));
assert!(!is_poolable(&Type::Tuple(vec![])));
}
}