// anchored_pool/buffer_pools.rs

1#[cfg(feature = "clone-behavior")]
2use clone_behavior::{MirroredClone, Speed};
3
4use crate::{
5    bounded::BoundedPool,
6    shared_bounded::SharedBoundedPool,
7    shared_unbounded::SharedUnboundedPool,
8    unbounded::UnboundedPool,
9};
10use crate::{
11    other_utils::{OutOfBuffers, ResetBuffer, ResourcePoolEmpty},
12    pooled_resource::{PooledBuffer, SealedBufferPool},
13};
14
15
/// A pool with a fixed number of `Vec<u8>` buffers.
///
/// Newtype over [`BoundedPool`] with the [`ResetBuffer`] reset policy.
/// Not threadsafe (`!Send + !Sync`; see the panic docs on [`BoundedBufferPool::get`]);
/// use `SharedBoundedBufferPool` for the threadsafe variant.
#[derive(Debug, Clone)]
pub struct BoundedBufferPool(BoundedPool<Vec<u8>, ResetBuffer>);
19
// Sealed-trait plumbing: associates `BoundedBufferPool` with its concrete inner pool
// type for use by `PooledBuffer` (see the `pooled_resource` module).
impl SealedBufferPool for BoundedBufferPool {
    type InnerPool = BoundedPool<Vec<u8>, ResetBuffer>;
}
23
24impl BoundedBufferPool {
25    /// Create a new `BoundedBufferPool` which has the indicated, fixed number of `Vec<u8>` buffers.
26    ///
27    /// Whenever a buffer is returned to the pool, if its capacity is at most `max_buffer_capacity`,
28    /// then [`Vec::clear`] is run on it; otherwise, it is replaced with a new empty `Vec<u8>`.
29    #[inline]
30    #[must_use]
31    pub fn new(pool_size: usize, max_buffer_capacity: usize) -> Self {
32        Self(BoundedPool::new_default(pool_size, ResetBuffer::new(max_buffer_capacity)))
33    }
34
35    /// Get a buffer from the pool, if any are available.
36    pub fn try_get(&self) -> Result<PooledBuffer<Self>, OutOfBuffers> {
37        self.0.try_get()
38            .map(PooledBuffer::new)
39            .map_err(|ResourcePoolEmpty| OutOfBuffers)
40    }
41
42    /// Get a buffer from the pool.
43    ///
44    /// # Panics
45    /// Panics if no buffers are currently available. As `BoundedBufferPool` is `!Send + !Sync`, no
46    /// buffer could ever become available while in the body of this function.
47    #[must_use]
48    pub fn get(&self) -> PooledBuffer<Self> {
49        PooledBuffer::new(self.0.get())
50    }
51
52    /// Get the total number of buffers in this pool, whether available or in-use.
53    #[inline]
54    #[must_use]
55    pub fn pool_size(&self) -> usize {
56       self.0.pool_size()
57    }
58
59    /// Get the number of buffers in the pool which are not currently being used.
60    #[must_use]
61    pub fn available_buffers(&self) -> usize {
62        self.0.available_resources()
63    }
64}
65
#[cfg(feature = "clone-behavior")]
impl<S: Speed> MirroredClone<S> for BoundedBufferPool {
    #[inline]
    fn mirrored_clone(&self) -> Self {
        // Delegate to the inner pool's `MirroredClone`. The turbofish pins the speed
        // parameter to this impl's `S`; it cannot be inferred from the return type alone.
        Self(MirroredClone::<S>::mirrored_clone(&self.0))
    }
}
73
/// A threadsafe pool with a fixed number of `Vec<u8>` buffers.
///
/// Newtype over [`SharedBoundedPool`] with the [`ResetBuffer`] reset policy.
/// This is the threadsafe counterpart of `BoundedBufferPool`.
#[derive(Debug, Clone)]
pub struct SharedBoundedBufferPool(SharedBoundedPool<Vec<u8>, ResetBuffer>);
77
// Sealed-trait plumbing: associates `SharedBoundedBufferPool` with its concrete inner
// pool type for use by `PooledBuffer` (see the `pooled_resource` module).
impl SealedBufferPool for SharedBoundedBufferPool {
    type InnerPool = SharedBoundedPool<Vec<u8>, ResetBuffer>;
}
81
82impl SharedBoundedBufferPool {
83    /// Create a new `SharedBoundedPool` which has the indicated, fixed number of `Vec<u8>` buffers.
84    ///
85    /// Whenever a buffer is returned to the pool, if its capacity is at most `max_buffer_capacity`,
86    /// then [`Vec::clear`] is run on it; otherwise, it is replaced with a new empty `Vec<u8>`.
87    #[inline]
88    #[must_use]
89    pub fn new(pool_size: usize, max_buffer_capacity: usize) -> Self {
90        Self(SharedBoundedPool::new_default(pool_size, ResetBuffer::new(max_buffer_capacity)))
91    }
92
93    /// Get a buffer from the pool, if any are available.
94    pub fn try_get(&self) -> Result<PooledBuffer<Self>, OutOfBuffers> {
95        self.0.try_get()
96            .map(PooledBuffer::new)
97            .map_err(|ResourcePoolEmpty| OutOfBuffers)
98    }
99
100    /// Get a buffer from the pool.
101    ///
102    /// May need to wait for a buffer to become available.
103    ///
104    /// # Potential Panics or Deadlocks
105    /// If `self.pool_size() == 0`, then this method will panic or deadlock.
106    /// This method may also cause a deadlock if no buffers are currently available, and the
107    /// current thread needs to make progress in order to release a buffer.
108    #[must_use]
109    pub fn get(&self) -> PooledBuffer<Self> {
110        PooledBuffer::new(self.0.get())
111    }
112
113    /// Get the total number of buffers in this pool, whether available or in-use.
114    #[inline]
115    #[must_use]
116    pub fn pool_size(&self) -> usize {
117       self.0.pool_size()
118    }
119
120    /// Get the number of buffers in the pool which are not currently being used.
121    #[must_use]
122    pub fn available_buffers(&self) -> usize {
123        self.0.available_resources()
124    }
125}
126
#[cfg(feature = "clone-behavior")]
impl<S: Speed> MirroredClone<S> for SharedBoundedBufferPool {
    #[inline]
    fn mirrored_clone(&self) -> Self {
        // Delegate to the inner pool's `MirroredClone`. The turbofish pins the speed
        // parameter to this impl's `S`; it cannot be inferred from the return type alone.
        Self(MirroredClone::<S>::mirrored_clone(&self.0))
    }
}
134
/// A pool with a growable number of `Vec<u8>` buffers.
///
/// Newtype over [`UnboundedPool`] with the [`ResetBuffer`] reset policy. Starts empty;
/// `get` hands out a new empty buffer when none are available (so it never blocks).
#[derive(Debug, Clone)]
pub struct UnboundedBufferPool(UnboundedPool<Vec<u8>, ResetBuffer>);
138
// Sealed-trait plumbing: associates `UnboundedBufferPool` with its concrete inner pool
// type for use by `PooledBuffer` (see the `pooled_resource` module).
impl SealedBufferPool for UnboundedBufferPool {
    type InnerPool = UnboundedPool<Vec<u8>, ResetBuffer>;
}
142
143impl UnboundedBufferPool {
144    /// Create a new `UnboundedBufferPool`, which initially has zero `Vec<u8>` buffers.
145    ///
146    /// Whenever a buffer is returned to the pool, if its capacity is at most `max_buffer_capacity`,
147    /// then [`Vec::clear`] is run on it; otherwise, it is replaced with a new empty `Vec<u8>`.
148    #[inline]
149    #[must_use]
150    pub fn new(max_buffer_capacity: usize) -> Self {
151        Self(UnboundedPool::new(ResetBuffer::new(max_buffer_capacity)))
152    }
153
154    /// Get a buffer from the pool, returning a new empty buffer if none were available in the pool.
155    #[must_use]
156    pub fn get(&self) -> PooledBuffer<Self> {
157        PooledBuffer::new(self.0.get_default())
158    }
159
160    /// Get the total number of buffers in this pool, whether available or in-use.
161    #[inline]
162    #[must_use]
163    pub fn pool_size(&self) -> usize {
164       self.0.pool_size()
165    }
166
167    /// Get the number of buffers in the pool which are not currently being used.
168    #[must_use]
169    pub fn available_buffers(&self) -> usize {
170        self.0.available_resources()
171    }
172}
173
#[cfg(feature = "clone-behavior")]
impl<S: Speed> MirroredClone<S> for UnboundedBufferPool {
    #[inline]
    fn mirrored_clone(&self) -> Self {
        // Delegate to the inner pool's `MirroredClone`. The turbofish pins the speed
        // parameter to this impl's `S`; it cannot be inferred from the return type alone.
        Self(MirroredClone::<S>::mirrored_clone(&self.0))
    }
}
181
/// A threadsafe pool with a growable number of `Vec<u8>` buffers.
///
/// Newtype over [`SharedUnboundedPool`] with the [`ResetBuffer`] reset policy.
/// This is the threadsafe counterpart of `UnboundedBufferPool`.
#[derive(Debug, Clone)]
pub struct SharedUnboundedBufferPool(SharedUnboundedPool<Vec<u8>, ResetBuffer>);
185
// Sealed-trait plumbing: associates `SharedUnboundedBufferPool` with its concrete inner
// pool type for use by `PooledBuffer` (see the `pooled_resource` module).
impl SealedBufferPool for SharedUnboundedBufferPool {
    type InnerPool = SharedUnboundedPool<Vec<u8>, ResetBuffer>;
}
189
190impl SharedUnboundedBufferPool {
191    /// Create a new `SharedUnboundedBufferPool`, which initially has zero `Vec<u8>` buffers.
192    ///
193    /// Whenever a buffer is returned to the pool, if its capacity is at most `max_buffer_capacity`,
194    /// then [`Vec::clear`] is run on it; otherwise, it is replaced with a new empty `Vec<u8>`.
195    #[inline]
196    #[must_use]
197    pub fn new(max_buffer_capacity: usize) -> Self {
198        Self(SharedUnboundedPool::new(ResetBuffer::new(max_buffer_capacity)))
199    }
200
201    /// Get a buffer from the pool, returning a new empty buffer if none were available in the pool.
202    #[must_use]
203    pub fn get(&self) -> PooledBuffer<Self> {
204        PooledBuffer::new(self.0.get_default())
205    }
206
207    /// Get the total number of buffers in this pool, whether available or in-use.
208    #[inline]
209    #[must_use]
210    pub fn pool_size(&self) -> usize {
211       self.0.pool_size()
212    }
213
214    /// Get the number of buffers in the pool which are not currently being used.
215    #[must_use]
216    pub fn available_buffers(&self) -> usize {
217        self.0.available_resources()
218    }
219}
220
#[cfg(feature = "clone-behavior")]
impl<S: Speed> MirroredClone<S> for SharedUnboundedBufferPool {
    #[inline]
    fn mirrored_clone(&self) -> Self {
        // Delegate to the inner pool's `MirroredClone`. The turbofish pins the speed
        // parameter to this impl's `S`; it cannot be inferred from the return type alone.
        Self(MirroredClone::<S>::mirrored_clone(&self.0))
    }
}
228
229
#[cfg(all(test, not(tests_with_leaks)))]
mod bounded_tests {
    //! Tests for the single-threaded [`BoundedBufferPool`].
    //!
    //! NOTE(review): some assertions depend on the order the pool hands buffers back
    //! and on the global allocator not over-allocating; the allocator assumptions are
    //! guarded with early returns / capacity checks inside the tests.
    use std::{array, iter};
    use super::*;


