1#[cfg(feature = "clone-behavior")]
2use clone_behavior::{MirroredClone, Speed};
3
4use crate::{
5 bounded::BoundedPool,
6 shared_bounded::SharedBoundedPool,
7 shared_unbounded::SharedUnboundedPool,
8 unbounded::UnboundedPool,
9};
10use crate::{
11 other_utils::{OutOfBuffers, ResetBuffer, ResourcePoolEmpty},
12 pooled_resource::{PooledBuffer, SealedBufferPool},
13};
14
15
/// A pool of reusable `Vec<u8>` buffers with a fixed number of slots.
///
/// Buffers are recycled through a [`ResetBuffer`] policy constructed with a
/// maximum capacity (presumably clearing the buffer and bounding its retained
/// capacity on return — see `other_utils`). Use [`BoundedBufferPool::try_get`]
/// for a fallible checkout; `get` panics when the pool is exhausted (see the
/// `*_fail` tests).
#[derive(Debug, Clone)]
pub struct BoundedBufferPool(BoundedPool<Vec<u8>, ResetBuffer>);
19
// Ties `BoundedBufferPool` into the sealed buffer-pool trait so it can back a
// `PooledBuffer<BoundedBufferPool>` handle.
impl SealedBufferPool for BoundedBufferPool {
    type InnerPool = BoundedPool<Vec<u8>, ResetBuffer>;
}
23
24impl BoundedBufferPool {
25 #[inline]
30 #[must_use]
31 pub fn new(pool_size: usize, max_buffer_capacity: usize) -> Self {
32 Self(BoundedPool::new_default(pool_size, ResetBuffer::new(max_buffer_capacity)))
33 }
34
35 pub fn try_get(&self) -> Result<PooledBuffer<Self>, OutOfBuffers> {
37 self.0.try_get()
38 .map(PooledBuffer::new)
39 .map_err(|ResourcePoolEmpty| OutOfBuffers)
40 }
41
42 #[must_use]
48 pub fn get(&self) -> PooledBuffer<Self> {
49 PooledBuffer::new(self.0.get())
50 }
51
52 #[inline]
54 #[must_use]
55 pub fn pool_size(&self) -> usize {
56 self.0.pool_size()
57 }
58
59 #[must_use]
61 pub fn available_buffers(&self) -> usize {
62 self.0.available_resources()
63 }
64}
65
#[cfg(feature = "clone-behavior")]
impl<S: Speed> MirroredClone<S> for BoundedBufferPool {
    // Delegates to the inner pool's mirrored clone; the turbofish pins the
    // `Speed` parameter so type inference cannot choose a different `S`.
    #[inline]
    fn mirrored_clone(&self) -> Self {
        Self(MirroredClone::<S>::mirrored_clone(&self.0))
    }
}
73
/// A clonable, thread-shareable pool of `Vec<u8>` buffers with a fixed number
/// of slots.
///
/// Clones share the same underlying pool (see the `multithreaded_one_capacity`
/// test). Buffers are recycled through a [`ResetBuffer`] policy bounding the
/// capacity retained on return.
#[derive(Debug, Clone)]
pub struct SharedBoundedBufferPool(SharedBoundedPool<Vec<u8>, ResetBuffer>);
77
// Ties `SharedBoundedBufferPool` into the sealed buffer-pool trait so it can
// back a `PooledBuffer<SharedBoundedBufferPool>` handle.
impl SealedBufferPool for SharedBoundedBufferPool {
    type InnerPool = SharedBoundedPool<Vec<u8>, ResetBuffer>;
}
81
82impl SharedBoundedBufferPool {
83 #[inline]
88 #[must_use]
89 pub fn new(pool_size: usize, max_buffer_capacity: usize) -> Self {
90 Self(SharedBoundedPool::new_default(pool_size, ResetBuffer::new(max_buffer_capacity)))
91 }
92
93 pub fn try_get(&self) -> Result<PooledBuffer<Self>, OutOfBuffers> {
95 self.0.try_get()
96 .map(PooledBuffer::new)
97 .map_err(|ResourcePoolEmpty| OutOfBuffers)
98 }
99
100 #[must_use]
109 pub fn get(&self) -> PooledBuffer<Self> {
110 PooledBuffer::new(self.0.get())
111 }
112
113 #[inline]
115 #[must_use]
116 pub fn pool_size(&self) -> usize {
117 self.0.pool_size()
118 }
119
120 #[must_use]
122 pub fn available_buffers(&self) -> usize {
123 self.0.available_resources()
124 }
125}
126
#[cfg(feature = "clone-behavior")]
impl<S: Speed> MirroredClone<S> for SharedBoundedBufferPool {
    // Delegates to the inner pool's mirrored clone; the turbofish pins the
    // `Speed` parameter so type inference cannot choose a different `S`.
    #[inline]
    fn mirrored_clone(&self) -> Self {
        Self(MirroredClone::<S>::mirrored_clone(&self.0))
    }
}
134
/// A pool of reusable `Vec<u8>` buffers that grows on demand.
///
/// `get` never fails: when no recycled buffer is available, a fresh one is
/// created and tracked by the pool (see the `zero_or_one_size` test). Buffers
/// are recycled through a [`ResetBuffer`] policy bounding the capacity
/// retained on return.
#[derive(Debug, Clone)]
pub struct UnboundedBufferPool(UnboundedPool<Vec<u8>, ResetBuffer>);
138
// Ties `UnboundedBufferPool` into the sealed buffer-pool trait so it can back
// a `PooledBuffer<UnboundedBufferPool>` handle.
impl SealedBufferPool for UnboundedBufferPool {
    type InnerPool = UnboundedPool<Vec<u8>, ResetBuffer>;
}
142
143impl UnboundedBufferPool {
144 #[inline]
149 #[must_use]
150 pub fn new(max_buffer_capacity: usize) -> Self {
151 Self(UnboundedPool::new(ResetBuffer::new(max_buffer_capacity)))
152 }
153
154 #[must_use]
156 pub fn get(&self) -> PooledBuffer<Self> {
157 PooledBuffer::new(self.0.get_default())
158 }
159
160 #[inline]
162 #[must_use]
163 pub fn pool_size(&self) -> usize {
164 self.0.pool_size()
165 }
166
167 #[must_use]
169 pub fn available_buffers(&self) -> usize {
170 self.0.available_resources()
171 }
172}
173
#[cfg(feature = "clone-behavior")]
impl<S: Speed> MirroredClone<S> for UnboundedBufferPool {
    // Delegates to the inner pool's mirrored clone; the turbofish pins the
    // `Speed` parameter so type inference cannot choose a different `S`.
    #[inline]
    fn mirrored_clone(&self) -> Self {
        Self(MirroredClone::<S>::mirrored_clone(&self.0))
    }
}
181
/// A clonable, thread-shareable pool of `Vec<u8>` buffers that grows on
/// demand.
///
/// Clones share the same underlying pool (see the `multithreaded_one_capacity`
/// test). `get` never fails: a fresh buffer is created when none are free.
/// Buffers are recycled through a [`ResetBuffer`] policy bounding the
/// capacity retained on return.
#[derive(Debug, Clone)]
pub struct SharedUnboundedBufferPool(SharedUnboundedPool<Vec<u8>, ResetBuffer>);
185
// Ties `SharedUnboundedBufferPool` into the sealed buffer-pool trait so it
// can back a `PooledBuffer<SharedUnboundedBufferPool>` handle.
impl SealedBufferPool for SharedUnboundedBufferPool {
    type InnerPool = SharedUnboundedPool<Vec<u8>, ResetBuffer>;
}
189
190impl SharedUnboundedBufferPool {
191 #[inline]
196 #[must_use]
197 pub fn new(max_buffer_capacity: usize) -> Self {
198 Self(SharedUnboundedPool::new(ResetBuffer::new(max_buffer_capacity)))
199 }
200
201 #[must_use]
203 pub fn get(&self) -> PooledBuffer<Self> {
204 PooledBuffer::new(self.0.get_default())
205 }
206
207 #[inline]
209 #[must_use]
210 pub fn pool_size(&self) -> usize {
211 self.0.pool_size()
212 }
213
214 #[must_use]
216 pub fn available_buffers(&self) -> usize {
217 self.0.available_resources()
218 }
219}
220
#[cfg(feature = "clone-behavior")]
impl<S: Speed> MirroredClone<S> for SharedUnboundedBufferPool {
    // Delegates to the inner pool's mirrored clone; the turbofish pins the
    // `Speed` parameter so type inference cannot choose a different `S`.
    #[inline]
    fn mirrored_clone(&self) -> Self {
        Self(MirroredClone::<S>::mirrored_clone(&self.0))
    }
}
228
229
#[cfg(all(test, not(tests_with_leaks)))]
mod bounded_tests {
    use std::{array, iter};
    use super::*;

