Skip to main content

reifydb_core/value/column/buffer/
pool.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::collections::HashMap;
5
6use reifydb_runtime::sync::mutex::Mutex;
7use reifydb_type::value::r#type::Type;
8
9use crate::value::column::buffer::ColumnBuffer;
10
11const CAP_PER_TYPE: usize = 64;
12
13pub struct ColumnBufferPool {
14	inner: Mutex<HashMap<Type, Vec<ColumnBuffer>>>,
15}
16
17impl Default for ColumnBufferPool {
18	fn default() -> Self {
19		Self::new()
20	}
21}
22
23impl ColumnBufferPool {
24	pub fn new() -> Self {
25		Self {
26			inner: Mutex::new(HashMap::new()),
27		}
28	}
29
30	pub fn acquire(&self, target: &Type, min_capacity: usize) -> ColumnBuffer {
31		if is_poolable(target) {
32			let mut pool = self.inner.lock();
33			if let Some(bucket) = pool.get_mut(target) {
34				let mut best_idx: Option<usize> = None;
35				let mut best_cap: usize = usize::MAX;
36				for (i, buf) in bucket.iter().enumerate() {
37					let cap = buf.capacity();
38					if cap >= min_capacity && cap < best_cap {
39						best_cap = cap;
40						best_idx = Some(i);
41					}
42				}
43				if let Some(i) = best_idx {
44					return bucket.swap_remove(i);
45				}
46			}
47		}
48		ColumnBuffer::with_capacity(target.clone(), min_capacity)
49	}
50
51	pub fn release(&self, mut buffer: ColumnBuffer) {
52		let buffer_type = buffer.get_type();
53		if !is_poolable(&buffer_type) {
54			return;
55		}
56		buffer.clear();
57		let mut pool = self.inner.lock();
58		let bucket = pool.entry(buffer_type).or_default();
59		if bucket.len() < CAP_PER_TYPE {
60			bucket.push(buffer);
61		}
62	}
63
64	pub fn len(&self) -> usize {
65		self.inner.lock().values().map(|v| v.len()).sum()
66	}
67
68	pub fn is_empty(&self) -> bool {
69		self.len() == 0
70	}
71}
72
73fn is_poolable(t: &Type) -> bool {
74	matches!(
75		t,
76		Type::Boolean
77			| Type::Float4 | Type::Float8
78			| Type::Int1 | Type::Int2
79			| Type::Int4 | Type::Int8
80			| Type::Int16 | Type::Uint1
81			| Type::Uint2 | Type::Uint4
82			| Type::Uint8 | Type::Uint16
83			| Type::Utf8 | Type::Date
84			| Type::DateTime | Type::Time
85			| Type::Duration | Type::IdentityId
86			| Type::Uuid4 | Type::Uuid7
87			| Type::Blob | Type::Int
88			| Type::Uint | Type::Decimal
89			| Type::DictionaryId
90	)
91}
92
93#[cfg(test)]
94mod tests {
95	use reifydb_type::value::{Value, r#type::Type};
96
97	use super::{ColumnBufferPool, is_poolable};
98	use crate::value::column::buffer::ColumnBuffer;
99
100	#[test]
101	fn acquire_from_empty_pool_allocates_fresh() {
102		let pool = ColumnBufferPool::new();
103		let buf = pool.acquire(&Type::Int8, 4);
104		assert_eq!(buf.get_type(), Type::Int8);
105		assert!(buf.capacity() >= 4);
106		assert!(pool.is_empty());
107	}
108
109	#[test]
110	fn release_then_acquire_reuses_same_allocation() {
111		let pool = ColumnBufferPool::new();
112		let mut buf = ColumnBuffer::with_capacity(Type::Int8, 16);
113		// Push then clear to mark the buffer non-empty before release
114		// (so the capacity assertion below checks reuse, not freshness).
115		for i in 0..8i64 {
116			buf.push_value(Value::Int8(i));
117		}
118		let original_capacity = buf.capacity();
119		pool.release(buf);
120		assert_eq!(pool.len(), 1);
121
122		let reused = pool.acquire(&Type::Int8, 1);
123		assert_eq!(reused.get_type(), Type::Int8);
124		assert_eq!(reused.capacity(), original_capacity);
125		assert_eq!(reused.len(), 0);
126		assert!(pool.is_empty());
127	}
128
129	#[test]
130	fn best_fit_prefers_smallest_qualifying_buffer() {
131		let pool = ColumnBufferPool::new();
132		pool.release(ColumnBuffer::with_capacity(Type::Int8, 4));
133		pool.release(ColumnBuffer::with_capacity(Type::Int8, 32));
134		pool.release(ColumnBuffer::with_capacity(Type::Int8, 16));
135		pool.release(ColumnBuffer::with_capacity(Type::Int8, 64));
136		assert_eq!(pool.len(), 4);
137
138		// Need >= 10. The smallest qualifying buffer in the pool
139		// is the 16-capacity one. Aligned-capacity quirks may round
140		// up a bit but it should pick the buffer closest to 10.
141		let pick = pool.acquire(&Type::Int8, 10);
142		assert!(pick.capacity() >= 10);
143		assert!(pick.capacity() < 32);
144		assert_eq!(pool.len(), 3);
145	}
146
147	#[test]
148	fn release_at_cap_drops_overflow() {
149		let pool = ColumnBufferPool::new();
150		// CAP_PER_TYPE is 64; release 65 to overflow by one.
151		for _ in 0..65 {
152			pool.release(ColumnBuffer::with_capacity(Type::Int8, 1));
153		}
154		assert_eq!(pool.len(), 64);
155	}
156
157	#[test]
158	fn buffers_do_not_cross_pollute_across_types() {
159		let pool = ColumnBufferPool::new();
160		pool.release(ColumnBuffer::with_capacity(Type::Int8, 16));
161		pool.release(ColumnBuffer::with_capacity(Type::Utf8, 16));
162		assert_eq!(pool.len(), 2);
163
164		let int8 = pool.acquire(&Type::Int8, 1);
165		assert_eq!(int8.get_type(), Type::Int8);
166		assert_eq!(pool.len(), 1);
167
168		let utf8 = pool.acquire(&Type::Utf8, 1);
169		assert_eq!(utf8.get_type(), Type::Utf8);
170		assert!(pool.is_empty());
171	}
172
173	#[test]
174	fn non_poolable_types_bypass_pool() {
175		let pool = ColumnBufferPool::new();
176		// Option(Int8) is not poolable.
177		let opt_ty = Type::Option(Box::new(Type::Int8));
178		let opt_buf = ColumnBuffer::with_capacity(opt_ty.clone(), 8);
179		pool.release(opt_buf);
180		assert!(pool.is_empty(), "Option-wrapped buffers must not enter the pool");
181
182		// Subsequent acquire allocates fresh; the pool stays empty.
183		let acquired = pool.acquire(&opt_ty, 4);
184		assert!(acquired.capacity() >= 4);
185		assert!(pool.is_empty());
186	}
187
188	#[test]
189	fn is_poolable_matrix() {
190		assert!(is_poolable(&Type::Boolean));
191		assert!(is_poolable(&Type::Float4));
192		assert!(is_poolable(&Type::Float8));
193		assert!(is_poolable(&Type::Int1));
194		assert!(is_poolable(&Type::Int2));
195		assert!(is_poolable(&Type::Int4));
196		assert!(is_poolable(&Type::Int8));
197		assert!(is_poolable(&Type::Int16));
198		assert!(is_poolable(&Type::Uint1));
199		assert!(is_poolable(&Type::Uint2));
200		assert!(is_poolable(&Type::Uint4));
201		assert!(is_poolable(&Type::Uint8));
202		assert!(is_poolable(&Type::Uint16));
203		assert!(is_poolable(&Type::Utf8));
204		assert!(is_poolable(&Type::Date));
205		assert!(is_poolable(&Type::DateTime));
206		assert!(is_poolable(&Type::Time));
207		assert!(is_poolable(&Type::Duration));
208		assert!(is_poolable(&Type::IdentityId));
209		assert!(is_poolable(&Type::Uuid4));
210		assert!(is_poolable(&Type::Uuid7));
211		assert!(is_poolable(&Type::Blob));
212		assert!(is_poolable(&Type::Int));
213		assert!(is_poolable(&Type::Uint));
214		assert!(is_poolable(&Type::Decimal));
215		assert!(is_poolable(&Type::DictionaryId));
216
217		assert!(!is_poolable(&Type::Option(Box::new(Type::Int8))));
218		assert!(!is_poolable(&Type::Any));
219		assert!(!is_poolable(&Type::List(Box::new(Type::Int8))));
220		assert!(!is_poolable(&Type::Record(vec![])));
221		assert!(!is_poolable(&Type::Tuple(vec![])));
222	}
223}