reifydb_core/value/column/buffer/
pool.rs1use std::collections::HashMap;
5
6use reifydb_runtime::sync::mutex::Mutex;
7use reifydb_type::value::r#type::Type;
8
9use crate::value::column::buffer::ColumnBuffer;
10
/// Upper bound on how many idle buffers a single per-type bucket retains.
///
/// `ColumnBufferPool::release` only stores a buffer while the bucket for its
/// type holds fewer than this many entries; otherwise the buffer is dropped.
const CAP_PER_TYPE: usize = 64;
12
13pub struct ColumnBufferPool {
14 inner: Mutex<HashMap<Type, Vec<ColumnBuffer>>>,
15}
16
17impl Default for ColumnBufferPool {
18 fn default() -> Self {
19 Self::new()
20 }
21}
22
23impl ColumnBufferPool {
24 pub fn new() -> Self {
25 Self {
26 inner: Mutex::new(HashMap::new()),
27 }
28 }
29
30 pub fn acquire(&self, target: &Type, min_capacity: usize) -> ColumnBuffer {
31 if is_poolable(target) {
32 let mut pool = self.inner.lock();
33 if let Some(bucket) = pool.get_mut(target) {
34 let mut best_idx: Option<usize> = None;
35 let mut best_cap: usize = usize::MAX;
36 for (i, buf) in bucket.iter().enumerate() {
37 let cap = buf.capacity();
38 if cap >= min_capacity && cap < best_cap {
39 best_cap = cap;
40 best_idx = Some(i);
41 }
42 }
43 if let Some(i) = best_idx {
44 return bucket.swap_remove(i);
45 }
46 }
47 }
48 ColumnBuffer::with_capacity(target.clone(), min_capacity)
49 }
50
51 pub fn release(&self, mut buffer: ColumnBuffer) {
52 let buffer_type = buffer.get_type();
53 if !is_poolable(&buffer_type) {
54 return;
55 }
56 buffer.clear();
57 let mut pool = self.inner.lock();
58 let bucket = pool.entry(buffer_type).or_default();
59 if bucket.len() < CAP_PER_TYPE {
60 bucket.push(buffer);
61 }
62 }
63
64 pub fn len(&self) -> usize {
65 self.inner.lock().values().map(|v| v.len()).sum()
66 }
67
68 pub fn is_empty(&self) -> bool {
69 self.len() == 0
70 }
71}
72
73fn is_poolable(t: &Type) -> bool {
74 matches!(
75 t,
76 Type::Boolean
77 | Type::Float4 | Type::Float8
78 | Type::Int1 | Type::Int2
79 | Type::Int4 | Type::Int8
80 | Type::Int16 | Type::Uint1
81 | Type::Uint2 | Type::Uint4
82 | Type::Uint8 | Type::Uint16
83 | Type::Utf8 | Type::Date
84 | Type::DateTime | Type::Time
85 | Type::Duration | Type::IdentityId
86 | Type::Uuid4 | Type::Uuid7
87 | Type::Blob | Type::Int
88 | Type::Uint | Type::Decimal
89 | Type::DictionaryId
90 )
91}
92
#[cfg(test)]
mod tests {
    use reifydb_type::value::{Value, r#type::Type};

    use super::{ColumnBufferPool, is_poolable};
    use crate::value::column::buffer::ColumnBuffer;

    // With nothing pooled, `acquire` has to allocate and must not leave
    // anything behind in the pool.
    #[test]
    fn acquire_from_empty_pool_allocates_fresh() {
        let pool = ColumnBufferPool::new();

        let fresh = pool.acquire(&Type::Int8, 4);

        assert_eq!(fresh.get_type(), Type::Int8);
        assert!(fresh.capacity() >= 4);
        assert!(pool.is_empty());
    }

    // A released buffer comes back cleared but with its capacity intact.
    #[test]
    fn release_then_acquire_reuses_same_allocation() {
        let pool = ColumnBufferPool::new();
        let mut filled = ColumnBuffer::with_capacity(Type::Int8, 16);
        for value in 0..8i64 {
            filled.push_value(Value::Int8(value));
        }
        let capacity_before_release = filled.capacity();

        pool.release(filled);
        assert_eq!(pool.len(), 1);

        let recycled = pool.acquire(&Type::Int8, 1);
        assert_eq!(recycled.get_type(), Type::Int8);
        assert_eq!(recycled.capacity(), capacity_before_release);
        assert_eq!(recycled.len(), 0);
        assert!(pool.is_empty());
    }

    // Among the pooled buffers that are large enough, the smallest one wins.
    #[test]
    fn best_fit_prefers_smallest_qualifying_buffer() {
        let pool = ColumnBufferPool::new();
        for capacity in [4, 32, 16, 64] {
            pool.release(ColumnBuffer::with_capacity(Type::Int8, capacity));
        }
        assert_eq!(pool.len(), 4);

        let chosen = pool.acquire(&Type::Int8, 10);
        let chosen_capacity = chosen.capacity();
        assert!(chosen_capacity >= 10);
        assert!(chosen_capacity < 32);
        assert_eq!(pool.len(), 3);
    }

    // One release more than the per-type cap: the extra buffer is discarded.
    #[test]
    fn release_at_cap_drops_overflow() {
        let pool = ColumnBufferPool::new();

        std::iter::repeat_with(|| ColumnBuffer::with_capacity(Type::Int8, 1))
            .take(65)
            .for_each(|buffer| pool.release(buffer));

        assert_eq!(pool.len(), 64);
    }

    // Each type has its own bucket; acquiring one type never yields another.
    #[test]
    fn buffers_do_not_cross_pollute_across_types() {
        let pool = ColumnBufferPool::new();
        pool.release(ColumnBuffer::with_capacity(Type::Int8, 16));
        pool.release(ColumnBuffer::with_capacity(Type::Utf8, 16));
        assert_eq!(pool.len(), 2);

        let numeric = pool.acquire(&Type::Int8, 1);
        assert_eq!(numeric.get_type(), Type::Int8);
        assert_eq!(pool.len(), 1);

        let text = pool.acquire(&Type::Utf8, 1);
        assert_eq!(text.get_type(), Type::Utf8);
        assert!(pool.is_empty());
    }

    // Non-poolable types are neither stored on release nor served from the pool.
    #[test]
    fn non_poolable_types_bypass_pool() {
        let pool = ColumnBufferPool::new();
        let optional_int8 = Type::Option(Box::new(Type::Int8));

        let wrapped = ColumnBuffer::with_capacity(optional_int8.clone(), 8);
        pool.release(wrapped);
        assert!(pool.is_empty(), "Option-wrapped buffers must not enter the pool");

        let allocated = pool.acquire(&optional_int8, 4);
        assert!(allocated.capacity() >= 4);
        assert!(pool.is_empty());
    }

    // Exhaustive check of the allow-list plus a sample of rejected types.
    #[test]
    fn is_poolable_matrix() {
        let accepted = [
            Type::Boolean,
            Type::Float4,
            Type::Float8,
            Type::Int1,
            Type::Int2,
            Type::Int4,
            Type::Int8,
            Type::Int16,
            Type::Uint1,
            Type::Uint2,
            Type::Uint4,
            Type::Uint8,
            Type::Uint16,
            Type::Utf8,
            Type::Date,
            Type::DateTime,
            Type::Time,
            Type::Duration,
            Type::IdentityId,
            Type::Uuid4,
            Type::Uuid7,
            Type::Blob,
            Type::Int,
            Type::Uint,
            Type::Decimal,
            Type::DictionaryId,
        ];
        for ty in &accepted {
            assert!(is_poolable(ty), "{:?} should be poolable", ty);
        }

        let rejected = [
            Type::Option(Box::new(Type::Int8)),
            Type::Any,
            Type::List(Box::new(Type::Int8)),
            Type::Record(vec![]),
            Type::Tuple(vec![]),
        ];
        for ty in &rejected {
            assert!(!is_poolable(ty), "{:?} should not be poolable", ty);
        }
    }
}