    // A zero-slot pool holds nothing; `try_get` must always fail.
    #[test]
    fn zero_capacity() {
        let pool = BoundedBufferPool::new(0, 0);
        assert_eq!(pool.pool_size(), 0);
        assert_eq!(pool.available_buffers(), 0);
        assert!(pool.try_get().is_err());
    }

    // `get` on a zero-slot pool is expected to panic.
    #[test]
    #[should_panic]
    fn zero_capacity_fail() {
        let pool = BoundedBufferPool::new(0, 0);
        let unreachable = pool.get();
        let _: &Vec<u8> = &*unreachable;
    }

    #[test]
    fn one_capacity() {
        let pool = BoundedBufferPool::new(1, 1);
        let buffer = pool.get();
        assert_eq!(pool.available_buffers(), 0);
        assert_eq!(buffer.len(), 0);
        // Buffers start unallocated; capacity grows only on demand.
        assert_eq!(buffer.capacity(), 0);
        drop(buffer);
        assert_eq!(pool.available_buffers(), 1);
        let mut buffer = pool.get();
        assert_eq!(buffer.len(), 0);
        assert_eq!(buffer.capacity(), 0);
        buffer.reserve(2);
        assert_eq!(pool.available_buffers(), 0);
        drop(buffer);
        assert_eq!(pool.available_buffers(), 1);
        // Capacity 2 exceeded the configured max of 1, so the buffer is
        // presumably reset on return — it comes back with capacity 0.
        let mut buffer = pool.get();
        assert_eq!(buffer.len(), 0);
        assert_eq!(buffer.capacity(), 0);

        buffer.reserve_exact(1);
        buffer.push(1);
        // The allocator may round `reserve_exact(1)` up; only check the
        // capacity-retention path when exactly 1 was allocated.
        if buffer.capacity() == 1 {
            drop(buffer);
            let buffer = pool.get();
            assert_eq!(buffer.len(), 0);
            assert_eq!(buffer.capacity(), 1);
        }
    }

