reifydb_core/value/column/buffer/
pool.rs1use std::collections::HashMap;
5
6use reifydb_runtime::sync::mutex::Mutex;
7use reifydb_type::value::r#type::Type;
8
9use crate::value::column::buffer::ColumnBuffer;
10
/// Upper bound on the number of idle buffers kept in a single per-type bucket.
///
/// `ColumnBufferPool::release` compares the bucket length against this value and drops the
/// released buffer instead of storing it once the bucket is full.
const CAP_PER_TYPE: usize = 64;
14
15pub struct ColumnBufferPool {
16 inner: Mutex<HashMap<Type, Vec<ColumnBuffer>>>,
17}
18
19impl Default for ColumnBufferPool {
20 fn default() -> Self {
21 Self::new()
22 }
23}
24
25impl ColumnBufferPool {
26 pub fn new() -> Self {
27 Self {
28 inner: Mutex::new(HashMap::new()),
29 }
30 }
31
32 pub fn acquire(&self, target: &Type, min_capacity: usize) -> ColumnBuffer {
37 if is_poolable(target) {
38 let mut pool = self.inner.lock();
39 if let Some(bucket) = pool.get_mut(target) {
40 let mut best_idx: Option<usize> = None;
41 let mut best_cap: usize = usize::MAX;
42 for (i, buf) in bucket.iter().enumerate() {
43 let cap = buf.capacity();
44 if cap >= min_capacity && cap < best_cap {
45 best_cap = cap;
46 best_idx = Some(i);
47 }
48 }
49 if let Some(i) = best_idx {
50 return bucket.swap_remove(i);
51 }
52 }
53 }
54 ColumnBuffer::with_capacity(target.clone(), min_capacity)
55 }
56
57 pub fn release(&self, mut buffer: ColumnBuffer) {
61 let buffer_type = buffer.get_type();
62 if !is_poolable(&buffer_type) {
63 return;
64 }
65 buffer.clear();
66 let mut pool = self.inner.lock();
67 let bucket = pool.entry(buffer_type).or_default();
68 if bucket.len() < CAP_PER_TYPE {
69 bucket.push(buffer);
70 }
71 }
72
73 pub fn len(&self) -> usize {
76 self.inner.lock().values().map(|v| v.len()).sum()
77 }
78
79 pub fn is_empty(&self) -> bool {
81 self.len() == 0
82 }
83}
84
85fn is_poolable(t: &Type) -> bool {
89 matches!(
90 t,
91 Type::Boolean
92 | Type::Float4 | Type::Float8
93 | Type::Int1 | Type::Int2
94 | Type::Int4 | Type::Int8
95 | Type::Int16 | Type::Uint1
96 | Type::Uint2 | Type::Uint4
97 | Type::Uint8 | Type::Uint16
98 | Type::Utf8 | Type::Date
99 | Type::DateTime | Type::Time
100 | Type::Duration | Type::IdentityId
101 | Type::Uuid4 | Type::Uuid7
102 | Type::Blob | Type::Int
103 | Type::Uint | Type::Decimal
104 | Type::DictionaryId
105 )
106}
107
#[cfg(test)]
mod tests {
    use reifydb_type::value::{Value, r#type::Type};

    use super::{CAP_PER_TYPE, ColumnBufferPool, is_poolable};
    use crate::value::column::buffer::ColumnBuffer;

    /// Acquiring from an empty pool allocates a new buffer and leaves the pool empty.
    #[test]
    fn acquire_from_empty_pool_allocates_fresh() {
        let pool = ColumnBufferPool::new();
        let buf = pool.acquire(&Type::Int8, 4);
        assert_eq!(buf.get_type(), Type::Int8);
        assert!(buf.capacity() >= 4);
        assert!(pool.is_empty());
    }

    /// A released buffer comes back cleared but with its capacity intact.
    #[test]
    fn release_then_acquire_reuses_same_allocation() {
        let pool = ColumnBufferPool::new();
        let mut buf = ColumnBuffer::with_capacity(Type::Int8, 16);
        for i in 0..8i64 {
            buf.push_value(Value::Int8(i));
        }
        let original_capacity = buf.capacity();
        pool.release(buf);
        assert_eq!(pool.len(), 1);

        let reused = pool.acquire(&Type::Int8, 1);
        assert_eq!(reused.get_type(), Type::Int8);
        assert_eq!(reused.capacity(), original_capacity);
        assert_eq!(reused.len(), 0);
        assert!(pool.is_empty());
    }

    /// Among several candidates, the smallest buffer that is still large enough is chosen.
    #[test]
    fn best_fit_prefers_smallest_qualifying_buffer() {
        let pool = ColumnBufferPool::new();
        for capacity in [4, 32, 16, 64] {
            pool.release(ColumnBuffer::with_capacity(Type::Int8, capacity));
        }
        assert_eq!(pool.len(), 4);

        let pick = pool.acquire(&Type::Int8, 10);
        assert!(pick.capacity() >= 10);
        assert!(pick.capacity() < 32);
        assert_eq!(pool.len(), 3);
    }

    /// Releasing one buffer more than a bucket may hold drops the surplus buffer.
    #[test]
    fn release_at_cap_drops_overflow() {
        let pool = ColumnBufferPool::new();
        // `0..=CAP_PER_TYPE` performs exactly one release beyond the per-type cap.
        for _ in 0..=CAP_PER_TYPE {
            pool.release(ColumnBuffer::with_capacity(Type::Int8, 1));
        }
        assert_eq!(pool.len(), CAP_PER_TYPE);
    }

    /// Each type has its own bucket; acquiring one type never hands out another type's buffer.
    #[test]
    fn buffers_do_not_cross_pollute_across_types() {
        let pool = ColumnBufferPool::new();
        pool.release(ColumnBuffer::with_capacity(Type::Int8, 16));
        pool.release(ColumnBuffer::with_capacity(Type::Utf8, 16));
        assert_eq!(pool.len(), 2);

        let int8 = pool.acquire(&Type::Int8, 1);
        assert_eq!(int8.get_type(), Type::Int8);
        assert_eq!(pool.len(), 1);

        let utf8 = pool.acquire(&Type::Utf8, 1);
        assert_eq!(utf8.get_type(), Type::Utf8);
        assert!(pool.is_empty());
    }

    /// Non-poolable types are neither stored on release nor served from the pool on acquire.
    #[test]
    fn non_poolable_types_bypass_pool() {
        let pool = ColumnBufferPool::new();
        let opt_ty = Type::Option(Box::new(Type::Int8));
        let opt_buf = ColumnBuffer::with_capacity(opt_ty.clone(), 8);
        pool.release(opt_buf);
        assert!(pool.is_empty(), "Option-wrapped buffers must not enter the pool");

        let acquired = pool.acquire(&opt_ty, 4);
        assert!(acquired.capacity() >= 4);
        assert!(pool.is_empty());
    }

    /// Checks the poolable / non-poolable classification for every type listed here.
    #[test]
    fn is_poolable_matrix() {
        let poolable = [
            Type::Boolean,
            Type::Float4,
            Type::Float8,
            Type::Int1,
            Type::Int2,
            Type::Int4,
            Type::Int8,
            Type::Int16,
            Type::Uint1,
            Type::Uint2,
            Type::Uint4,
            Type::Uint8,
            Type::Uint16,
            Type::Utf8,
            Type::Date,
            Type::DateTime,
            Type::Time,
            Type::Duration,
            Type::IdentityId,
            Type::Uuid4,
            Type::Uuid7,
            Type::Blob,
            Type::Int,
            Type::Uint,
            Type::Decimal,
            Type::DictionaryId,
        ];
        for ty in &poolable {
            assert!(is_poolable(ty), "{ty:?} should be poolable");
        }

        let non_poolable = [
            Type::Option(Box::new(Type::Int8)),
            Type::Any,
            Type::List(Box::new(Type::Int8)),
            Type::Record(vec![]),
            Type::Tuple(vec![]),
        ];
        for ty in &non_poolable {
            assert!(!is_poolable(ty), "{ty:?} should not be poolable");
        }
    }
}