reifydb_core/value/column/pool/
allocator.rs1use std::{
5 collections::HashMap,
6 sync::{Arc, Mutex},
7};
8
9use crate::value::column::pool::{capacity::ContainerCapacity, stats::PoolStats};
10
11pub trait PoolAllocator<C> {
13 fn acquire(&self, capacity: usize) -> C;
15
16 fn release(&self, container: C);
18
19 fn clear(&self);
21
22 fn stats(&self) -> PoolStats;
24}
25
26pub struct StdPoolAllocator<C> {
34 pools: Arc<Mutex<HashMap<usize, Vec<C>>>>,
35 stats: Arc<Mutex<PoolStats>>,
36 max_pool_size: usize,
37}
38
39impl<C> StdPoolAllocator<C> {
40 pub(crate) fn new(max_pool_size: usize) -> Self {
41 let result = Self {
42 pools: Arc::new(Mutex::new(HashMap::new())),
43 stats: Arc::new(Mutex::new(PoolStats::default())),
44 max_pool_size,
45 };
46 result
47 }
48
49 pub(crate) fn create_new(&self, capacity: usize) -> C
51 where
52 C: ContainerCapacity,
53 {
54 C::with_capacity(capacity)
55 }
56
57 pub(crate) fn capacity_bucket(capacity: usize) -> usize {
60 if capacity == 0 {
61 return 8; }
63 capacity.next_power_of_two().max(8)
64 }
65}
66
67impl<C> PoolAllocator<C> for StdPoolAllocator<C>
68where
69 C: ContainerCapacity,
70{
71 fn acquire(&self, capacity: usize) -> C {
72 let bucket = Self::capacity_bucket(capacity);
73
74 if let Ok(mut pools) = self.pools.lock() {
76 if let Some(pool) = pools.get_mut(&bucket) {
77 if let Some(mut container) = pool.pop() {
78 container.clear();
79
80 if let Ok(mut stats) = self.stats.lock() {
82 stats.total_acquired += 1;
83 }
84
85 return container;
86 }
87 }
88 }
89
90 let container = self.create_new(bucket);
92
93 if let Ok(mut stats) = self.stats.lock() {
95 stats.total_acquired += 1;
96 }
97
98 container
99 }
100
101 fn release(&self, container: C) {
102 let capacity = container.capacity();
103 let bucket = Self::capacity_bucket(capacity);
104
105 if let Ok(mut pools) = self.pools.lock() {
106 let pool = pools.entry(bucket).or_insert_with(Vec::new);
107
108 if pool.len() < self.max_pool_size {
111 pool.push(container);
112 }
113 }
114
115 if let Ok(mut stats) = self.stats.lock() {
117 stats.total_released += 1;
118 }
119 }
120
121 fn clear(&self) {
122 if let Ok(mut pools) = self.pools.lock() {
123 pools.clear();
124 }
125
126 if let Ok(mut stats) = self.stats.lock() {
127 *stats = PoolStats::default();
128 }
129 }
130
131 fn stats(&self) -> PoolStats {
132 if let Ok(pools) = self.pools.lock() {
133 let available = pools.values().map(|pool| pool.len()).sum();
134
135 if let Ok(stats) = self.stats.lock() {
136 return PoolStats {
137 available,
138 total_acquired: stats.total_acquired,
139 total_released: stats.total_released,
140 };
141 }
142 }
143
144 PoolStats::default()
145 }
146}
147
#[cfg(test)]
pub mod tests {
    use reifydb_type::value::container::{bool::BoolContainer, number::NumberContainer, utf8::Utf8Container};

    use crate::value::column::pool::allocator::{PoolAllocator, StdPoolAllocator};

    /// A released bool container is counted as available and is taken out of
    /// the pool again by the next matching acquire.
    #[test]
    fn test_allocate_bool() {
        let allocator = StdPoolAllocator::<BoolContainer>::new(4);

        let first = allocator.acquire(10);
        assert!(first.capacity() >= 10);

        allocator.release(first);

        let after_release = allocator.stats();
        assert_eq!(after_release.total_acquired, 1);
        assert_eq!(after_release.total_released, 1);
        assert_eq!(after_release.available, 1);

        // The second acquire should drain the pooled container.
        let second = allocator.acquire(10);
        let after_reacquire = allocator.stats();
        assert_eq!(after_reacquire.total_acquired, 2);
        assert_eq!(after_reacquire.available, 0);

        allocator.release(second);
    }

    /// Acquire/release round trip for UTF-8 containers.
    #[test]
    fn test_allocate_string() {
        let allocator = StdPoolAllocator::<Utf8Container>::new(4);

        let acquired = allocator.acquire(20);
        assert!(acquired.capacity() >= 20);

        allocator.release(acquired);

        let snapshot = allocator.stats();
        assert_eq!(snapshot.total_acquired, 1);
        assert_eq!(snapshot.total_released, 1);
        assert_eq!(snapshot.available, 1);
    }

    /// Acquire/release round trip for numeric containers.
    #[test]
    fn test_allocate_number() {
        let allocator = StdPoolAllocator::<NumberContainer<i32>>::new(4);

        let acquired = allocator.acquire(50);
        assert!(acquired.capacity() >= 50);

        allocator.release(acquired);

        let snapshot = allocator.stats();
        assert_eq!(snapshot.total_acquired, 1);
        assert_eq!(snapshot.total_released, 1);
        assert_eq!(snapshot.available, 1);
    }

    /// Releasing more containers than `max_pool_size` keeps only the limit.
    #[test]
    fn test_pool_max_size() {
        let allocator = StdPoolAllocator::<BoolContainer>::new(2);

        let first = allocator.acquire(10);
        let second = allocator.acquire(10);
        let third = allocator.acquire(10);

        allocator.release(first);
        allocator.release(second);
        // The bucket is already full here, so this one is not retained.
        allocator.release(third);

        let snapshot = allocator.stats();
        assert_eq!(snapshot.available, 2);
    }

    /// Requested capacities round up to powers of two with a floor of 8.
    #[test]
    fn test_capacity_bucket() {
        let cases = [(0, 8), (1, 8), (8, 8), (9, 16), (16, 16), (17, 32), (100, 128)];

        for (requested, expected) in cases {
            assert_eq!(StdPoolAllocator::<BoolContainer>::capacity_bucket(requested), expected);
        }
    }
}