ipfrs_core/
pool.rs

1//! Memory pooling for frequent allocations
2//!
3//! This module provides memory pools for common allocation patterns:
4//! - Block buffer pool (reuse Bytes allocations)
5//! - CID string pool (deduplicate strings)
//! - IPLD node pool (not yet provided by this module)
7//!
8//! Memory pooling reduces allocator pressure by reusing existing allocations.
9
10use bytes::{Bytes, BytesMut};
11use std::collections::HashMap;
12use std::sync::{Arc, Mutex};
13
14/// A pool of reusable byte buffers
15///
16/// This pool maintains a collection of BytesMut buffers that can be reused
17/// to reduce allocation overhead when creating blocks.
18#[derive(Clone)]
19pub struct BytesPool {
20    /// Available buffers, organized by capacity bucket
21    /// Each bucket contains buffers with capacity in a power-of-2 range
22    pool: Arc<Mutex<HashMap<usize, Vec<BytesMut>>>>,
23    /// Statistics for the pool
24    stats: Arc<Mutex<PoolStats>>,
25}
26
27impl Default for BytesPool {
28    fn default() -> Self {
29        Self::new()
30    }
31}
32
33impl BytesPool {
34    /// Create a new bytes pool
35    pub fn new() -> Self {
36        Self {
37            pool: Arc::new(Mutex::new(HashMap::new())),
38            stats: Arc::new(Mutex::new(PoolStats::default())),
39        }
40    }
41
42    /// Get a buffer with at least the requested capacity
43    ///
44    /// If a suitable buffer is available in the pool, it will be reused.
45    /// Otherwise, a new buffer will be allocated.
46    pub fn get(&self, capacity: usize) -> BytesMut {
47        let bucket = Self::capacity_bucket(capacity);
48
49        let mut pool = self.pool.lock().unwrap();
50        let mut stats = self.stats.lock().unwrap();
51
52        if let Some(buffers) = pool.get_mut(&bucket) {
53            if let Some(mut buf) = buffers.pop() {
54                buf.clear();
55                stats.hits += 1;
56                return buf;
57            }
58        }
59
60        stats.misses += 1;
61        stats.allocations += 1;
62        BytesMut::with_capacity(bucket)
63    }
64
65    /// Return a buffer to the pool for reuse
66    ///
67    /// The buffer will be cleared and made available for future use.
68    pub fn put(&self, mut buf: BytesMut) {
69        // Only pool buffers within reasonable size limits
70        if buf.capacity() > 1024 * 1024 * 4 {
71            // Too large, don't pool
72            return;
73        }
74
75        buf.clear();
76        let bucket = Self::capacity_bucket(buf.capacity());
77
78        let mut pool = self.pool.lock().unwrap();
79        let buffers = pool.entry(bucket).or_default();
80
81        // Limit pool size per bucket to prevent unbounded growth
82        if buffers.len() < 100 {
83            buffers.push(buf);
84        }
85    }
86
87    /// Get the pool statistics
88    pub fn stats(&self) -> PoolStats {
89        *self.stats.lock().unwrap()
90    }
91
92    /// Clear all pooled buffers
93    pub fn clear(&self) {
94        self.pool.lock().unwrap().clear();
95    }
96
97    /// Round capacity up to the nearest power-of-2 bucket
98    fn capacity_bucket(capacity: usize) -> usize {
99        if capacity == 0 {
100            return 1024; // Minimum 1KB
101        }
102        capacity.next_power_of_two().max(1024)
103    }
104}
105
106/// A pool for CID strings to reduce duplication
107///
108/// This pool maintains a cache of CID strings that have been seen before,
109/// allowing them to be deduplicated and reused.
110#[derive(Clone)]
111pub struct CidStringPool {
112    /// Interned strings
113    pool: Arc<Mutex<HashMap<String, Arc<str>>>>,
114    /// Statistics for the pool
115    stats: Arc<Mutex<PoolStats>>,
116}
117
118impl Default for CidStringPool {
119    fn default() -> Self {
120        Self::new()
121    }
122}
123
124impl CidStringPool {
125    /// Create a new CID string pool
126    pub fn new() -> Self {
127        Self {
128            pool: Arc::new(Mutex::new(HashMap::new())),
129            stats: Arc::new(Mutex::new(PoolStats::default())),
130        }
131    }
132
133    /// Intern a CID string
134    ///
135    /// If the string has been seen before, returns the existing Arc.
136    /// Otherwise, creates a new Arc and stores it in the pool.
137    pub fn intern(&self, s: &str) -> Arc<str> {
138        let mut pool = self.pool.lock().unwrap();
139        let mut stats = self.stats.lock().unwrap();
140
141        if let Some(existing) = pool.get(s) {
142            stats.hits += 1;
143            return Arc::clone(existing);
144        }
145
146        stats.misses += 1;
147        let arc: Arc<str> = Arc::from(s);
148        pool.insert(s.to_string(), Arc::clone(&arc));
149        arc
150    }
151
152    /// Get the pool statistics
153    pub fn stats(&self) -> PoolStats {
154        *self.stats.lock().unwrap()
155    }
156
157    /// Clear the pool
158    pub fn clear(&self) {
159        self.pool.lock().unwrap().clear();
160    }
161
162    /// Get the number of unique strings in the pool
163    pub fn len(&self) -> usize {
164        self.pool.lock().unwrap().len()
165    }
166
167    /// Check if the pool is empty
168    pub fn is_empty(&self) -> bool {
169        self.pool.lock().unwrap().is_empty()
170    }
171}
172
/// Counters describing how effectively a memory pool is being reused
#[derive(Debug, Clone, Copy, Default)]
pub struct PoolStats {
    /// Lookups that were satisfied from the pool
    pub hits: u64,
    /// Lookups that found nothing reusable in the pool
    pub misses: u64,
    /// Count of fresh allocations performed
    pub allocations: u64,
}

