1use std::sync::{Arc, Mutex, Weak};
9
/// Abstraction over a pool of reusable byte buffers for frame data.
///
/// Implementors hand out [`PooledBuffer`]s via [`FramePool::acquire`] and
/// take raw `Vec<u8>` storage back via [`FramePool::release`]. The trait is
/// `Send + Sync` so a pool can be shared across threads behind an `Arc`.
pub trait FramePool: Send + Sync + std::fmt::Debug {
    /// Returns a buffer of exactly `size` bytes, or `None` when the pool
    /// cannot satisfy the request.
    fn acquire(&self, size: usize) -> Option<PooledBuffer>;

    /// Returns a buffer's backing storage to the pool.
    ///
    /// The default implementation simply drops the buffer, so pools that do
    /// not recycle memory need not override it.
    fn release(&self, _buffer: Vec<u8>) {
    }
}
68
/// A byte buffer that may be owned by a [`FramePool`].
///
/// On drop, a pooled buffer hands its storage back to the pool (if the pool
/// is still alive); a standalone buffer just frees its storage.
#[derive(Debug)]
pub struct PooledBuffer {
    /// Owned backing storage for the frame bytes.
    data: Vec<u8>,
    /// Weak handle back to the originating pool; `None` for standalone
    /// buffers. Weak so an outstanding buffer never keeps a dropped pool
    /// alive (no `Arc` cycle).
    pool: Option<Weak<dyn FramePool>>,
}
93
94impl PooledBuffer {
95 #[must_use]
102 pub fn new(data: Vec<u8>, pool: Weak<dyn FramePool>) -> Self {
103 Self {
104 data,
105 pool: Some(pool),
106 }
107 }
108
109 #[must_use]
114 pub fn standalone(data: Vec<u8>) -> Self {
115 Self { data, pool: None }
116 }
117
118 #[must_use]
120 #[inline]
121 pub fn data(&self) -> &[u8] {
122 &self.data
123 }
124
125 #[must_use]
127 #[inline]
128 pub fn data_mut(&mut self) -> &mut [u8] {
129 &mut self.data
130 }
131
132 #[must_use]
134 #[inline]
135 pub fn len(&self) -> usize {
136 self.data.len()
137 }
138
139 #[must_use]
141 #[inline]
142 pub fn is_empty(&self) -> bool {
143 self.data.is_empty()
144 }
145
146 #[must_use]
150 pub fn into_inner(mut self) -> Vec<u8> {
151 self.pool = None;
153 std::mem::take(&mut self.data)
154 }
155}
156
157impl Clone for PooledBuffer {
158 fn clone(&self) -> Self {
166 Self {
167 data: self.data.clone(),
168 pool: None, }
170 }
171}
172
173impl Drop for PooledBuffer {
174 fn drop(&mut self) {
175 if let Some(ref weak_pool) = self.pool
176 && let Some(pool) = weak_pool.upgrade()
177 {
178 let data = std::mem::take(&mut self.data);
180 pool.release(data);
181 }
182 }
184}
185
186impl AsRef<[u8]> for PooledBuffer {
187 fn as_ref(&self) -> &[u8] {
188 &self.data
189 }
190}
191
192impl AsMut<[u8]> for PooledBuffer {
193 fn as_mut(&mut self) -> &mut [u8] {
194 &mut self.data
195 }
196}
197
/// A simple [`FramePool`] backed by a mutex-guarded list of free `Vec`s.
#[derive(Debug)]
pub struct VecPool {
    /// Free buffers available for reuse.
    buffers: Mutex<Vec<Vec<u8>>>,
    /// Maximum number of free buffers retained; excess releases are dropped.
    capacity: usize,
    /// Weak self-reference handed to acquired buffers so they can return
    /// themselves on drop without creating an `Arc` cycle.
    self_ref: Mutex<Weak<Self>>,
}
226
227impl VecPool {
228 #[must_use]
234 pub fn new(capacity: usize) -> Arc<Self> {
235 let pool = Arc::new(Self {
236 buffers: Mutex::new(Vec::with_capacity(capacity)),
237 capacity,
238 self_ref: Mutex::new(Weak::new()),
239 });
240 if let Ok(mut r) = pool.self_ref.lock() {
241 *r = Arc::downgrade(&pool);
242 }
243 pool
244 }
245
246 #[must_use]
248 pub fn capacity(&self) -> usize {
249 self.capacity
250 }
251
252 #[must_use]
263 pub fn available(&self) -> usize {
264 self.buffers.lock().map_or(0, |b| b.len())
265 }
266}
267
268impl FramePool for VecPool {
269 fn acquire(&self, size: usize) -> Option<PooledBuffer> {
270 if let Ok(mut buffers) = self.buffers.lock() {
271 let suitable_idx = buffers
272 .iter()
273 .enumerate()
274 .filter(|(_, b)| b.capacity() >= size)
275 .min_by_key(|(_, b)| b.capacity())
276 .map(|(idx, _)| idx);
277
278 if let Some(idx) = suitable_idx {
279 let mut buf = buffers.swap_remove(idx);
280 buf.resize(size, 0);
281 buf.fill(0);
282
283 let weak_ref = self
284 .self_ref
285 .lock()
286 .ok()
287 .and_then(|r| r.upgrade())
288 .map(|arc| Arc::downgrade(&(arc as Arc<dyn FramePool>)))?;
289
290 return Some(PooledBuffer::new(buf, weak_ref));
291 }
292 }
293 None
294 }
295
296 fn release(&self, buffer: Vec<u8>) {
297 if let Ok(mut buffers) = self.buffers.lock()
298 && buffers.len() < self.capacity
299 {
300 buffers.push(buffer);
301 }
302 }
303}
304
/// Convenience alias for [`VecPool`].
pub type SimpleFramePool = VecPool;
309
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    // Standalone buffers expose length/emptiness/contents of their Vec.
    #[test]
    fn test_pooled_buffer_standalone() {
        let data = vec![1u8, 2, 3, 4, 5];
        let buffer = PooledBuffer::standalone(data.clone());

        assert_eq!(buffer.len(), 5);
        assert!(!buffer.is_empty());
        assert_eq!(buffer.data(), &[1, 2, 3, 4, 5]);
    }

    // Mutations through data_mut() are visible through data().
    #[test]
    fn test_pooled_buffer_data_mut() {
        let mut buffer = PooledBuffer::standalone(vec![0u8; 4]);
        buffer.data_mut()[0] = 42;
        assert_eq!(buffer.data()[0], 42);
    }

    // into_inner() returns the backing Vec unchanged.
    #[test]
    fn test_pooled_buffer_into_inner() {
        let buffer = PooledBuffer::standalone(vec![1, 2, 3]);
        let inner = buffer.into_inner();
        assert_eq!(inner, vec![1, 2, 3]);
    }

    // AsRef<[u8]> borrows the contents.
    #[test]
    fn test_pooled_buffer_as_ref() {
        let buffer = PooledBuffer::standalone(vec![1, 2, 3]);
        let slice: &[u8] = buffer.as_ref();
        assert_eq!(slice, &[1, 2, 3]);
    }

    // AsMut<[u8]> permits in-place mutation.
    #[test]
    fn test_pooled_buffer_as_mut() {
        let mut buffer = PooledBuffer::standalone(vec![1, 2, 3]);
        let slice: &mut [u8] = buffer.as_mut();
        slice[0] = 99;
        assert_eq!(buffer.data(), &[99, 2, 3]);
    }

    // Empty buffers report len 0 / is_empty.
    #[test]
    fn test_empty_buffer() {
        let buffer = PooledBuffer::standalone(vec![]);
        assert!(buffer.is_empty());
        assert_eq!(buffer.len(), 0);
    }

    // Dropping a pool-attached buffer calls release() exactly once and
    // returns the storage to the pool.
    #[test]
    fn test_pool_with_arc_release() {
        use std::sync::Mutex;
        use std::sync::atomic::{AtomicUsize, Ordering};

        #[derive(Debug)]
        struct ArcPool {
            buffers: Mutex<Vec<Vec<u8>>>,
            release_count: AtomicUsize,
        }

        impl FramePool for ArcPool {
            fn acquire(&self, _size: usize) -> Option<PooledBuffer> {
                None
            }

            fn release(&self, buffer: Vec<u8>) {
                if let Ok(mut buffers) = self.buffers.lock() {
                    buffers.push(buffer);
                    self.release_count.fetch_add(1, Ordering::SeqCst);
                }
            }
        }

        let pool = Arc::new(ArcPool {
            buffers: Mutex::new(vec![]),
            release_count: AtomicUsize::new(0),
        });

        {
            // Buffer dropped at end of scope while the pool is alive.
            let _buffer =
                PooledBuffer::new(vec![1, 2, 3], Arc::downgrade(&pool) as Weak<dyn FramePool>);
        }

        assert_eq!(pool.release_count.load(Ordering::SeqCst), 1);
        assert!(pool.buffers.lock().map(|b| b.len() == 1).unwrap_or(false));
    }

    // If the pool dies before the buffer, drop must NOT call release()
    // (the Weak upgrade fails) and the buffer stays usable.
    #[test]
    fn test_pool_dropped_before_buffer() {
        #[derive(Debug)]
        struct DroppablePool;

        impl FramePool for DroppablePool {
            fn acquire(&self, _size: usize) -> Option<PooledBuffer> {
                None
            }

            fn release(&self, _buffer: Vec<u8>) {
                panic!("release should not be called on dropped pool");
            }
        }

        let buffer;
        {
            let pool = Arc::new(DroppablePool);
            buffer = PooledBuffer::new(vec![1, 2, 3], Arc::downgrade(&pool) as Weak<dyn FramePool>);
        }

        assert_eq!(buffer.data(), &[1, 2, 3]);

        drop(buffer);
    }

    // Clones are standalone: only the original releases back to the pool,
    // so release_count ends at 1 even after dropping both.
    #[test]
    fn test_pooled_buffer_clone_becomes_standalone() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        #[derive(Debug)]
        struct CountingPool {
            release_count: AtomicUsize,
        }

        impl FramePool for CountingPool {
            fn acquire(&self, _size: usize) -> Option<PooledBuffer> {
                None
            }

            fn release(&self, _buffer: Vec<u8>) {
                self.release_count.fetch_add(1, Ordering::SeqCst);
            }
        }

        let pool = Arc::new(CountingPool {
            release_count: AtomicUsize::new(0),
        });

        let buffer1 =
            PooledBuffer::new(vec![1, 2, 3], Arc::downgrade(&pool) as Weak<dyn FramePool>);

        let buffer2 = buffer1.clone();

        assert_eq!(buffer1.data(), &[1, 2, 3]);
        assert_eq!(buffer2.data(), &[1, 2, 3]);

        drop(buffer1);
        drop(buffer2);

        assert_eq!(pool.release_count.load(Ordering::SeqCst), 1);
    }

    // Clone is a deep copy: mutating the clone leaves the original intact.
    #[test]
    fn test_pooled_buffer_clone_data_independence() {
        let buffer1 = PooledBuffer::standalone(vec![1, 2, 3]);
        let mut buffer2 = buffer1.clone();

        buffer2.data_mut()[0] = 99;

        assert_eq!(buffer1.data(), &[1, 2, 3]);
        assert_eq!(buffer2.data(), &[99, 2, 3]);
    }

    // A fresh VecPool has the requested capacity and no free buffers.
    #[test]
    fn vec_pool_should_start_empty() {
        let pool = VecPool::new(32);
        assert_eq!(pool.capacity(), 32);
        assert_eq!(pool.available(), 0);
    }

    #[test]
    fn vec_pool_acquire_should_return_none_when_empty() {
        let pool = VecPool::new(8);
        assert!(pool.acquire(1024).is_none());
    }

    // release() then acquire() hands back a (resized) recycled buffer.
    #[test]
    fn vec_pool_release_then_acquire_should_reuse_buffer() {
        let pool = VecPool::new(8);

        pool.release(vec![0u8; 1024]);
        assert_eq!(pool.available(), 1);

        let buf = pool.acquire(512).unwrap();
        assert_eq!(buf.len(), 512);
        assert_eq!(pool.available(), 0);
    }

    // Dropping an acquired PooledBuffer returns it to the pool via Drop.
    #[test]
    fn vec_pool_buffer_should_auto_return_on_drop() {
        let pool = VecPool::new(8);
        pool.release(vec![0u8; 2048]);

        {
            let _buf = pool.acquire(1024).unwrap();
            assert_eq!(pool.available(), 0);
        }

        assert_eq!(pool.available(), 1);
    }

    // Releases beyond capacity are silently dropped.
    #[test]
    fn vec_pool_should_not_exceed_capacity() {
        let pool = VecPool::new(2);

        pool.release(vec![0u8; 512]);
        pool.release(vec![0u8; 512]);
        pool.release(vec![0u8; 512]);
        assert_eq!(pool.available(), 2);
    }

    // Best-fit selection: the 1024-byte buffer (smallest that fits 1000)
    // is taken, leaving the 512 and 2048 buffers in the pool.
    #[test]
    fn vec_pool_acquire_should_choose_smallest_fitting_buffer() {
        let pool = VecPool::new(8);
        pool.release(vec![0u8; 512]);
        pool.release(vec![0u8; 1024]);
        pool.release(vec![0u8; 2048]);

        let buf = pool.acquire(1000).unwrap();
        assert!(buf.len() >= 1000);
        assert_eq!(pool.available(), 2);
    }

    // A too-small free buffer must not be consumed by a failed acquire.
    #[test]
    fn vec_pool_acquire_should_return_none_when_no_suitable_buffer() {
        let pool = VecPool::new(4);
        pool.release(vec![0u8; 512]);

        assert!(pool.acquire(1024).is_none());
        assert_eq!(pool.available(), 1);
    }

    // The SimpleFramePool alias resolves to VecPool.
    #[test]
    fn simple_frame_pool_alias_should_behave_identically_to_vec_pool() {
        let pool = SimpleFramePool::new(4);
        assert_eq!(pool.capacity(), 4);
        assert_eq!(pool.available(), 0);
    }
}