    // A zero-sized pool reports zero sizes and can never produce a buffer.
    #[test]
    fn zero_capacity() {
        let pool = BoundedBufferPool::new(0, 0);
        assert_eq!(pool.pool_size(), 0);
        assert_eq!(pool.available_buffers(), 0);
        assert!(pool.try_get().is_err());
    }

    // `get` on a zero-sized pool must panic (documented on `BoundedBufferPool::get`).
    #[test]
    #[should_panic]
    fn zero_capacity_fail() {
        let pool = BoundedBufferPool::new(0, 0);
        let unreachable = pool.get();
        let _: &Vec<u8> = &*unreachable;
    }

    // Exercises the reset policy with `max_buffer_capacity == 1`: an over-capacity
    // buffer is replaced with a fresh empty one, while a within-capacity buffer is
    // merely cleared and keeps its allocation.
    #[test]
    fn one_capacity() {
        let pool = BoundedBufferPool::new(1, 1);
        let buffer = pool.get();
        assert_eq!(pool.available_buffers(), 0);
        assert_eq!(buffer.len(), 0);
        assert_eq!(buffer.capacity(), 0);
        drop(buffer);
        assert_eq!(pool.available_buffers(), 1);
        let mut buffer = pool.get();
        assert_eq!(buffer.len(), 0);
        assert_eq!(buffer.capacity(), 0);
        buffer.reserve(2);
        assert_eq!(pool.available_buffers(), 0);
        drop(buffer);
        assert_eq!(pool.available_buffers(), 1);
        // It got reset, since 2 capacity exceeds 1
        let mut buffer = pool.get();
        assert_eq!(buffer.len(), 0);
        assert_eq!(buffer.capacity(), 0);

        buffer.reserve_exact(1);
        buffer.push(1);
        // `reserve_exact` could theoretically allocate extra elements
        if buffer.capacity() == 1 {
            drop(buffer);
            // The buffer got cleared, but is still the same buffer
            let buffer = pool.get();
            assert_eq!(buffer.len(), 0);
            assert_eq!(buffer.capacity(), 1);
        }
    }

