Skip to main content

reifydb_core/value/column/buffer/
pool.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::collections::HashMap;
5
6use reifydb_runtime::sync::mutex::Mutex;
7use reifydb_type::value::r#type::Type;
8
9use crate::value::column::buffer::ColumnBuffer;
10
/// Maximum number of idle buffers retained in a single type bucket.
///
/// Hard-coded for now rather than configurable. With 26 poolable concrete
/// types the pool can never hold more than 26 * 64 = 1664 buffers in total.
const CAP_PER_TYPE: usize = 64;
14
15pub struct ColumnBufferPool {
16	inner: Mutex<HashMap<Type, Vec<ColumnBuffer>>>,
17}
18
19impl Default for ColumnBufferPool {
20	fn default() -> Self {
21		Self::new()
22	}
23}
24
25impl ColumnBufferPool {
26	pub fn new() -> Self {
27		Self {
28			inner: Mutex::new(HashMap::new()),
29		}
30	}
31
32	/// Pop a buffer for `target` whose `capacity() >= min_capacity`, preferring
33	/// the smallest qualifying buffer (best-fit). Falls back to allocating
34	/// fresh via `ColumnBuffer::with_capacity` if `target` is not poolable
35	/// or no qualifying buffer is in the bucket.
36	pub fn acquire(&self, target: &Type, min_capacity: usize) -> ColumnBuffer {
37		if is_poolable(target) {
38			let mut pool = self.inner.lock();
39			if let Some(bucket) = pool.get_mut(target) {
40				let mut best_idx: Option<usize> = None;
41				let mut best_cap: usize = usize::MAX;
42				for (i, buf) in bucket.iter().enumerate() {
43					let cap = buf.capacity();
44					if cap >= min_capacity && cap < best_cap {
45						best_cap = cap;
46						best_idx = Some(i);
47					}
48				}
49				if let Some(i) = best_idx {
50					return bucket.swap_remove(i);
51				}
52			}
53		}
54		ColumnBuffer::with_capacity(target.clone(), min_capacity)
55	}
56
57	/// Clear `buffer` and return it to the pool indexed by its type. Drops
58	/// the buffer if its type is not poolable, or if the bucket is at
59	/// `CAP_PER_TYPE`.
60	pub fn release(&self, mut buffer: ColumnBuffer) {
61		let buffer_type = buffer.get_type();
62		if !is_poolable(&buffer_type) {
63			return;
64		}
65		buffer.clear();
66		let mut pool = self.inner.lock();
67		let bucket = pool.entry(buffer_type).or_default();
68		if bucket.len() < CAP_PER_TYPE {
69			bucket.push(buffer);
70		}
71	}
72
73	/// Total number of buffers currently held across all type buckets.
74	/// Primarily for tests.
75	pub fn len(&self) -> usize {
76		self.inner.lock().values().map(|v| v.len()).sum()
77	}
78
79	/// `true` if every bucket is empty. Primarily for tests.
80	pub fn is_empty(&self) -> bool {
81		self.len() == 0
82	}
83}
84
85/// True for the 26 concrete `Type` variants the pool buckets. Polymorphic
86/// or variable-shape types (`Option(_)`, `Any`, `List(_)`, `Record(_)`,
87/// `Tuple(_)`) bypass the pool - `acquire` allocates fresh, `release` drops.
88fn is_poolable(t: &Type) -> bool {
89	matches!(
90		t,
91		Type::Boolean
92			| Type::Float4 | Type::Float8
93			| Type::Int1 | Type::Int2
94			| Type::Int4 | Type::Int8
95			| Type::Int16 | Type::Uint1
96			| Type::Uint2 | Type::Uint4
97			| Type::Uint8 | Type::Uint16
98			| Type::Utf8 | Type::Date
99			| Type::DateTime | Type::Time
100			| Type::Duration | Type::IdentityId
101			| Type::Uuid4 | Type::Uuid7
102			| Type::Blob | Type::Int
103			| Type::Uint | Type::Decimal
104			| Type::DictionaryId
105	)
106}
107
#[cfg(test)]
mod tests {
	use reifydb_type::value::{Value, r#type::Type};

	use super::{CAP_PER_TYPE, ColumnBufferPool, is_poolable};
	use crate::value::column::buffer::ColumnBuffer;

	/// An empty pool must fall back to a fresh allocation and stay empty.
	#[test]
	fn acquire_from_empty_pool_allocates_fresh() {
		let pool = ColumnBufferPool::new();
		let buf = pool.acquire(&Type::Int8, 4);
		assert_eq!(buf.get_type(), Type::Int8);
		assert!(buf.capacity() >= 4);
		assert!(pool.is_empty());
	}

	/// A released buffer comes back cleared but with its capacity intact.
	#[test]
	fn release_then_acquire_reuses_same_allocation() {
		let pool = ColumnBufferPool::new();
		let mut buf = ColumnBuffer::with_capacity(Type::Int8, 16);
		// Fill the buffer so it is non-empty when released; `release` is
		// responsible for clearing it. The capacity assertion below then
		// checks reuse, and the length assertion checks the clearing.
		for i in 0..8i64 {
			buf.push_value(Value::Int8(i));
		}
		let original_capacity = buf.capacity();
		pool.release(buf);
		assert_eq!(pool.len(), 1);

		let reused = pool.acquire(&Type::Int8, 1);
		assert_eq!(reused.get_type(), Type::Int8);
		assert_eq!(reused.capacity(), original_capacity);
		assert_eq!(reused.len(), 0);
		assert!(pool.is_empty());
	}

	/// Among several candidates the smallest sufficient one is handed out.
	#[test]
	fn best_fit_prefers_smallest_qualifying_buffer() {
		let pool = ColumnBufferPool::new();
		pool.release(ColumnBuffer::with_capacity(Type::Int8, 4));
		pool.release(ColumnBuffer::with_capacity(Type::Int8, 32));
		pool.release(ColumnBuffer::with_capacity(Type::Int8, 16));
		pool.release(ColumnBuffer::with_capacity(Type::Int8, 64));
		assert_eq!(pool.len(), 4);

		// Need >= 10. The smallest qualifying buffer in the pool
		// is the 16-capacity one. Aligned-capacity quirks may round
		// up a bit but it should pick the buffer closest to 10.
		let pick = pool.acquire(&Type::Int8, 10);
		assert!(pick.capacity() >= 10);
		assert!(pick.capacity() < 32);
		assert_eq!(pool.len(), 3);
	}

	/// When the bucket only holds buffers that are too small, `acquire`
	/// allocates fresh and leaves the pooled buffers untouched.
	#[test]
	fn acquire_without_qualifying_buffer_allocates_fresh() {
		let pool = ColumnBufferPool::new();
		pool.release(ColumnBuffer::with_capacity(Type::Int8, 4));
		assert_eq!(pool.len(), 1);

		let min_capacity = 1 << 16;
		let fresh = pool.acquire(&Type::Int8, min_capacity);
		assert_eq!(fresh.get_type(), Type::Int8);
		assert!(fresh.capacity() >= min_capacity);
		assert_eq!(pool.len(), 1);
	}

	/// Releasing past the per-type cap drops the surplus buffer.
	#[test]
	fn release_at_cap_drops_overflow() {
		let pool = ColumnBufferPool::new();
		// Release one buffer more than the bucket may retain.
		for _ in 0..=CAP_PER_TYPE {
			pool.release(ColumnBuffer::with_capacity(Type::Int8, 1));
		}
		assert_eq!(pool.len(), CAP_PER_TYPE);
	}

	/// Each type has its own bucket; acquiring one type never yields another.
	#[test]
	fn buffers_do_not_cross_pollute_across_types() {
		let pool = ColumnBufferPool::new();
		pool.release(ColumnBuffer::with_capacity(Type::Int8, 16));
		pool.release(ColumnBuffer::with_capacity(Type::Utf8, 16));
		assert_eq!(pool.len(), 2);

		let int8 = pool.acquire(&Type::Int8, 1);
		assert_eq!(int8.get_type(), Type::Int8);
		assert_eq!(pool.len(), 1);

		let utf8 = pool.acquire(&Type::Utf8, 1);
		assert_eq!(utf8.get_type(), Type::Utf8);
		assert!(pool.is_empty());
	}

	/// Non-poolable types are neither stored on release nor served from the pool.
	#[test]
	fn non_poolable_types_bypass_pool() {
		let pool = ColumnBufferPool::new();
		// Option(Int8) is not poolable.
		let opt_ty = Type::Option(Box::new(Type::Int8));
		let opt_buf = ColumnBuffer::with_capacity(opt_ty.clone(), 8);
		pool.release(opt_buf);
		assert!(pool.is_empty(), "Option-wrapped buffers must not enter the pool");

		// Subsequent acquire allocates fresh; the pool stays empty.
		let acquired = pool.acquire(&opt_ty, 4);
		assert!(acquired.capacity() >= 4);
		assert!(pool.is_empty());
	}

	/// Pins the full poolable / non-poolable classification.
	#[test]
	fn is_poolable_matrix() {
		assert!(is_poolable(&Type::Boolean));
		assert!(is_poolable(&Type::Float4));
		assert!(is_poolable(&Type::Float8));
		assert!(is_poolable(&Type::Int1));
		assert!(is_poolable(&Type::Int2));
		assert!(is_poolable(&Type::Int4));
		assert!(is_poolable(&Type::Int8));
		assert!(is_poolable(&Type::Int16));
		assert!(is_poolable(&Type::Uint1));
		assert!(is_poolable(&Type::Uint2));
		assert!(is_poolable(&Type::Uint4));
		assert!(is_poolable(&Type::Uint8));
		assert!(is_poolable(&Type::Uint16));
		assert!(is_poolable(&Type::Utf8));
		assert!(is_poolable(&Type::Date));
		assert!(is_poolable(&Type::DateTime));
		assert!(is_poolable(&Type::Time));
		assert!(is_poolable(&Type::Duration));
		assert!(is_poolable(&Type::IdentityId));
		assert!(is_poolable(&Type::Uuid4));
		assert!(is_poolable(&Type::Uuid7));
		assert!(is_poolable(&Type::Blob));
		assert!(is_poolable(&Type::Int));
		assert!(is_poolable(&Type::Uint));
		assert!(is_poolable(&Type::Decimal));
		assert!(is_poolable(&Type::DictionaryId));

		assert!(!is_poolable(&Type::Option(Box::new(Type::Int8))));
		assert!(!is_poolable(&Type::Any));
		assert!(!is_poolable(&Type::List(Box::new(Type::Int8))));
		assert!(!is_poolable(&Type::Record(vec![])));
		assert!(!is_poolable(&Type::Tuple(vec![])));
	}
}