1use bytes::{Bytes, BytesMut};
11use std::collections::HashMap;
12use std::sync::{Arc, Mutex};
13
14#[derive(Clone)]
19pub struct BytesPool {
20 pool: Arc<Mutex<HashMap<usize, Vec<BytesMut>>>>,
23 stats: Arc<Mutex<PoolStats>>,
25}
26
27impl Default for BytesPool {
28 fn default() -> Self {
29 Self::new()
30 }
31}
32
33impl BytesPool {
34 pub fn new() -> Self {
36 Self {
37 pool: Arc::new(Mutex::new(HashMap::new())),
38 stats: Arc::new(Mutex::new(PoolStats::default())),
39 }
40 }
41
42 pub fn get(&self, capacity: usize) -> BytesMut {
47 let bucket = Self::capacity_bucket(capacity);
48
49 let mut pool = self.pool.lock().unwrap();
50 let mut stats = self.stats.lock().unwrap();
51
52 if let Some(buffers) = pool.get_mut(&bucket) {
53 if let Some(mut buf) = buffers.pop() {
54 buf.clear();
55 stats.hits += 1;
56 return buf;
57 }
58 }
59
60 stats.misses += 1;
61 stats.allocations += 1;
62 BytesMut::with_capacity(bucket)
63 }
64
65 pub fn put(&self, mut buf: BytesMut) {
69 if buf.capacity() > 1024 * 1024 * 4 {
71 return;
73 }
74
75 buf.clear();
76 let bucket = Self::capacity_bucket(buf.capacity());
77
78 let mut pool = self.pool.lock().unwrap();
79 let buffers = pool.entry(bucket).or_default();
80
81 if buffers.len() < 100 {
83 buffers.push(buf);
84 }
85 }
86
87 pub fn stats(&self) -> PoolStats {
89 *self.stats.lock().unwrap()
90 }
91
92 pub fn clear(&self) {
94 self.pool.lock().unwrap().clear();
95 }
96
97 fn capacity_bucket(capacity: usize) -> usize {
99 if capacity == 0 {
100 return 1024; }
102 capacity.next_power_of_two().max(1024)
103 }
104}
105
106#[derive(Clone)]
111pub struct CidStringPool {
112 pool: Arc<Mutex<HashMap<String, Arc<str>>>>,
114 stats: Arc<Mutex<PoolStats>>,
116}
117
118impl Default for CidStringPool {
119 fn default() -> Self {
120 Self::new()
121 }
122}
123
124impl CidStringPool {
125 pub fn new() -> Self {
127 Self {
128 pool: Arc::new(Mutex::new(HashMap::new())),
129 stats: Arc::new(Mutex::new(PoolStats::default())),
130 }
131 }
132
133 pub fn intern(&self, s: &str) -> Arc<str> {
138 let mut pool = self.pool.lock().unwrap();
139 let mut stats = self.stats.lock().unwrap();
140
141 if let Some(existing) = pool.get(s) {
142 stats.hits += 1;
143 return Arc::clone(existing);
144 }
145
146 stats.misses += 1;
147 let arc: Arc<str> = Arc::from(s);
148 pool.insert(s.to_string(), Arc::clone(&arc));
149 arc
150 }
151
152 pub fn stats(&self) -> PoolStats {
154 *self.stats.lock().unwrap()
155 }
156
157 pub fn clear(&self) {
159 self.pool.lock().unwrap().clear();
160 }
161
162 pub fn len(&self) -> usize {
164 self.pool.lock().unwrap().len()
165 }
166
167 pub fn is_empty(&self) -> bool {
169 self.pool.lock().unwrap().is_empty()
170 }
171}
172
/// A snapshot of pool usage counters.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct PoolStats {
    /// Requests that were served from the pool.
    pub hits: u64,
    /// Requests that the pool could not serve.
    pub misses: u64,
    /// New buffers allocated because of a miss.
    pub allocations: u64,
}

impl PoolStats {
    /// Fraction of recorded requests that were hits, in the range `0.0..=1.0`.
    ///
    /// Returns `0.0` when no request has been recorded.
    pub fn hit_rate(&self) -> f64 {
        // Widen before adding: `hits + misses` in `u64` can overflow, which
        // panics in debug builds and wraps to a wrong total in release builds.
        let total = u128::from(self.hits) + u128::from(self.misses);
        if total == 0 {
            return 0.0;
        }
        self.hits as f64 / total as f64
    }

    /// Complement of [`PoolStats::hit_rate`].
    ///
    /// Note that this is `1.0` when no request has been recorded, because the
    /// hit rate is defined as `0.0` in that case.
    pub fn miss_rate(&self) -> f64 {
        1.0 - self.hit_rate()
    }
}
199
200static GLOBAL_BYTES_POOL: once_cell::sync::Lazy<BytesPool> =
202 once_cell::sync::Lazy::new(BytesPool::new);
203
204static GLOBAL_CID_STRING_POOL: once_cell::sync::Lazy<CidStringPool> =
206 once_cell::sync::Lazy::new(CidStringPool::new);
207
208pub fn global_bytes_pool() -> &'static BytesPool {
210 &GLOBAL_BYTES_POOL
211}
212
213pub fn global_cid_string_pool() -> &'static CidStringPool {
215 &GLOBAL_CID_STRING_POOL
216}
217
218pub fn freeze_bytes(buf: BytesMut) -> Bytes {
220 buf.freeze()
221}
222
#[cfg(test)]
mod tests {
    use super::*;