    // `get` must panic once every buffer is checked out.
    #[test]
    #[should_panic]
    fn one_capacity_fail() {
        let pool = BoundedBufferPool::new(1, 1);
        let _buf = pool.get();
        assert_eq!(pool.pool_size(), 1);
        assert_eq!(pool.available_buffers(), 0);
        let _unreachable = pool.get();
    }

    // Returned buffers are cleared (length reset) but keep their capacity when it is
    // within `BUF_CAPACITY`.
    #[test]
    fn init_and_reset() {
        const POOL_CAPACITY: usize = 10;
        const BUF_CAPACITY: usize = 4096;

        let pool = BoundedBufferPool::new(POOL_CAPACITY, BUF_CAPACITY);
        let buffers: [_; POOL_CAPACITY] = array::from_fn(|_| pool.get());
        for (idx, mut buffer) in buffers.into_iter().enumerate() {
            buffer.reserve_exact(idx);
            buffer.extend(iter::repeat_n(0, idx));
            assert_eq!(buffer.len(), idx);
            // If the allocator allocated more than 4096 bytes in response to a request for
            // at most 10... just exit the test early.
            if buffer.capacity() > BUF_CAPACITY {
                return;
            }
        }

        // Their lengths but not capacities have been reset.
        // NOTE: users should not rely on the order.
        let buffers: [_; POOL_CAPACITY] = array::from_fn(|_| pool.get());
        for (idx, buffer) in buffers.into_iter().enumerate() {
            assert_eq!(buffer.len(), 0);
            // Technically need not be equal.
            assert!(buffer.capacity() >= idx);
        }
    }
}
323
#[cfg(all(test, not(tests_with_leaks)))]
mod shared_bounded_tests {
    //! Tests for the threadsafe [`SharedBoundedBufferPool`].
    //!
    //! The multithreaded test coordinates the two threads with a pair of mpsc
    //! channels so that every assertion runs at a deterministic point.
    use std::{array, iter, sync::mpsc, thread};
    use super::*;