    // Acquiring via `get` from an exhausted bounded pool panics.
    #[test]
    #[should_panic]
    fn one_capacity_fail() {
        let pool = BoundedBufferPool::new(1, 1);
        let _buf = pool.get();
        assert_eq!(pool.pool_size(), 1);
        assert_eq!(pool.available_buffers(), 0);
        let _unreachable = pool.get();
    }

    #[test]
    fn init_and_reset() {
        const POOL_CAPACITY: usize = 10;
        const BUF_CAPACITY: usize = 4096;

        let pool = BoundedBufferPool::new(POOL_CAPACITY, BUF_CAPACITY);
        let buffers: [_; POOL_CAPACITY] = array::from_fn(|_| pool.get());
        for (idx, mut buffer) in buffers.into_iter().enumerate() {
            buffer.reserve_exact(idx);
            buffer.extend(iter::repeat_n(0, idx));
            assert_eq!(buffer.len(), idx);
            // Bail out if the allocator over-allocated past the pool's max
            // capacity; the retention assertions below would not hold.
            if buffer.capacity() > BUF_CAPACITY {
                return;
            }
        }

        // All buffers were returned; they should come back emptied but with
        // (at least) the capacity reserved above. The `>= idx` comparison
        // presumes buffers are handed back in the same order — TODO confirm
        // against the inner pool.
        let buffers: [_; POOL_CAPACITY] = array::from_fn(|_| pool.get());
        for (idx, buffer) in buffers.into_iter().enumerate() {
            assert_eq!(buffer.len(), 0);
            assert!(buffer.capacity() >= idx);
        }
    }
}
323
#[cfg(all(test, not(tests_with_leaks)))]
mod shared_bounded_tests {
    use std::{array, iter, sync::mpsc, thread};
    use super::*;

    // A zero-slot shared pool holds nothing; `try_get` must always fail.
    #[test]
    fn zero_capacity() {
        let pool = SharedBoundedBufferPool::new(0, 0);
        assert_eq!(pool.pool_size(), 0);
        assert_eq!(pool.available_buffers(), 0);
        assert!(pool.try_get().is_err());
    }

