vibesql_storage/
query_buffer_pool.rs1use std::cell::RefCell;
7
8use vibesql_types::SqlValue;
9
10use crate::Row;
11
12const DEFAULT_ROW_CAPACITY: usize = 128;
14
15const DEFAULT_VALUE_CAPACITY: usize = 16;
17
18const MAX_POOLED_BUFFERS: usize = 32;
20
21thread_local! {
22 static ROW_POOL: RefCell<Vec<Vec<Row>>> = const { RefCell::new(Vec::new()) };
23 static VALUE_POOL: RefCell<Vec<Vec<SqlValue>>> = const { RefCell::new(Vec::new()) };
24}
25
26#[derive(Debug, Clone, Copy, Default)]
31pub struct QueryBufferPool;
32
33impl QueryBufferPool {
34 pub fn new() -> Self {
36 Self
37 }
38
39 pub fn get_row_buffer(&self, min_capacity: usize) -> Vec<Row> {
44 ROW_POOL.with(|pool| {
45 let mut pool = pool.borrow_mut();
46
47 if let Some(pos) = pool.iter().position(|buf| buf.capacity() >= min_capacity) {
49 let mut buffer = pool.swap_remove(pos);
50 buffer.clear();
51 buffer
52 } else {
53 Vec::with_capacity(min_capacity.max(DEFAULT_ROW_CAPACITY))
55 }
56 })
57 }
58
59 pub fn return_row_buffer(&self, mut buffer: Vec<Row>) {
64 buffer.clear();
65
66 ROW_POOL.with(|pool| {
67 let mut pool = pool.borrow_mut();
68 if pool.len() < MAX_POOLED_BUFFERS {
69 pool.push(buffer);
70 }
71 });
73 }
74
75 pub fn get_value_buffer(&self, min_capacity: usize) -> Vec<SqlValue> {
80 VALUE_POOL.with(|pool| {
81 let mut pool = pool.borrow_mut();
82
83 if let Some(pos) = pool.iter().position(|buf| buf.capacity() >= min_capacity) {
85 let mut buffer = pool.swap_remove(pos);
86 buffer.clear();
87 buffer
88 } else {
89 Vec::with_capacity(min_capacity.max(DEFAULT_VALUE_CAPACITY))
91 }
92 })
93 }
94
95 pub fn return_value_buffer(&self, mut buffer: Vec<SqlValue>) {
100 buffer.clear();
101
102 VALUE_POOL.with(|pool| {
103 let mut pool = pool.borrow_mut();
104 if pool.len() < MAX_POOLED_BUFFERS {
105 pool.push(buffer);
106 }
107 });
109 }
110
111 pub fn stats(&self) -> QueryBufferPoolStats {
115 let row_buffers_pooled = ROW_POOL.with(|pool| pool.borrow().len());
116 let value_buffers_pooled = VALUE_POOL.with(|pool| pool.borrow().len());
117
118 QueryBufferPoolStats { row_buffers_pooled, value_buffers_pooled }
119 }
120
121 pub fn clear_thread_local_pools() {
138 ROW_POOL.with(|pool| {
139 pool.borrow_mut().clear();
140 });
141 VALUE_POOL.with(|pool| {
142 pool.borrow_mut().clear();
143 });
144 }
145}
146
147#[derive(Debug, Clone, Copy)]
149pub struct QueryBufferPoolStats {
150 pub row_buffers_pooled: usize,
151 pub value_buffers_pooled: usize,
152}
153
154pub struct RowBufferGuard {
156 buffer: Option<Vec<Row>>,
157 pool: QueryBufferPool,
158}
159
160impl RowBufferGuard {
161 pub fn new(buffer: Vec<Row>, pool: QueryBufferPool) -> Self {
163 Self { buffer: Some(buffer), pool }
164 }
165
166 pub fn take(mut self) -> Vec<Row> {
168 self.buffer.take().expect("buffer already taken")
169 }
170
171 #[allow(clippy::should_implement_trait)]
173 pub fn as_ref(&self) -> &Vec<Row> {
174 self.buffer.as_ref().expect("buffer already taken")
175 }
176
177 #[allow(clippy::should_implement_trait)]
179 pub fn as_mut(&mut self) -> &mut Vec<Row> {
180 self.buffer.as_mut().expect("buffer already taken")
181 }
182}
183
184impl Drop for RowBufferGuard {
185 fn drop(&mut self) {
186 if let Some(buffer) = self.buffer.take() {
187 self.pool.return_row_buffer(buffer);
188 }
189 }
190}
191
192pub struct ValueBufferGuard {
194 buffer: Option<Vec<SqlValue>>,
195 pool: QueryBufferPool,
196}
197
198impl ValueBufferGuard {
199 pub fn new(buffer: Vec<SqlValue>, pool: QueryBufferPool) -> Self {
201 Self { buffer: Some(buffer), pool }
202 }
203
204 pub fn take(mut self) -> Vec<SqlValue> {
206 self.buffer.take().expect("buffer already taken")
207 }
208
209 #[allow(clippy::should_implement_trait)]
211 pub fn as_ref(&self) -> &Vec<SqlValue> {
212 self.buffer.as_ref().expect("buffer already taken")
213 }
214
215 #[allow(clippy::should_implement_trait)]
217 pub fn as_mut(&mut self) -> &mut Vec<SqlValue> {
218 self.buffer.as_mut().expect("buffer already taken")
219 }
220}
221
222impl Drop for ValueBufferGuard {
223 fn drop(&mut self) {
224 if let Some(buffer) = self.buffer.take() {
225 self.pool.return_value_buffer(buffer);
226 }
227 }
228}
229
230#[cfg(test)]
231mod tests {
232 use super::*;
233
234 #[test]
235 fn test_row_buffer_pool_reuse() {
236 let pool = QueryBufferPool::new();
237
238 let buffer = pool.get_row_buffer(10);
240 assert!(buffer.capacity() >= 10);
241
242 pool.return_row_buffer(buffer);
244
245 let buffer2 = pool.get_row_buffer(10);
247 assert!(buffer2.capacity() >= 10);
248 }
249
250 #[test]
251 fn test_value_buffer_pool_reuse() {
252 let pool = QueryBufferPool::new();
253
254 let buffer = pool.get_value_buffer(5);
256 assert!(buffer.capacity() >= 5);
257
258 pool.return_value_buffer(buffer);
260
261 let buffer2 = pool.get_value_buffer(5);
263 assert!(buffer2.capacity() >= 5);
264 }
265
266 #[test]
267 fn test_buffer_guard_auto_return() {
268 let pool = QueryBufferPool::new();
269
270 {
271 let buffer = pool.get_row_buffer(10);
272 let _guard = RowBufferGuard::new(buffer, pool);
273 }
275
276 let stats = pool.stats();
277 assert_eq!(stats.row_buffers_pooled, 1);
278 }
279
280 #[test]
281 fn test_pool_max_size() {
282 let pool = QueryBufferPool::new();
283
284 for _ in 0..(MAX_POOLED_BUFFERS + 10) {
286 let buffer = Vec::with_capacity(10);
287 pool.return_row_buffer(buffer);
288 }
289
290 let stats = pool.stats();
291 assert_eq!(stats.row_buffers_pooled, MAX_POOLED_BUFFERS);
292 }
293
294 #[test]
295 fn test_concurrent_access_thread_safety() {
296 use std::{sync::Arc, thread};
297
298 let pool = Arc::new(QueryBufferPool::new());
299 let mut handles = vec![];
300
301 for _ in 0..4 {
303 let pool = Arc::clone(&pool);
304 let handle = thread::spawn(move || {
305 for _ in 0..100 {
306 let buffer = pool.get_row_buffer(10);
307 assert!(buffer.capacity() >= 10);
308 pool.return_row_buffer(buffer);
309 }
310 });
311 handles.push(handle);
312 }
313
314 for handle in handles {
316 handle.join().unwrap();
317 }
318
319 let stats = pool.stats();
321 assert!(stats.row_buffers_pooled <= MAX_POOLED_BUFFERS);
322 }
323}