    // A zero-sized pool reports zero sizes and can never produce a buffer.
    #[test]
    fn zero_capacity() {
        let pool = SharedBoundedBufferPool::new(0, 0);
        assert_eq!(pool.pool_size(), 0);
        assert_eq!(pool.available_buffers(), 0);
        assert!(pool.try_get().is_err());
    }

    // Exercises the reset policy with `max_buffer_capacity == 1`: an over-capacity
    // buffer is replaced with a fresh empty one, while a within-capacity buffer is
    // merely cleared and keeps its allocation.
    #[test]
    fn one_capacity() {
        let pool = SharedBoundedBufferPool::new(1, 1);
        let buffer = pool.get();
        assert_eq!(pool.available_buffers(), 0);
        assert_eq!(buffer.len(), 0);
        assert_eq!(buffer.capacity(), 0);
        drop(buffer);
        assert_eq!(pool.available_buffers(), 1);
        let mut buffer = pool.get();
        assert_eq!(buffer.len(), 0);
        assert_eq!(buffer.capacity(), 0);
        buffer.reserve(2);
        assert_eq!(pool.available_buffers(), 0);
        drop(buffer);
        assert_eq!(pool.available_buffers(), 1);
        // It got reset, since 2 capacity exceeds 1
        let mut buffer = pool.get();
        assert_eq!(buffer.len(), 0);
        assert_eq!(buffer.capacity(), 0);

        buffer.reserve_exact(1);
        buffer.push(1);
        // `reserve_exact` could theoretically allocate extra elements
        if buffer.capacity() == 1 {
            drop(buffer);
            // The buffer got cleared, but is still the same buffer
            let buffer = pool.get();
            assert_eq!(buffer.len(), 0);
            assert_eq!(buffer.capacity(), 1);
        }
    }

