1use crate::crdt::{GCounter, LwwMap, LwwRegister, ReplicaId};
4use parking_lot::Mutex;
5use std::sync::Arc;
6use std::collections::HashMap;
7
8#[derive(Debug, Clone)]
10pub struct PoolConfig {
11 pub initial_size: usize,
13 pub max_size: usize,
15 pub enable_cleanup: bool,
17 pub cleanup_interval: u64,
19}
20
21impl Default for PoolConfig {
22 fn default() -> Self {
23 Self {
24 initial_size: 100,
25 max_size: 1000,
26 enable_cleanup: true,
27 cleanup_interval: 300, }
29 }
30}
31
32pub struct CRDTMemoryPool {
34 config: PoolConfig,
35 lww_registers: Arc<Mutex<Vec<LwwRegister<String>>>>,
36 lww_maps: Arc<Mutex<Vec<LwwMap<String, String>>>>,
37 gcounters: Arc<Mutex<Vec<GCounter>>>,
38 stats: Arc<Mutex<PoolStats>>,
39}
40
41#[derive(Debug, Default, Clone)]
43pub struct PoolStats {
44 pub lww_register_allocations: usize,
45 pub lww_register_deallocations: usize,
46 pub lww_map_allocations: usize,
47 pub lww_map_deallocations: usize,
48 pub gcounter_allocations: usize,
49 pub gcounter_deallocations: usize,
50 pub pool_hits: usize,
51 pub pool_misses: usize,
52}
53
54impl CRDTMemoryPool {
55 pub fn new() -> Self {
57 Self::with_config(PoolConfig::default())
58 }
59
60 pub fn with_config(config: PoolConfig) -> Self {
62 let pool = Self {
63 config,
64 lww_registers: Arc::new(Mutex::new(Vec::new())),
65 lww_maps: Arc::new(Mutex::new(Vec::new())),
66 gcounters: Arc::new(Mutex::new(Vec::new())),
67 stats: Arc::new(Mutex::new(PoolStats::default())),
68 };
69
70 pool.pre_populate();
72 pool
73 }
74
75 fn pre_populate(&self) {
77 let mut registers = self.lww_registers.lock();
79 for _ in 0..self.config.initial_size {
80 registers.push(LwwRegister::new(String::new(), ReplicaId::default()));
81 }
82
83 let mut maps = self.lww_maps.lock();
85 for _ in 0..self.config.initial_size {
86 maps.push(LwwMap::new());
87 }
88
89 let mut counters = self.gcounters.lock();
91 for _ in 0..self.config.initial_size {
92 counters.push(GCounter::new());
93 }
94 }
95
96 pub fn get_lww_register(&self) -> LwwRegister<String> {
98 let mut registers = self.lww_registers.lock();
99 if let Some(register) = registers.pop() {
100 self.stats.lock().pool_hits += 1;
101 register
102 } else {
103 self.stats.lock().pool_misses += 1;
104 self.stats.lock().lww_register_allocations += 1;
105 LwwRegister::new(String::new(), ReplicaId::default())
106 }
107 }
108
109 pub fn return_lww_register(&self, register: LwwRegister<String>) {
111 let mut registers = self.lww_registers.lock();
112 if registers.len() < self.config.max_size {
113 registers.push(register);
114 self.stats.lock().lww_register_deallocations += 1;
115 }
116 }
117
118 pub fn get_lww_map(&self) -> LwwMap<String, String> {
120 let mut maps = self.lww_maps.lock();
121 if let Some(map) = maps.pop() {
122 self.stats.lock().pool_hits += 1;
123 map
124 } else {
125 self.stats.lock().pool_misses += 1;
126 self.stats.lock().lww_map_allocations += 1;
127 LwwMap::new()
128 }
129 }
130
131 pub fn return_lww_map(&self, map: LwwMap<String, String>) {
133 let mut maps = self.lww_maps.lock();
134 if maps.len() < self.config.max_size {
135 maps.push(map);
136 self.stats.lock().lww_map_deallocations += 1;
137 }
138 }
139
140 pub fn get_gcounter(&self) -> GCounter {
142 let mut counters = self.gcounters.lock();
143 if let Some(counter) = counters.pop() {
144 self.stats.lock().pool_hits += 1;
145 counter
146 } else {
147 self.stats.lock().pool_misses += 1;
148 self.stats.lock().gcounter_allocations += 1;
149 GCounter::new()
150 }
151 }
152
153 pub fn return_gcounter(&self, counter: GCounter) {
155 let mut counters = self.gcounters.lock();
156 if counters.len() < self.config.max_size {
157 counters.push(counter);
158 self.stats.lock().gcounter_deallocations += 1;
159 }
160 }
161
162 pub fn stats(&self) -> PoolStats {
164 self.stats.lock().clone()
165 }
166
167 pub fn pool_sizes(&self) -> HashMap<String, usize> {
169 let mut sizes = HashMap::new();
170 sizes.insert("lww_registers".to_string(), self.lww_registers.lock().len());
171 sizes.insert("lww_maps".to_string(), self.lww_maps.lock().len());
172 sizes.insert("gcounters".to_string(), self.gcounters.lock().len());
173 sizes
174 }
175
176 pub fn clear(&self) {
178 self.lww_registers.lock().clear();
179 self.lww_maps.lock().clear();
180 self.gcounters.lock().clear();
181 }
182
183 pub fn resize(&self, target_size: usize) {
185 self.clear();
187
188 let mut registers = self.lww_registers.lock();
190 for _ in 0..target_size {
191 registers.push(LwwRegister::new(String::new(), ReplicaId::default()));
192 }
193 drop(registers);
194
195 let mut maps = self.lww_maps.lock();
196 for _ in 0..target_size {
197 maps.push(LwwMap::new());
198 }
199 drop(maps);
200
201 let mut counters = self.gcounters.lock();
202 for _ in 0..target_size {
203 counters.push(GCounter::new());
204 }
205 }
206}
207
208impl Default for CRDTMemoryPool {
209 fn default() -> Self {
210 Self::new()
211 }
212}
213
214pub struct PooledCRDT<T: Clone> {
216 inner: T,
217 pool: Arc<CRDTMemoryPool>,
218 return_fn: Box<dyn FnOnce(T, &CRDTMemoryPool)>,
219}
220
221impl<T: Clone> PooledCRDT<T> {
222 pub fn new(
224 inner: T,
225 pool: Arc<CRDTMemoryPool>,
226 return_fn: Box<dyn FnOnce(T, &CRDTMemoryPool)>,
227 ) -> Self {
228 Self {
229 inner,
230 pool,
231 return_fn,
232 }
233 }
234
235 pub fn inner(&self) -> &T {
237 &self.inner
238 }
239
240 pub fn inner_mut(&mut self) -> &mut T {
242 &mut self.inner
243 }
244
245 pub fn return_to_pool(self) {
247 drop(self);
250 }
251}
252
253impl<T: Clone> std::ops::Deref for PooledCRDT<T> {
257 type Target = T;
258
259 fn deref(&self) -> &Self::Target {
260 &self.inner
261 }
262}
263
264impl<T: Clone> std::ops::DerefMut for PooledCRDT<T> {
265 fn deref_mut(&mut self) -> &mut Self::Target {
266 &mut self.inner
267 }
268}
269
270impl CRDTMemoryPool {
272 pub fn create_pooled_lww_register(&self) -> PooledCRDT<LwwRegister<String>> {
274 let register = self.get_lww_register();
275 let pool = Arc::new(self.clone());
276 PooledCRDT::new(
277 register,
278 pool,
279 Box::new(|r, p| p.return_lww_register(r)),
280 )
281 }
282
283 pub fn create_pooled_lww_map(&self) -> PooledCRDT<LwwMap<String, String>> {
285 let map = self.get_lww_map();
286 let pool = Arc::new(self.clone());
287 PooledCRDT::new(
288 map,
289 pool,
290 Box::new(|m, p| p.return_lww_map(m)),
291 )
292 }
293
294 pub fn create_pooled_gcounter(&self) -> PooledCRDT<GCounter> {
296 let counter = self.get_gcounter();
297 let pool = Arc::new(self.clone());
298 PooledCRDT::new(
299 counter,
300 pool,
301 Box::new(|c, p| p.return_gcounter(c)),
302 )
303 }
304}
305
306impl Clone for CRDTMemoryPool {
307 fn clone(&self) -> Self {
308 Self {
309 config: self.config.clone(),
310 lww_registers: Arc::clone(&self.lww_registers),
311 lww_maps: Arc::clone(&self.lww_maps),
312 gcounters: Arc::clone(&self.gcounters),
313 stats: Arc::clone(&self.stats),
314 }
315 }
316}
317
318#[cfg(test)]
319mod tests {
320 use super::*;
321
322 #[test]
323 fn test_memory_pool_basic() {
324 let pool = CRDTMemoryPool::new();
325
326 let register1 = pool.get_lww_register();
328 let register2 = pool.get_lww_register();
329
330 pool.return_lww_register(register1);
331 pool.return_lww_register(register2);
332
333 let stats = pool.stats();
334 assert_eq!(stats.pool_hits, 2);
336 assert_eq!(stats.pool_misses, 0);
337 assert_eq!(stats.lww_register_deallocations, 2);
338 }
339
340 #[test]
341 fn test_memory_pool_config() {
342 let config = PoolConfig {
343 initial_size: 5,
344 max_size: 10,
345 enable_cleanup: true,
346 cleanup_interval: 60,
347 };
348
349 let pool = CRDTMemoryPool::with_config(config);
350 let sizes = pool.pool_sizes();
351
352 assert_eq!(sizes["lww_registers"], 5);
353 assert_eq!(sizes["lww_maps"], 5);
354 assert_eq!(sizes["gcounters"], 5);
355 }
356
357 #[test]
358 fn test_pooled_crdt() {
359 let pool = CRDTMemoryPool::new();
360 let pooled_register = pool.create_pooled_lww_register();
361
362 let value = pooled_register.inner().value();
364 assert_eq!(value, "");
365
366 pooled_register.return_to_pool();
368
369 let stats = pool.stats();
370 assert_eq!(stats.lww_register_deallocations, 0);
373 }
374
375 #[test]
376 fn test_pool_resize() {
377 let pool = CRDTMemoryPool::new();
378 let initial_sizes = pool.pool_sizes();
379
380 pool.resize(50);
381 let new_sizes = pool.pool_sizes();
382
383 assert_eq!(new_sizes["lww_registers"], 50);
384 assert_eq!(new_sizes["lww_maps"], 50);
385 assert_eq!(new_sizes["gcounters"], 50);
386 }
387}