    #[test]
    fn one_capacity() {
        let pool = SharedBoundedBufferPool::new(1, 1);
        let buffer = pool.get();
        assert_eq!(pool.available_buffers(), 0);
        assert_eq!(buffer.len(), 0);
        // Buffers start unallocated; capacity grows only on demand.
        assert_eq!(buffer.capacity(), 0);
        drop(buffer);
        assert_eq!(pool.available_buffers(), 1);
        let mut buffer = pool.get();
        assert_eq!(buffer.len(), 0);
        assert_eq!(buffer.capacity(), 0);
        buffer.reserve(2);
        assert_eq!(pool.available_buffers(), 0);
        drop(buffer);
        assert_eq!(pool.available_buffers(), 1);
        // Capacity 2 exceeded the configured max of 1, so the buffer is
        // presumably reset on return — it comes back with capacity 0.
        let mut buffer = pool.get();
        assert_eq!(buffer.len(), 0);
        assert_eq!(buffer.capacity(), 0);

        buffer.reserve_exact(1);
        buffer.push(1);
        // The allocator may round `reserve_exact(1)` up; only check the
        // capacity-retention path when exactly 1 was allocated.
        if buffer.capacity() == 1 {
            drop(buffer);
            let buffer = pool.get();
            assert_eq!(buffer.len(), 0);
            assert_eq!(buffer.capacity(), 1);
        }
    }

    #[test]
    fn init_and_reset() {
        const POOL_CAPACITY: usize = 10;
        const BUF_CAPACITY: usize = 4096;

        let pool = SharedBoundedBufferPool::new(POOL_CAPACITY, BUF_CAPACITY);
        let buffers: [_; POOL_CAPACITY] = array::from_fn(|_| pool.get());
        for (idx, mut buffer) in buffers.into_iter().enumerate() {
            buffer.reserve_exact(idx);
            buffer.extend(iter::repeat_n(0, idx));
            assert_eq!(buffer.len(), idx);
            // Bail out if the allocator over-allocated past the pool's max
            // capacity; the retention assertions below would not hold.
            if buffer.capacity() > BUF_CAPACITY {
                return;
            }
        }

        let buffers: [_; POOL_CAPACITY] = array::from_fn(|_| pool.get());

        // The shared pool gives no ordering guarantee, so compare the sorted
        // capacities against the reserved sizes 0..POOL_CAPACITY instead of
        // relying on checkout order.
        let mut seen_values = buffers.into_iter()
            .map(|buffer| {
                assert_eq!(buffer.len(), 0);
                buffer.capacity()
            })
            .collect::<Vec<_>>();

        seen_values.sort_unstable();

        for (idx, capacity) in seen_values.into_iter().enumerate() {
            assert!(capacity >= idx);
        }
    }

    // One buffer shared across two threads; progress is sequenced with
    // channel handshakes rather than timing assumptions.
    #[test]
    fn multithreaded_one_capacity() {
        const BUF_CAPACITY: usize = 4096;

        let pool = SharedBoundedBufferPool::new(1, BUF_CAPACITY);

        // Clones share the same underlying pool.
        let cloned_pool = pool.clone();

        assert_eq!(pool.available_buffers(), 1);

        let (signal_main, wait_for_thread) = mpsc::channel();
        let (signal_thread, wait_for_main) = mpsc::channel();

        thread::spawn(move || {
            let mut buffer = cloned_pool.get();
            signal_main.send(()).unwrap();
            wait_for_main.recv().unwrap();
            assert_eq!(buffer.len(), 0);
            // Allocator-tolerance guard mirroring the other tests. A fresh
            // buffer's capacity is 0 here, so this branch is not expected to
            // run (an early return would also make the main thread's final
            // `recv` fail) — NOTE(review): confirm intent.
            if buffer.capacity() > BUF_CAPACITY {
                return;
            }
            buffer.push(42);
            drop(buffer);
            signal_main.send(()).unwrap();
        });

        wait_for_thread.recv().unwrap();
        // The worker thread holds the only buffer.
        assert_eq!(pool.available_buffers(), 0);
        signal_thread.send(()).unwrap();
        wait_for_thread.recv().unwrap();
        // The worker dropped it; it is available (and emptied) again.
        assert_eq!(pool.available_buffers(), 1);
        let buffer = pool.get();
        assert_eq!(buffer.len(), 0);
        assert!(buffer.capacity() > 0);
    }
}
448
449
#[cfg(all(test, not(tests_with_leaks)))]
mod unbounded_tests {
    use std::{array, iter};
    use super::*;