    // Returned buffers are cleared but keep their capacity; capacities are sorted
    // before checking because the shared pool's return order is not guaranteed.
    #[test]
    fn init_and_reset() {
        const POOL_CAPACITY: usize = 10;
        const BUF_CAPACITY: usize = 4096;

        let pool = SharedBoundedBufferPool::new(POOL_CAPACITY, BUF_CAPACITY);
        let buffers: [_; POOL_CAPACITY] = array::from_fn(|_| pool.get());
        for (idx, mut buffer) in buffers.into_iter().enumerate() {
            buffer.reserve_exact(idx);
            buffer.extend(iter::repeat_n(0, idx));
            assert_eq!(buffer.len(), idx);
            // If the allocator allocated more than 4096 bytes in response to a request for
            // at most 10... just exit the test early.
            if buffer.capacity() > BUF_CAPACITY {
                return;
            }
        }

        // Their lengths but not capacities have been reset.
        let buffers: [_; POOL_CAPACITY] = array::from_fn(|_| pool.get());

        let mut seen_values = buffers.into_iter()
            .map(|buffer| {
                assert_eq!(buffer.len(), 0);
                buffer.capacity()
            })
            .collect::<Vec<_>>();

        seen_values.sort_unstable();

        for (idx, capacity) in seen_values.into_iter().enumerate() {
            // Technically need not be equal.
            assert!(capacity >= idx);
        }
    }

    // A buffer checked out on one thread is observable as in-use from another, and
    // returns to the shared pool (cleared) when dropped on the other thread.
    #[test]
    fn multithreaded_one_capacity() {
        const BUF_CAPACITY: usize = 4096;

        let pool = SharedBoundedBufferPool::new(1, BUF_CAPACITY);

        let cloned_pool = pool.clone();

        assert_eq!(pool.available_buffers(), 1);

        let (signal_main, wait_for_thread) = mpsc::channel();
        let (signal_thread, wait_for_main) = mpsc::channel();

        thread::spawn(move || {
            let mut buffer = cloned_pool.get();
            signal_main.send(()).unwrap();
            wait_for_main.recv().unwrap();
            // This shouldn't allocate 4096 bytes, but technically could
            assert_eq!(buffer.len(), 0);
            if buffer.capacity() > BUF_CAPACITY {
                // This drops `signal_main` and still causes a test failure, but in a noticeably
                // different way.
                // But again, this should not happen unless the global allocator is set to
                // something truly strange.
                return;
            }
            buffer.push(42);
            drop(buffer);
            signal_main.send(()).unwrap();
        });

        wait_for_thread.recv().unwrap();
        assert_eq!(pool.available_buffers(), 0);
        signal_thread.send(()).unwrap();
        wait_for_thread.recv().unwrap();
        assert_eq!(pool.available_buffers(), 1);
        let buffer = pool.get();
        assert_eq!(buffer.len(), 0);
        assert!(buffer.capacity() > 0);
    }
}
448
449
#[cfg(all(test, not(tests_with_leaks)))]
mod unbounded_tests {
    //! Tests for the single-threaded [`UnboundedBufferPool`].
    //!
    //! NOTE(review): `init_and_reset` relies on this pool returning buffers in
    //! reverse checkout order (see the `.rev()` below) — an internal detail that
    //! users should not depend on.
    use std::{array, iter};
    use super::*;


    // The pool starts empty and grows by one when a buffer is created on demand;
    // that buffer rejoins the pool when dropped.
    #[test]
    fn zero_or_one_size() {
        let pool = UnboundedBufferPool::new(0);
        assert_eq!(pool.pool_size(), 0);
        assert_eq!(pool.available_buffers(), 0);

        let buffer = pool.get();
        let _: &Vec<u8> = &buffer;
        assert_eq!(pool.pool_size(), 1);
        assert_eq!(pool.available_buffers(), 0);

        drop(buffer);
        assert_eq!(pool.pool_size(), 1);
        assert_eq!(pool.available_buffers(), 1);
    }