    /// A buffer handed back to the pool is served again and counted as a hit.
    #[test]
    fn test_bytes_pool_basic() {
        let pool = BytesPool::new();

        let first = pool.get(1024);
        assert!(first.capacity() >= 1024);
        pool.put(first);

        let second = pool.get(1024);
        assert!(second.capacity() >= 1024);

        let PoolStats { hits, misses, .. } = pool.stats();
        assert_eq!(hits, 1);
        assert_eq!(misses, 1);
    }

    /// Requests of different sizes always receive a buffer that is large enough,
    /// both when freshly allocated and after buffers were returned.
    #[test]
    fn test_bytes_pool_capacity_bucketing() {
        let pool = BytesPool::new();

        let requests = [100usize, 1000, 2000];
        let buffers: Vec<BytesMut> = requests.iter().map(|&size| pool.get(size)).collect();
        for (buf, &size) in buffers.iter().zip(requests.iter()) {
            assert!(buf.capacity() >= size);
        }

        for buf in buffers {
            pool.put(buf);
        }

        let small = pool.get(150);
        let medium = pool.get(1100);
        assert!(small.capacity() >= 150);
        assert!(medium.capacity() >= 1100);
    }

    /// Interning the same text twice yields the same allocation.
    #[test]
    fn test_cid_string_pool_basic() {
        let pool = CidStringPool::new();

        let first = pool.intern("QmTest123");
        let second = pool.intern("QmTest123");

        assert_eq!(&*first, &*second);
        assert!(Arc::ptr_eq(&first, &second));

        let PoolStats { hits, misses, .. } = pool.stats();
        assert_eq!(hits, 1);
        assert_eq!(misses, 1);
    }

    /// Distinct strings get distinct allocations and are both misses.
    #[test]
    fn test_cid_string_pool_different_strings() {
        let pool = CidStringPool::new();

        let one = pool.intern("QmTest1");
        let two = pool.intern("QmTest2");

        assert_ne!(&*one, &*two);
        assert!(!Arc::ptr_eq(&one, &two));

        let PoolStats { hits, misses, .. } = pool.stats();
        assert_eq!(hits, 0);
        assert_eq!(misses, 2);
    }

    /// Hit and miss rates are derived from the raw counters.
    #[test]
    fn test_pool_stats_hit_rate() {
        let stats = PoolStats {
            hits: 80,
            misses: 20,
            allocations: 20,
        };

        let hit_error = (stats.hit_rate() - 0.8).abs();
        let miss_error = (stats.miss_rate() - 0.2).abs();
        assert!(hit_error < 0.001);
        assert!(miss_error < 0.001);
    }

    /// With no recorded requests the hit rate is 0 and the miss rate is 1.
    #[test]
    fn test_pool_stats_empty() {
        let stats = PoolStats::default();

        assert_eq!(stats.hit_rate(), 0.0);
        assert_eq!(stats.miss_rate(), 1.0);
    }

    /// After `clear`, a previously returned buffer is no longer available.
    #[test]
    fn test_bytes_pool_clear() {
        let pool = BytesPool::new();
        pool.put(pool.get(1024));

        pool.clear();

        let _after_clear = pool.get(1024);
        assert_eq!(pool.stats().misses, 2);
    }

    /// `len` and `is_empty` track the number of distinct interned strings.
    #[test]
    fn test_cid_string_pool_len() {
        let pool = CidStringPool::new();
        assert_eq!(pool.len(), 0);
        assert!(pool.is_empty());

        pool.intern("QmTest1");
        assert_eq!(pool.len(), 1);
        assert!(!pool.is_empty());

        pool.intern("QmTest2");
        assert_eq!(pool.len(), 2);

        // Re-interning an existing string must not add an entry.
        pool.intern("QmTest1");
        assert_eq!(pool.len(), 2);
    }

    /// Oversized buffers are not kept, so a matching request is a miss.
    #[test]
    fn test_bytes_pool_size_limit() {
        let pool = BytesPool::new();
        let oversized = 10 * 1024 * 1024;

        pool.put(BytesMut::with_capacity(oversized));

        let _fresh = pool.get(oversized);
        assert!(pool.stats().misses >= 1);
    }

    /// The global accessors return usable pools.
    #[test]
    fn test_global_pools() {
        let shared_bytes = global_bytes_pool();
        let shared_cids = global_cid_string_pool();

        let _buf = shared_bytes.get(1024);
        let _cid = shared_cids.intern("QmTest");
    }

    /// Freezing keeps the bytes that were written to the buffer.
    #[test]
    fn test_freeze_bytes() {
        let payload: &[u8] = b"Hello, world!";

        let mut scratch = BytesMut::with_capacity(1024);
        scratch.extend_from_slice(payload);

        let frozen = freeze_bytes(scratch);
        assert_eq!(&frozen[..], payload);
    }
}