    // An unbounded pool starts empty and grows lazily on `get`.
    #[test]
    fn zero_or_one_size() {
        let pool = UnboundedBufferPool::new(0);
        assert_eq!(pool.pool_size(), 0);
        assert_eq!(pool.available_buffers(), 0);

        let buffer = pool.get();
        let _: &Vec<u8> = &buffer;
        // The pool tracks the buffer it just created even while checked out.
        assert_eq!(pool.pool_size(), 1);
        assert_eq!(pool.available_buffers(), 0);

        drop(buffer);
        assert_eq!(pool.pool_size(), 1);
        assert_eq!(pool.available_buffers(), 1);
    }

    #[test]
    fn one_capacity() {
        let pool = UnboundedBufferPool::new(1);
        let buffer = pool.get();
        assert_eq!(pool.available_buffers(), 0);
        assert_eq!(buffer.len(), 0);
        // Buffers start unallocated; capacity grows only on demand.
        assert_eq!(buffer.capacity(), 0);
        drop(buffer);
        assert_eq!(pool.available_buffers(), 1);
        let mut buffer = pool.get();
        assert_eq!(buffer.len(), 0);
        assert_eq!(buffer.capacity(), 0);
        buffer.reserve(2);
        assert_eq!(pool.available_buffers(), 0);
        drop(buffer);
        assert_eq!(pool.available_buffers(), 1);
        // Capacity 2 exceeded the configured max of 1, so the buffer is
        // presumably reset on return — it comes back with capacity 0.
        let mut buffer = pool.get();
        assert_eq!(buffer.len(), 0);
        assert_eq!(buffer.capacity(), 0);

        buffer.reserve_exact(1);
        buffer.push(1);
        // The allocator may round `reserve_exact(1)` up; only check the
        // capacity-retention path when exactly 1 was allocated.
        if buffer.capacity() == 1 {
            drop(buffer);
            let buffer = pool.get();
            assert_eq!(buffer.len(), 0);
            assert_eq!(buffer.capacity(), 1);
        }
    }

    #[test]
    fn init_and_reset() {
        const POOL_SIZE: usize = 10;
        const BUF_CAPACITY: usize = 4096;

        let pool = UnboundedBufferPool::new(BUF_CAPACITY);
        let buffers: [_; POOL_SIZE] = array::from_fn(|_| pool.get());
        for (idx, mut buffer) in buffers.into_iter().enumerate() {
            buffer.reserve_exact(idx);
            buffer.extend(iter::repeat_n(0, idx));
            assert_eq!(buffer.len(), idx);
            // Bail out if the allocator over-allocated past the pool's max
            // capacity; the retention assertions below would not hold.
            if buffer.capacity() > BUF_CAPACITY {
                return;
            }
        }

        let buffers: [_; POOL_SIZE] = array::from_fn(|_| pool.get());
        // With every pooled buffer checked out, an extra `get` creates a
        // brand-new buffer, which starts with capacity 0.
        assert_eq!(pool.get().capacity(), 0);

        // Buffers appear to come back in LIFO order, hence `.rev()` to line
        // indices up with the capacities reserved above — TODO confirm
        // against the inner pool.
        for (idx, buffer) in buffers.into_iter().rev().enumerate() {
            assert_eq!(buffer.len(), 0);
            assert!(buffer.capacity() >= idx);
        }
    }
}
536
#[cfg(all(test, not(tests_with_leaks)))]
mod shared_unbounded_tests {
    use std::{array, iter, sync::mpsc, thread};
    use super::*;

    // A shared unbounded pool starts empty and grows lazily on `get`.
    #[test]
    fn zero_or_one_size() {
        let pool: SharedUnboundedBufferPool = SharedUnboundedBufferPool::new(0);
        assert_eq!(pool.pool_size(), 0);
        assert_eq!(pool.available_buffers(), 0);

        let buffer = pool.get();
        let _: &Vec<u8> = &buffer;
        // The pool tracks the buffer it just created even while checked out.
        assert_eq!(pool.pool_size(), 1);
        assert_eq!(pool.available_buffers(), 0);

        drop(buffer);
        assert_eq!(pool.pool_size(), 1);
        assert_eq!(pool.available_buffers(), 1);
    }