    // Exercises the reset policy with `max_buffer_capacity == 1`: an over-capacity
    // buffer is replaced with a fresh empty one, while a within-capacity buffer is
    // merely cleared and keeps its allocation.
    #[test]
    fn one_capacity() {
        let pool = UnboundedBufferPool::new(1);
        let buffer = pool.get();
        assert_eq!(pool.available_buffers(), 0);
        assert_eq!(buffer.len(), 0);
        assert_eq!(buffer.capacity(), 0);
        drop(buffer);
        assert_eq!(pool.available_buffers(), 1);
        let mut buffer = pool.get();
        assert_eq!(buffer.len(), 0);
        assert_eq!(buffer.capacity(), 0);
        buffer.reserve(2);
        assert_eq!(pool.available_buffers(), 0);
        drop(buffer);
        assert_eq!(pool.available_buffers(), 1);
        // It got reset, since 2 capacity exceeds 1
        let mut buffer = pool.get();
        assert_eq!(buffer.len(), 0);
        assert_eq!(buffer.capacity(), 0);

        buffer.reserve_exact(1);
        buffer.push(1);
        // `reserve_exact` could theoretically allocate extra elements
        if buffer.capacity() == 1 {
            drop(buffer);
            // The buffer got cleared, but is still the same buffer
            let buffer = pool.get();
            assert_eq!(buffer.len(), 0);
            assert_eq!(buffer.capacity(), 1);
        }
    }

    // Returned buffers are cleared (length reset) but keep their capacity when it is
    // within `BUF_CAPACITY`; an extra `get` past the pool's current size allocates anew.
    #[test]
    fn init_and_reset() {
        const POOL_SIZE: usize = 10;
        const BUF_CAPACITY: usize = 4096;

        let pool = UnboundedBufferPool::new(BUF_CAPACITY);
        let buffers: [_; POOL_SIZE] = array::from_fn(|_| pool.get());
        for (idx, mut buffer) in buffers.into_iter().enumerate() {
            buffer.reserve_exact(idx);
            buffer.extend(iter::repeat_n(0, idx));
            assert_eq!(buffer.len(), idx);
            // If the allocator allocated more than 4096 bytes in response to a request for
            // at most 10... just exit the test early.
            if buffer.capacity() > BUF_CAPACITY {
                return;
            }
        }

        // Their lengths but not capacities have been reset.
        // NOTE: users should not rely on the order.
        let buffers: [_; POOL_SIZE] = array::from_fn(|_| pool.get());
        // This one is new.
        assert_eq!(pool.get().capacity(), 0);

        for (idx, buffer) in buffers.into_iter().rev().enumerate() {
            assert_eq!(buffer.len(), 0);
            // Technically need not be equal.
            assert!(buffer.capacity() >= idx);
        }
    }
}
536
#[cfg(all(test, not(tests_with_leaks)))]
mod shared_unbounded_tests {
    //! Tests for the threadsafe [`SharedUnboundedBufferPool`].
    //!
    //! The multithreaded test coordinates the two threads with a pair of mpsc
    //! channels so that every assertion runs at a deterministic point.
    use std::{array, iter, sync::mpsc, thread};
    use super::*;


    // The pool starts empty and grows by one when a buffer is created on demand;
    // that buffer rejoins the pool when dropped.
    #[test]
    fn zero_or_one_size() {
        let pool: SharedUnboundedBufferPool = SharedUnboundedBufferPool::new(0);
        assert_eq!(pool.pool_size(), 0);
        assert_eq!(pool.available_buffers(), 0);

        let buffer = pool.get();
        let _: &Vec<u8> = &buffer;
        assert_eq!(pool.pool_size(), 1);
        assert_eq!(pool.available_buffers(), 0);

        drop(buffer);
        assert_eq!(pool.pool_size(), 1);
        assert_eq!(pool.available_buffers(), 1);
    }

