zero_mysql/
buffer_pool.rs

1use std::mem::ManuallyDrop;
2use std::ops::{Deref, DerefMut};
3use std::sync::{Arc, LazyLock};
4
5use crossbeam_queue::ArrayQueue;
6
7use crate::BufferSet;
8
9const POOL_CAPACITY: usize = 128;
10
11pub static GLOBAL_BUFFER_POOL: LazyLock<Arc<BufferPool>> =
12    LazyLock::new(|| Arc::new(BufferPool::default()));
13
14/// A pooled `Vec<u8>` that returns itself to the pool on drop.
15pub struct PooledColumnDefinitionVec {
16    pool: Arc<BufferPool>,
17    inner: ManuallyDrop<Vec<u8>>,
18}
19
20impl PooledColumnDefinitionVec {
21    fn new(pool: Arc<BufferPool>, vec: Vec<u8>) -> Self {
22        Self {
23            pool,
24            inner: ManuallyDrop::new(vec),
25        }
26    }
27}
28
29impl Deref for PooledColumnDefinitionVec {
30    type Target = Vec<u8>;
31
32    fn deref(&self) -> &Self::Target {
33        &self.inner
34    }
35}
36
37impl DerefMut for PooledColumnDefinitionVec {
38    fn deref_mut(&mut self) -> &mut Self::Target {
39        &mut self.inner
40    }
41}
42
43impl Drop for PooledColumnDefinitionVec {
44    fn drop(&mut self) {
45        // SAFETY: inner is never accessed after this
46        let vec = unsafe { ManuallyDrop::take(&mut self.inner) };
47        self.pool.return_column_definition(vec);
48    }
49}
50
51/// A pooled `BufferSet` that returns itself to the pool on drop.
52pub struct PooledBufferSet {
53    pool: Arc<BufferPool>,
54    inner: ManuallyDrop<BufferSet>,
55}
56
57impl PooledBufferSet {
58    fn new(pool: Arc<BufferPool>, buffer_set: BufferSet) -> Self {
59        Self {
60            pool,
61            inner: ManuallyDrop::new(buffer_set),
62        }
63    }
64}
65
66impl Deref for PooledBufferSet {
67    type Target = BufferSet;
68
69    fn deref(&self) -> &Self::Target {
70        &self.inner
71    }
72}
73
74impl DerefMut for PooledBufferSet {
75    fn deref_mut(&mut self) -> &mut Self::Target {
76        &mut self.inner
77    }
78}
79
80impl Drop for PooledBufferSet {
81    fn drop(&mut self) {
82        // SAFETY: inner is never accessed after this
83        let buffer_set = unsafe { ManuallyDrop::take(&mut self.inner) };
84        self.pool.return_buffer_set(buffer_set);
85    }
86}
87
88#[derive(Debug)]
89pub struct BufferPool {
90    buffer_sets: ArrayQueue<BufferSet>,
91    column_definition_buffers: ArrayQueue<Vec<u8>>,
92}
93
94impl BufferPool {
95    pub fn new(capacity: usize) -> Self {
96        Self {
97            buffer_sets: ArrayQueue::new(capacity),
98            column_definition_buffers: ArrayQueue::new(capacity),
99        }
100    }
101
102    pub fn get_buffer_set(self: &Arc<Self>) -> PooledBufferSet {
103        let buffer_set = self.buffer_sets.pop().unwrap_or_default();
104        PooledBufferSet::new(Arc::clone(self), buffer_set)
105    }
106
107    pub fn return_buffer_set(&self, mut buffer_set: BufferSet) {
108        // Clear buffers but preserve capacity
109        buffer_set.initial_handshake.clear();
110        buffer_set.read_buffer.clear();
111        buffer_set.column_definition_buffer.clear();
112        // write_buffer is handled by new_write_buffer()
113
114        // Ignore if pool is full
115        let _ = self.buffer_sets.push(buffer_set);
116    }
117
118    pub fn get_column_definition(self: &Arc<Self>) -> PooledColumnDefinitionVec {
119        let vec = self.column_definition_buffers.pop().unwrap_or_default();
120        PooledColumnDefinitionVec::new(Arc::clone(self), vec)
121    }
122
123    pub fn return_column_definition(&self, mut vec: Vec<u8>) {
124        vec.clear();
125        // Ignore if pool is full
126        let _ = self.column_definition_buffers.push(vec);
127    }
128}
129
130impl Default for BufferPool {
131    fn default() -> Self {
132        Self::new(POOL_CAPACITY)
133    }
134}