vibesql_storage/query_buffer_pool.rs

//! Memory buffer pooling for query execution
//!
//! Provides reusable buffers to reduce allocation overhead in high-volume query execution.
//! Thread-local pools eliminate lock contention for concurrent access.
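//!
//! A typical usage sketch (using only the APIs defined in this module):
//!
//! ```text
//! use vibesql_storage::QueryBufferPool;
//!
//! let pool = QueryBufferPool::new();
//!
//! // Borrow a row buffer with capacity for at least 64 rows.
//! let mut rows = pool.get_row_buffer(64);
//!
//! // ... fill `rows` during query execution ...
//!
//! // Hand the allocation back so later queries on this thread can reuse it.
//! pool.return_row_buffer(rows);
//! ```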

use std::cell::RefCell;

use vibesql_types::SqlValue;

use crate::Row;

/// Default initial capacity for row buffers
const DEFAULT_ROW_CAPACITY: usize = 128;

/// Default initial capacity for value buffers
const DEFAULT_VALUE_CAPACITY: usize = 16;

/// Maximum number of buffers to keep in each pool
const MAX_POOLED_BUFFERS: usize = 32;

thread_local! {
    static ROW_POOL: RefCell<Vec<Vec<Row>>> = const { RefCell::new(Vec::new()) };
    static VALUE_POOL: RefCell<Vec<Vec<SqlValue>>> = const { RefCell::new(Vec::new()) };
}

/// Thread-safe buffer pool for reusing allocations across query executions
///
/// Uses thread-local storage to eliminate lock contention. Each thread maintains
/// its own pool of buffers, bounded at `MAX_POOLED_BUFFERS` entries.
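///
/// # Example
///
/// A sketch of the per-thread bound as observed through `stats()` (the `32` literal
/// mirrors `MAX_POOLED_BUFFERS` above):
///
/// ```text
/// use vibesql_storage::QueryBufferPool;
///
/// let pool = QueryBufferPool::new();
///
/// // Returning more buffers than the cap simply drops the excess.
/// for _ in 0..100 {
///     pool.return_row_buffer(Vec::with_capacity(8));
/// }
/// assert!(pool.stats().row_buffers_pooled <= 32);
/// ```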
#[derive(Debug, Clone, Copy, Default)]
pub struct QueryBufferPool;

impl QueryBufferPool {
    /// Create a new empty buffer pool
    pub fn new() -> Self {
        Self
    }

    /// Get a row buffer from the pool, or create a new one
    ///
    /// Returns a buffer with at least `min_capacity` capacity.
    /// The buffer will be empty but may have allocated capacity.
    pub fn get_row_buffer(&self, min_capacity: usize) -> Vec<Row> {
        ROW_POOL.with(|pool| {
            let mut pool = pool.borrow_mut();

            // Try to find a buffer with sufficient capacity
            if let Some(pos) = pool.iter().position(|buf| buf.capacity() >= min_capacity) {
                let mut buffer = pool.swap_remove(pos);
                buffer.clear();
                buffer
            } else {
                // No suitable buffer found, create new one
                Vec::with_capacity(min_capacity.max(DEFAULT_ROW_CAPACITY))
            }
        })
    }

    /// Return a row buffer to the pool for reuse
    ///
    /// The buffer is cleared before being returned to the pool.
    /// If the pool is full, the buffer is dropped.
    pub fn return_row_buffer(&self, mut buffer: Vec<Row>) {
        buffer.clear();

        ROW_POOL.with(|pool| {
            let mut pool = pool.borrow_mut();
            if pool.len() < MAX_POOLED_BUFFERS {
                pool.push(buffer);
            }
            // else: drop the buffer (pool is full)
        });
    }

    /// Get a value buffer from the pool, or create a new one
    ///
    /// Returns a buffer with at least `min_capacity` capacity.
    /// The buffer will be empty but may have allocated capacity.
    pub fn get_value_buffer(&self, min_capacity: usize) -> Vec<SqlValue> {
        VALUE_POOL.with(|pool| {
            let mut pool = pool.borrow_mut();

            // Try to find a buffer with sufficient capacity
            if let Some(pos) = pool.iter().position(|buf| buf.capacity() >= min_capacity) {
                let mut buffer = pool.swap_remove(pos);
                buffer.clear();
                buffer
            } else {
                // No suitable buffer found, create new one
                Vec::with_capacity(min_capacity.max(DEFAULT_VALUE_CAPACITY))
            }
        })
    }

    /// Return a value buffer to the pool for reuse
    ///
    /// The buffer is cleared before being returned to the pool.
    /// If the pool is full, the buffer is dropped.
    pub fn return_value_buffer(&self, mut buffer: Vec<SqlValue>) {
        buffer.clear();

        VALUE_POOL.with(|pool| {
            let mut pool = pool.borrow_mut();
            if pool.len() < MAX_POOLED_BUFFERS {
                pool.push(buffer);
            }
            // else: drop the buffer (pool is full)
        });
    }

    /// Get statistics about pool usage (for debugging/monitoring)
    ///
    /// Returns stats for the current thread's pool only.
    pub fn stats(&self) -> QueryBufferPoolStats {
        let row_buffers_pooled = ROW_POOL.with(|pool| pool.borrow().len());
        let value_buffers_pooled = VALUE_POOL.with(|pool| pool.borrow().len());

        QueryBufferPoolStats { row_buffers_pooled, value_buffers_pooled }
    }

    /// Clear all thread-local buffer pools, releasing memory back to the allocator.
    ///
    /// This is useful for benchmarks and long-running processes where memory
    /// pressure needs to be reduced between query batches. The pools will
    /// automatically refill as needed during subsequent query execution.
    ///
    /// # Example
    ///
    /// ```text
    /// use vibesql_storage::QueryBufferPool;
    ///
    /// // Run a batch of queries...
    ///
    /// // Clear pools to release memory
    /// QueryBufferPool::clear_thread_local_pools();
    /// ```
    pub fn clear_thread_local_pools() {
        ROW_POOL.with(|pool| {
            pool.borrow_mut().clear();
        });
        VALUE_POOL.with(|pool| {
            pool.borrow_mut().clear();
        });
    }
}

/// Statistics about buffer pool usage
#[derive(Debug, Clone, Copy)]
pub struct QueryBufferPoolStats {
    pub row_buffers_pooled: usize,
    pub value_buffers_pooled: usize,
}

/// RAII guard for row buffers - automatically returns buffer to pool when dropped
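///
/// # Example
///
/// A minimal sketch of the intended pattern (paths assume the guard is exported
/// alongside `QueryBufferPool`):
///
/// ```text
/// use vibesql_storage::{QueryBufferPool, RowBufferGuard};
///
/// let pool = QueryBufferPool::new();
/// let mut guard = RowBufferGuard::new(pool.get_row_buffer(32), pool);
///
/// // Work through the guard; the Vec<Row> stays owned by it.
/// assert!(guard.as_ref().is_empty());
/// guard.as_mut().reserve(8);
///
/// // Dropping the guard returns the buffer to the pool automatically;
/// // call `take()` instead to keep the buffer and skip the return.
/// ```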
pub struct RowBufferGuard {
    buffer: Option<Vec<Row>>,
    pool: QueryBufferPool,
}

impl RowBufferGuard {
    /// Create a new guard wrapping a buffer
    pub fn new(buffer: Vec<Row>, pool: QueryBufferPool) -> Self {
        Self { buffer: Some(buffer), pool }
    }

    /// Take ownership of the buffer, consuming the guard without returning to pool
    pub fn take(mut self) -> Vec<Row> {
        self.buffer.take().expect("buffer already taken")
    }

    /// Get a reference to the buffer
    #[allow(clippy::should_implement_trait)]
    pub fn as_ref(&self) -> &Vec<Row> {
        self.buffer.as_ref().expect("buffer already taken")
    }

    /// Get a mutable reference to the buffer
    #[allow(clippy::should_implement_trait)]
    pub fn as_mut(&mut self) -> &mut Vec<Row> {
        self.buffer.as_mut().expect("buffer already taken")
    }
}

impl Drop for RowBufferGuard {
    fn drop(&mut self) {
        if let Some(buffer) = self.buffer.take() {
            self.pool.return_row_buffer(buffer);
        }
    }
}

/// RAII guard for value buffers - automatically returns buffer to pool when dropped
pub struct ValueBufferGuard {
    buffer: Option<Vec<SqlValue>>,
    pool: QueryBufferPool,
}

impl ValueBufferGuard {
    /// Create a new guard wrapping a buffer
    pub fn new(buffer: Vec<SqlValue>, pool: QueryBufferPool) -> Self {
        Self { buffer: Some(buffer), pool }
    }

    /// Take ownership of the buffer, consuming the guard without returning to pool
    pub fn take(mut self) -> Vec<SqlValue> {
        self.buffer.take().expect("buffer already taken")
    }

    /// Get a reference to the buffer
    #[allow(clippy::should_implement_trait)]
    pub fn as_ref(&self) -> &Vec<SqlValue> {
        self.buffer.as_ref().expect("buffer already taken")
    }

    /// Get a mutable reference to the buffer
    #[allow(clippy::should_implement_trait)]
    pub fn as_mut(&mut self) -> &mut Vec<SqlValue> {
        self.buffer.as_mut().expect("buffer already taken")
    }
}

impl Drop for ValueBufferGuard {
    fn drop(&mut self) {
        if let Some(buffer) = self.buffer.take() {
            self.pool.return_value_buffer(buffer);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_row_buffer_pool_reuse() {
        let pool = QueryBufferPool::new();

        // Get a buffer
        let buffer = pool.get_row_buffer(10);
        assert!(buffer.capacity() >= 10);

        // Return it
        pool.return_row_buffer(buffer);

        // Get it again - should reuse the same allocation
        let buffer2 = pool.get_row_buffer(10);
        assert!(buffer2.capacity() >= 10);
    }

    #[test]
    fn test_value_buffer_pool_reuse() {
        let pool = QueryBufferPool::new();

        // Get a buffer
        let buffer = pool.get_value_buffer(5);
        assert!(buffer.capacity() >= 5);

        // Return it
        pool.return_value_buffer(buffer);

        // Get it again - should reuse
        let buffer2 = pool.get_value_buffer(5);
        assert!(buffer2.capacity() >= 5);
    }

    #[test]
    fn test_buffer_guard_auto_return() {
        let pool = QueryBufferPool::new();

        {
            let buffer = pool.get_row_buffer(10);
            let _guard = RowBufferGuard::new(buffer, pool);
            // Guard dropped here, should return buffer to pool
        }

        let stats = pool.stats();
        assert_eq!(stats.row_buffers_pooled, 1);
    }

    #[test]
    fn test_pool_max_size() {
        let pool = QueryBufferPool::new();

        // Add more than MAX_POOLED_BUFFERS
        for _ in 0..(MAX_POOLED_BUFFERS + 10) {
            let buffer = Vec::with_capacity(10);
            pool.return_row_buffer(buffer);
        }

        let stats = pool.stats();
        assert_eq!(stats.row_buffers_pooled, MAX_POOLED_BUFFERS);
    }

    #[test]
    fn test_concurrent_access_thread_safety() {
        use std::{sync::Arc, thread};

        let pool = Arc::new(QueryBufferPool::new());
        let mut handles = vec![];

        // Spawn multiple threads that access the pool concurrently
        for _ in 0..4 {
            let pool = Arc::clone(&pool);
            let handle = thread::spawn(move || {
                for _ in 0..100 {
                    let buffer = pool.get_row_buffer(10);
                    assert!(buffer.capacity() >= 10);
                    pool.return_row_buffer(buffer);
                }
            });
            handles.push(handle);
        }

        // Wait for all threads to complete
        for handle in handles {
            handle.join().unwrap();
        }

        // Each thread has its own pool, so stats show only current thread's pool
        let stats = pool.stats();
        assert!(stats.row_buffers_pooled <= MAX_POOLED_BUFFERS);
    }
}