zero_postgres/
buffer_pool.rs

1//! Buffer pool for reusing buffers across connections.
2
3use std::mem::ManuallyDrop;
4use std::ops::{Deref, DerefMut};
5use std::sync::{Arc, LazyLock};
6
7use crossbeam_queue::ArrayQueue;
8
9use crate::BufferSet;
10
/// Capacity passed to `BufferPool::new` by `BufferPool::default()`.
const POOL_CAPACITY: usize = 128;
12
13/// Global buffer pool for reusing buffers across connections.
14pub static GLOBAL_BUFFER_POOL: LazyLock<Arc<BufferPool>> =
15    LazyLock::new(|| Arc::new(BufferPool::default()));
16
/// A pooled `BufferSet` that returns itself to the pool on drop.
///
/// Dereferences (immutably and mutably) to the wrapped `BufferSet`.
pub struct PooledBufferSet {
    /// Pool the wrapped buffer set is handed back to on drop.
    pool: Arc<BufferPool>,
    /// The wrapped buffer set. `ManuallyDrop` lets the `Drop` impl move the
    /// value out of `&mut self` and return it to the pool.
    inner: ManuallyDrop<BufferSet>,
}
22
23impl PooledBufferSet {
24    fn new(pool: Arc<BufferPool>, buffer_set: BufferSet) -> Self {
25        Self {
26            pool,
27            inner: ManuallyDrop::new(buffer_set),
28        }
29    }
30}
31
32impl Deref for PooledBufferSet {
33    type Target = BufferSet;
34
35    fn deref(&self) -> &Self::Target {
36        &self.inner
37    }
38}
39
40impl DerefMut for PooledBufferSet {
41    fn deref_mut(&mut self) -> &mut Self::Target {
42        &mut self.inner
43    }
44}
45
46impl Drop for PooledBufferSet {
47    fn drop(&mut self) {
48        // SAFETY: inner is never accessed after this
49        let buffer_set = unsafe { ManuallyDrop::take(&mut self.inner) };
50        self.pool.return_buffer_set(buffer_set);
51    }
52}
53
/// Buffer pool for reusing `BufferSet` instances across connections.
///
/// Idle sets are kept in a queue; a returned set that cannot be pushed
/// because the queue is full is simply dropped.
#[derive(Debug)]
pub struct BufferPool {
    /// Idle buffer sets waiting to be handed out again.
    buffer_sets: ArrayQueue<BufferSet>,
}
59
60impl BufferPool {
61    /// Create a new buffer pool with the given capacity.
62    pub fn new(capacity: usize) -> Self {
63        Self {
64            buffer_sets: ArrayQueue::new(capacity),
65        }
66    }
67
68    /// Get a buffer set from the pool, or create a new one if empty.
69    pub fn get_buffer_set(self: &Arc<Self>) -> PooledBufferSet {
70        let buffer_set = self.buffer_sets.pop().unwrap_or_default();
71        PooledBufferSet::new(Arc::clone(self), buffer_set)
72    }
73
74    /// Return a buffer set to the pool.
75    pub fn return_buffer_set(&self, mut buffer_set: BufferSet) {
76        // Clear buffers but preserve capacity
77        buffer_set.read_buffer.clear();
78        buffer_set.write_buffer.clear();
79        buffer_set.column_buffer.clear();
80        buffer_set.type_byte = 0;
81
82        // Ignore if pool is full
83        let _ = self.buffer_sets.push(buffer_set);
84    }
85}
86
87impl Default for BufferPool {
88    fn default() -> Self {
89        Self::new(POOL_CAPACITY)
90    }
91}