    #[test]
    fn one_capacity() {
        let pool = SharedUnboundedBufferPool::new(1);
        let buffer = pool.get();
        assert_eq!(pool.available_buffers(), 0);
        assert_eq!(buffer.len(), 0);
        // Buffers start unallocated; capacity grows only on demand.
        assert_eq!(buffer.capacity(), 0);
        drop(buffer);
        assert_eq!(pool.available_buffers(), 1);
        let mut buffer = pool.get();
        assert_eq!(buffer.len(), 0);
        assert_eq!(buffer.capacity(), 0);
        buffer.reserve(2);
        assert_eq!(pool.available_buffers(), 0);
        drop(buffer);
        assert_eq!(pool.available_buffers(), 1);
        // Capacity 2 exceeded the configured max of 1, so the buffer is
        // presumably reset on return — it comes back with capacity 0.
        let mut buffer = pool.get();
        assert_eq!(buffer.len(), 0);
        assert_eq!(buffer.capacity(), 0);

        buffer.reserve_exact(1);
        buffer.push(1);
        // The allocator may round `reserve_exact(1)` up; only check the
        // capacity-retention path when exactly 1 was allocated.
        if buffer.capacity() == 1 {
            drop(buffer);
            let buffer = pool.get();
            assert_eq!(buffer.len(), 0);
            assert_eq!(buffer.capacity(), 1);
        }
    }

    #[test]
    fn init_and_reset() {
        const POOL_SIZE: usize = 10;
        const BUF_CAPACITY: usize = 4096;

        let pool = SharedUnboundedBufferPool::new(BUF_CAPACITY);
        let buffers: [_; POOL_SIZE] = array::from_fn(|_| pool.get());
        for (idx, mut buffer) in buffers.into_iter().enumerate() {
            buffer.reserve_exact(idx);
            buffer.extend(iter::repeat_n(0, idx));
            assert_eq!(buffer.len(), idx);
            // Bail out if the allocator over-allocated past the pool's max
            // capacity; the retention assertions below would not hold.
            if buffer.capacity() > BUF_CAPACITY {
                return;
            }
        }

        let buffers: [_; POOL_SIZE] = array::from_fn(|_| pool.get());
        // With every pooled buffer checked out, an extra `get` creates a
        // brand-new buffer, which starts with capacity 0.
        assert_eq!(pool.get().capacity(), 0);

        // The shared pool gives no ordering guarantee, so compare the sorted
        // capacities against the reserved sizes 0..POOL_SIZE instead of
        // relying on checkout order.
        let mut seen_values = buffers.into_iter()
            .map(|buffer| {
                assert_eq!(buffer.len(), 0);
                buffer.capacity()
            })
            .collect::<Vec<_>>();

        seen_values.sort_unstable();

        for (idx, capacity) in seen_values.into_iter().enumerate() {
            assert!(capacity >= idx);
        }
    }

    // One lazily-created buffer shared across two threads; progress is
    // sequenced with channel handshakes rather than timing assumptions.
    #[test]
    fn multithreaded_one_capacity() {
        const BUF_CAPACITY: usize = 4096;

        let pool = SharedUnboundedBufferPool::new(BUF_CAPACITY);

        // Clones share the same underlying pool.
        let cloned_pool = pool.clone();

        assert_eq!(pool.pool_size(), 0);
        assert_eq!(pool.available_buffers(), 0);

        let (signal_main, wait_for_thread) = mpsc::channel();
        let (signal_thread, wait_for_main) = mpsc::channel();

        thread::spawn(move || {
            let mut buffer = cloned_pool.get();
            signal_main.send(()).unwrap();
            wait_for_main.recv().unwrap();
            assert_eq!(buffer.len(), 0);
            // Allocator-tolerance guard mirroring the other tests. A fresh
            // buffer's capacity is 0 here, so this branch is not expected to
            // run (an early return would also make the main thread's final
            // `recv` fail) — NOTE(review): confirm intent.
            if buffer.capacity() > BUF_CAPACITY {
                return;
            }
            buffer.push(42);
            drop(buffer);
            signal_main.send(()).unwrap();
        });

        wait_for_thread.recv().unwrap();
        // The worker's `get` grew the pool; it holds the only buffer.
        assert_eq!(pool.pool_size(), 1);
        assert_eq!(pool.available_buffers(), 0);

        signal_thread.send(()).unwrap();
        wait_for_thread.recv().unwrap();
        // The worker dropped it; it is available (and emptied) again.
        assert_eq!(pool.pool_size(), 1);
        assert_eq!(pool.available_buffers(), 1);

        let buffer = pool.get();
        assert_eq!(pool.pool_size(), 1);
        assert_eq!(pool.available_buffers(), 0);

        assert_eq!(buffer.len(), 0);
        assert!(buffer.capacity() > 0);
    }
}