impl PoolStats {
    /// Fraction of lookups that were hits, in the range 0.0 to 1.0
    ///
    /// Returns 0.0 when no lookups have been recorded.
    pub fn hit_rate(&self) -> f64 {
        match self.hits + self.misses {
            0 => 0.0,
            lookups => self.hits as f64 / lookups as f64,
        }
    }

    /// Fraction of lookups that were misses, computed as `1.0 - hit_rate()`
    ///
    /// As a consequence this is 1.0 when no lookups have been recorded.
    pub fn miss_rate(&self) -> f64 {
        let hit_fraction = self.hit_rate();
        1.0 - hit_fraction
    }
}
199
200/// A global bytes pool instance
201static GLOBAL_BYTES_POOL: once_cell::sync::Lazy<BytesPool> =
202    once_cell::sync::Lazy::new(BytesPool::new);
203
204/// A global CID string pool instance
205static GLOBAL_CID_STRING_POOL: once_cell::sync::Lazy<CidStringPool> =
206    once_cell::sync::Lazy::new(CidStringPool::new);
207
208/// Get the global bytes pool
209pub fn global_bytes_pool() -> &'static BytesPool {
210    &GLOBAL_BYTES_POOL
211}
212
213/// Get the global CID string pool
214pub fn global_cid_string_pool() -> &'static CidStringPool {
215    &GLOBAL_CID_STRING_POOL
216}
217
218/// Helper to convert BytesMut to Bytes efficiently
219pub fn freeze_bytes(buf: BytesMut) -> Bytes {
220    buf.freeze()
221}
222
#[cfg(test)]
mod tests {
    use super::*;