    // Exercises the reset policy with `max_buffer_capacity == 1`: an over-capacity
    // buffer is replaced with a fresh empty one, while a within-capacity buffer is
    // merely cleared and keeps its allocation.
    #[test]
    fn one_capacity() {
        let pool = SharedUnboundedBufferPool::new(1);
        let buffer = pool.get();
        assert_eq!(pool.available_buffers(), 0);
        assert_eq!(buffer.len(), 0);
        assert_eq!(buffer.capacity(), 0);
        drop(buffer);
        assert_eq!(pool.available_buffers(), 1);
        let mut buffer = pool.get();
        assert_eq!(buffer.len(), 0);
        assert_eq!(buffer.capacity(), 0);
        buffer.reserve(2);
        assert_eq!(pool.available_buffers(), 0);
        drop(buffer);
        assert_eq!(pool.available_buffers(), 1);
        // It got reset, since 2 capacity exceeds 1
        let mut buffer = pool.get();
        assert_eq!(buffer.len(), 0);
        assert_eq!(buffer.capacity(), 0);

        buffer.reserve_exact(1);
        buffer.push(1);
        // `reserve_exact` could theoretically allocate extra elements
        if buffer.capacity() == 1 {
            drop(buffer);
            // The buffer got cleared, but is still the same buffer
            let buffer = pool.get();
            assert_eq!(buffer.len(), 0);
            assert_eq!(buffer.capacity(), 1);
        }
    }

    // Returned buffers are cleared but keep their capacity; capacities are sorted
    // before checking because the shared pool's return order is not guaranteed.
    #[test]
    fn init_and_reset() {
        const POOL_SIZE: usize = 10;
        const BUF_CAPACITY: usize = 4096;

        let pool = SharedUnboundedBufferPool::new(BUF_CAPACITY);
        let buffers: [_; POOL_SIZE] = array::from_fn(|_| pool.get());
        for (idx, mut buffer) in buffers.into_iter().enumerate() {
            buffer.reserve_exact(idx);
            buffer.extend(iter::repeat_n(0, idx));
            assert_eq!(buffer.len(), idx);
            // If the allocator allocated more than 4096 bytes in response to a request for
            // at most 10... just exit the test early.
            if buffer.capacity() > BUF_CAPACITY {
                return;
            }
        }

        // Their lengths but not capacities have been reset.
        let buffers: [_; POOL_SIZE] = array::from_fn(|_| pool.get());
        // This one is new.
        assert_eq!(pool.get().capacity(), 0);

        let mut seen_values = buffers.into_iter()
            .map(|buffer| {
                assert_eq!(buffer.len(), 0);
                buffer.capacity()
            })
            .collect::<Vec<_>>();

        seen_values.sort_unstable();

        for (idx, capacity) in seen_values.into_iter().enumerate() {
            // Technically need not be equal.
            assert!(capacity >= idx);
        }
    }

    // A buffer created on one thread grows the shared pool, is observable as in-use
    // from another thread, and returns (cleared) when dropped on the other thread.
    #[test]
    fn multithreaded_one_capacity() {
        const BUF_CAPACITY: usize = 4096;

        let pool = SharedUnboundedBufferPool::new(BUF_CAPACITY);

        let cloned_pool = pool.clone();

        assert_eq!(pool.pool_size(), 0);
        assert_eq!(pool.available_buffers(), 0);

        let (signal_main, wait_for_thread) = mpsc::channel();
        let (signal_thread, wait_for_main) = mpsc::channel();

        thread::spawn(move || {
            let mut buffer = cloned_pool.get();
            signal_main.send(()).unwrap();
            wait_for_main.recv().unwrap();
            // This shouldn't allocate 4096 bytes, but technically could
            assert_eq!(buffer.len(), 0);
            if buffer.capacity() > BUF_CAPACITY {
                // This drops `signal_main` and still causes a test failure, but in a noticeably
                // different way.
                // But again, this should not happen unless the global allocator is set to
                // something truly strange.
                return;
            }
            buffer.push(42);
            drop(buffer);
            signal_main.send(()).unwrap();
        });

        wait_for_thread.recv().unwrap();
        assert_eq!(pool.pool_size(), 1);
        assert_eq!(pool.available_buffers(), 0);

        signal_thread.send(()).unwrap();
        wait_for_thread.recv().unwrap();
        assert_eq!(pool.pool_size(), 1);
        assert_eq!(pool.available_buffers(), 1);

        let buffer = pool.get();
        assert_eq!(pool.pool_size(), 1);
        assert_eq!(pool.available_buffers(), 0);

        assert_eq!(buffer.len(), 0);
        assert!(buffer.capacity() > 0);
    }
}