    /// A returned buffer is handed out again for a same-sized request.
    #[test]
    fn test_bytes_pool_basic() {
        let pool = BytesPool::new();

        // The pool starts empty, so the first request has to allocate.
        let buf1 = pool.get(1024);
        assert!(buf1.capacity() >= 1024);

        pool.put(buf1);

        // The second request is served from the pool.
        let buf2 = pool.get(1024);
        assert!(buf2.capacity() >= 1024);
        assert!(buf2.is_empty());

        let stats = pool.stats();
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.misses, 1);
        assert_eq!(stats.allocations, 1);
    }

    /// Requests are rounded up to power-of-two buckets (minimum 1 KiB) and
    /// reuse happens per bucket.
    #[test]
    fn test_bytes_pool_capacity_bucketing() {
        let pool = BytesPool::new();

        // 100 and 1000 both map to the 1024 bucket, 2000 maps to 2048.
        let buf1 = pool.get(100);
        let buf2 = pool.get(1000);
        let buf3 = pool.get(2000);

        assert!(buf1.capacity() >= 100);
        assert!(buf2.capacity() >= 1000);
        assert!(buf3.capacity() >= 2000);

        pool.put(buf1);
        pool.put(buf2);
        pool.put(buf3);

        // Similar sizes must be served from the matching buckets.
        let buf4 = pool.get(150); // 1024 bucket
        let buf5 = pool.get(1100); // 2048 bucket

        assert!(buf4.capacity() >= 150);
        assert!(buf5.capacity() >= 1100);

        // Three initial allocations, then two reuses.
        let stats = pool.stats();
        assert_eq!(stats.misses, 3);
        assert_eq!(stats.hits, 2);
    }

    /// Interning the same string twice yields the same allocation.
    #[test]
    fn test_cid_string_pool_basic() {
        let pool = CidStringPool::new();

        let s1 = pool.intern("QmTest123");
        let s2 = pool.intern("QmTest123");

        assert_eq!(s1.as_ref(), s2.as_ref());
        assert!(Arc::ptr_eq(&s1, &s2));

        let stats = pool.stats();
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.misses, 1);
    }

    /// Distinct strings get distinct allocations and count as misses.
    #[test]
    fn test_cid_string_pool_different_strings() {
        let pool = CidStringPool::new();

        let s1 = pool.intern("QmTest1");
        let s2 = pool.intern("QmTest2");

        assert_ne!(s1.as_ref(), s2.as_ref());
        assert!(!Arc::ptr_eq(&s1, &s2));

        let stats = pool.stats();
        assert_eq!(stats.hits, 0);
        assert_eq!(stats.misses, 2);
    }

    #[test]
    fn test_pool_stats_hit_rate() {
        let stats = PoolStats {
            hits: 80,
            misses: 20,
            allocations: 20,
        };

        assert!((stats.hit_rate() - 0.8).abs() < 0.001);
        assert!((stats.miss_rate() - 0.2).abs() < 0.001);
    }

    /// With no recorded lookups the hit rate is 0.0 and the miss rate is its
    /// complement, 1.0.
    #[test]
    fn test_pool_stats_empty() {
        let stats = PoolStats::default();
        assert_eq!(stats.hit_rate(), 0.0);
        assert_eq!(stats.miss_rate(), 1.0);
    }

    /// Clearing the pool discards pooled buffers, so the next request misses.
    #[test]
    fn test_bytes_pool_clear() {
        let pool = BytesPool::new();
        let buf = pool.get(1024);
        pool.put(buf);

        pool.clear();

        let _buf2 = pool.get(1024);
        let stats = pool.stats();
        assert_eq!(stats.misses, 2); // Both requests had to allocate
        assert_eq!(stats.hits, 0);
    }

    #[test]
    fn test_cid_string_pool_len() {
        let pool = CidStringPool::new();
        assert_eq!(pool.len(), 0);
        assert!(pool.is_empty());

        pool.intern("QmTest1");
        assert_eq!(pool.len(), 1);
        assert!(!pool.is_empty());

        pool.intern("QmTest2");
        assert_eq!(pool.len(), 2);

        pool.intern("QmTest1"); // Duplicate
        assert_eq!(pool.len(), 2); // Should still be 2

        pool.clear();
        assert!(pool.is_empty());
    }

    /// Buffers above the 4 MiB limit are dropped by `put` instead of pooled.
    #[test]
    fn test_bytes_pool_size_limit() {
        let pool = BytesPool::new();

        let large_buf = BytesMut::with_capacity(10 * 1024 * 1024);
        pool.put(large_buf);

        // The large buffer was not retained, so this request must allocate.
        let buf = pool.get(10 * 1024 * 1024);
        assert!(buf.capacity() >= 10 * 1024 * 1024);

        let stats = pool.stats();
        assert_eq!(stats.misses, 1);
        assert_eq!(stats.hits, 0);
    }

    /// Clones of a pool share the same buffers and statistics.
    #[test]
    fn test_bytes_pool_clone_shares_state() {
        let pool = BytesPool::new();
        let clone = pool.clone();

        let buf = pool.get(1024);
        clone.put(buf);

        let _reused = pool.get(1024);
        assert_eq!(clone.stats().hits, 1);
        assert_eq!(clone.stats().misses, 1);
    }

    /// The global pools are shared across tests, so only check that they are
    /// reachable and usable; no counter values are asserted here.
    #[test]
    fn test_global_pools() {
        let bytes_pool = global_bytes_pool();
        let cid_pool = global_cid_string_pool();

        let buf = bytes_pool.get(1024);
        assert!(buf.capacity() >= 1024);

        let s = cid_pool.intern("QmTest");
        assert_eq!(s.as_ref(), "QmTest");
    }

    #[test]
    fn test_freeze_bytes() {
        let mut buf = BytesMut::with_capacity(1024);
        buf.extend_from_slice(b"Hello, world!");
        let bytes = freeze_bytes(buf);
        assert_eq!(&bytes[..], b"Hello, world!");
    }
}