use super::{IoBuf, IoBufMut};
use bytes::{Buf, BufMut, Bytes};
use commonware_utils::NZUsize;
use crossbeam_queue::ArrayQueue;
use prometheus_client::{
    encoding::EncodeLabelSet,
    metrics::{counter::Counter, family::Family, gauge::Gauge},
    registry::Registry,
};
use std::{
    alloc::{alloc, dealloc, Layout},
    mem::ManuallyDrop,
    num::NonZeroUsize,
    ptr::NonNull,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc, Weak,
    },
};

/// Errors that can occur when allocating from a [`BufferPool`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PoolError {
    /// The requested capacity exceeds the pool's maximum buffer size.
    Oversized,
    /// No buffers are available in the required size class.
    Exhausted,
}

impl std::fmt::Display for PoolError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Oversized => write!(f, "requested capacity exceeds maximum buffer size"),
            Self::Exhausted => write!(f, "pool exhausted for required size class"),
        }
    }
}

impl std::error::Error for PoolError {}

/// Returns the system page size, falling back to 4096 bytes if it cannot be
/// determined.
#[cfg(unix)]
fn page_size() -> usize {
    let size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
    if size <= 0 {
        // sysconf failed (or returned a bogus value); assume 4 KiB pages.
        4096
    } else {
        size as usize
    }
}

/// Returns the assumed page size (4096 bytes) on non-Unix targets.
#[cfg(not(unix))]
#[allow(clippy::missing_const_for_fn)]
fn page_size() -> usize {
    4096
}

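/// Conservative cache-line size used for buffer alignment.
///
/// Our reading of the constant (not stated in the original): 128 rather than
/// 64 is used on x86_64, where the spatial prefetcher pulls cache lines in
/// pairs, and on aarch64, where some cores (e.g. Apple Silicon) use
/// 128-byte lines; other targets assume 64 bytes.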
const fn cache_line_size() -> usize {
    cfg_if::cfg_if! {
        if #[cfg(any(target_arch = "x86_64", target_arch = "aarch64"))] {
            128
        } else {
            64
        }
    }
}

/// Configuration for a [`BufferPool`].
#[derive(Debug, Clone)]
pub struct BufferPoolConfig {
    /// Smallest buffer size class, in bytes. Must be a power of two and at
    /// least `alignment`.
    pub min_size: NonZeroUsize,
    /// Largest buffer size class, in bytes. Must be a power of two and at
    /// least `min_size`.
    pub max_size: NonZeroUsize,
    /// Maximum number of buffers retained per size class.
    pub max_per_class: NonZeroUsize,
    /// Whether to allocate every buffer up front instead of lazily.
    pub prefill: bool,
    /// Alignment of every buffer, in bytes. Must be a power of two.
    pub alignment: NonZeroUsize,
}

impl Default for BufferPoolConfig {
    fn default() -> Self {
        Self::for_network()
    }
}

impl BufferPoolConfig {
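    /// Preset tuned for network I/O: cache-line-aligned buffers from one
    /// cache line up to 64 KiB, with a deep freelist per size class.
    ///
    /// Illustrative sketch (the exact numbers come from the constants
    /// below):
    ///
    /// ```ignore
    /// let config = BufferPoolConfig::for_network();
    /// assert_eq!(config.min_size, config.alignment); // one cache line
    /// assert_eq!(config.max_size.get(), 64 * 1024);
    /// ```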
    pub const fn for_network() -> Self {
        let cache_line = NZUsize!(cache_line_size());
        Self {
            min_size: cache_line,
            max_size: NZUsize!(64 * 1024),
            max_per_class: NZUsize!(4096),
            prefill: false,
            alignment: cache_line,
        }
    }

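    /// Preset tuned for storage I/O: page-aligned buffers from one page up
    /// to 64 KiB, with a small freelist per size class (presumably because
    /// storage paths hold far fewer buffers at once than network paths).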
    pub fn for_storage() -> Self {
        let page = NZUsize!(page_size());
        Self {
            min_size: page,
            max_size: NZUsize!(64 * 1024),
            max_per_class: NZUsize!(32),
            prefill: false,
            alignment: page,
        }
    }

    /// Asserts that the configuration is internally consistent.
    ///
    /// Panics if `alignment`, `min_size`, or `max_size` is not a power of
    /// two, if `min_size` is smaller than `alignment`, or if `max_size` is
    /// smaller than `min_size`.
    fn validate(&self) {
        assert!(
            self.alignment.is_power_of_two(),
            "alignment must be a power of two"
        );
        assert!(
            self.min_size.is_power_of_two(),
            "min_size must be a power of two"
        );
        assert!(
            self.max_size.is_power_of_two(),
            "max_size must be a power of two"
        );
        assert!(
            self.min_size >= self.alignment,
            "min_size ({}) must be >= alignment ({})",
            self.min_size,
            self.alignment
        );
        assert!(
            self.max_size >= self.min_size,
            "max_size must be >= min_size"
        );
    }

    /// Number of power-of-two size classes between `min_size` and
    /// `max_size`, inclusive.
    fn num_classes(&self) -> usize {
        if self.max_size < self.min_size {
            return 0;
        }
        (self.max_size.get() / self.min_size.get()).trailing_zeros() as usize + 1
    }

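    /// Maps a requested size to the index of the smallest size class that
    /// fits it, or `None` if the request exceeds `max_size`.
    ///
    /// Worked example (illustrative): with `min_size = 4096` and
    /// `max_size = 32768`, requests of 1..=4096 bytes map to class 0
    /// (4 KiB), 4097..=8192 to class 1 (8 KiB), 8193..=16384 to class 2
    /// (16 KiB), 16385..=32768 to class 3 (32 KiB), and anything larger
    /// returns `None`.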
    fn class_index(&self, size: usize) -> Option<usize> {
        if size > self.max_size.get() {
            return None;
        }
        if size <= self.min_size.get() {
            return Some(0);
        }
        // Round up to the next power of two; the class index is then
        // log2(size_class / min_size).
        let size_class = size.next_power_of_two();
        let index = (size_class / self.min_size.get()).trailing_zeros() as usize;
        if index < self.num_classes() {
            Some(index)
        } else {
            None
        }
    }

    /// Size in bytes of the class at `index`.
    const fn class_size(&self, index: usize) -> usize {
        self.min_size.get() << index
    }
}

/// Prometheus label identifying a size class by its buffer size in bytes.
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct SizeClassLabel {
    size_class: u64,
}

/// Prometheus metrics for a [`BufferPool`].
struct PoolMetrics {
    /// Buffers currently checked out, per size class.
    allocated: Family<SizeClassLabel, Gauge>,
    /// Buffers sitting in the freelist, per size class.
    available: Family<SizeClassLabel, Gauge>,
    /// Successful allocations, per size class.
    allocations_total: Family<SizeClassLabel, Counter>,
    /// Allocations that failed because a size class was exhausted.
    exhausted_total: Family<SizeClassLabel, Counter>,
    /// Allocation requests larger than the maximum buffer size.
    oversized_total: Counter,
}

impl PoolMetrics {
    fn new(registry: &mut Registry) -> Self {
        let metrics = Self {
            allocated: Family::default(),
            available: Family::default(),
            allocations_total: Family::default(),
            exhausted_total: Family::default(),
            oversized_total: Counter::default(),
        };

        registry.register(
            "buffer_pool_allocated",
            "Number of buffers currently allocated from the pool",
            metrics.allocated.clone(),
        );
        registry.register(
            "buffer_pool_available",
            "Number of buffers available in the pool",
            metrics.available.clone(),
        );
        registry.register(
            "buffer_pool_allocations_total",
            "Total number of successful buffer allocations",
            metrics.allocations_total.clone(),
        );
        registry.register(
            "buffer_pool_exhausted_total",
            "Total number of failed allocations due to pool exhaustion",
            metrics.exhausted_total.clone(),
        );
        registry.register(
            "buffer_pool_oversized_total",
            "Total number of allocation requests exceeding max buffer size",
            metrics.oversized_total.clone(),
        );

        metrics
    }
}

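/// A raw heap allocation with explicit size and alignment, obtained from
/// the global allocator.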
pub(crate) struct AlignedBuffer {
    ptr: NonNull<u8>,
    layout: Layout,
}

// SAFETY: `AlignedBuffer` exclusively owns its allocation and the pointer is
// never aliased, so it is sound to move it across threads.
unsafe impl Send for AlignedBuffer {}
// SAFETY: `AlignedBuffer` exposes no interior mutability.
unsafe impl Sync for AlignedBuffer {}

impl AlignedBuffer {
    /// Allocates `capacity` bytes aligned to `alignment`.
    ///
    /// Panics if the layout is invalid or the allocation fails.
    fn new(capacity: usize, alignment: usize) -> Self {
        let layout = Layout::from_size_align(capacity, alignment).expect("invalid layout");

        // SAFETY: `layout` has non-zero size for all callers (size classes
        // and fallback sizes are at least `min_size`).
        let ptr = unsafe { alloc(layout) };
        let ptr = NonNull::new(ptr).expect("allocation failed");

        Self { ptr, layout }
    }

    /// Usable capacity in bytes.
    #[inline]
    const fn capacity(&self) -> usize {
        self.layout.size()
    }

    /// Raw pointer to the start of the allocation.
    #[inline]
    const fn as_ptr(&self) -> *mut u8 {
        self.ptr.as_ptr()
    }
}

impl Drop for AlignedBuffer {
    fn drop(&mut self) {
        // SAFETY: `ptr` was allocated with this exact `layout`.
        unsafe { dealloc(self.ptr.as_ptr(), self.layout) };
    }
}

/// A single power-of-two size class within the pool.
struct SizeClass {
    /// Buffer size for this class, in bytes.
    size: usize,
    /// Alignment for buffers in this class, in bytes.
    alignment: usize,
    /// Free buffers (`Some`) and lazily-fillable slots (`None`).
    freelist: ArrayQueue<Option<AlignedBuffer>>,
    /// Number of buffers currently checked out of this class.
    allocated: AtomicUsize,
}

impl SizeClass {
    fn new(size: usize, alignment: usize, max_buffers: usize, prefill: bool) -> Self {
        // Fill the queue to capacity: `Some` entries are ready-made buffers,
        // `None` entries are slots that allocate on first use. Either way,
        // the queue's capacity bounds the class at `max_buffers` buffers.
        let freelist = ArrayQueue::new(max_buffers);
        for _ in 0..max_buffers {
            let entry = if prefill {
                Some(AlignedBuffer::new(size, alignment))
            } else {
                None
            };
            let _ = freelist.push(entry);
        }
        Self {
            size,
            alignment,
            freelist,
            allocated: AtomicUsize::new(0),
        }
    }
}

/// Shared state behind every [`BufferPool`] handle.
pub(crate) struct BufferPoolInner {
    config: BufferPoolConfig,
    classes: Vec<SizeClass>,
    metrics: PoolMetrics,
}

impl BufferPoolInner {
    /// Attempts to take a buffer from the size class at `class_index`.
    fn try_alloc(&self, class_index: usize) -> Option<AlignedBuffer> {
        let class = &self.classes[class_index];
        let label = SizeClassLabel {
            size_class: class.size as u64,
        };

        match class.freelist.pop() {
            // Reuse a previously returned buffer.
            Some(Some(buffer)) => {
                class.allocated.fetch_add(1, Ordering::Relaxed);
                self.metrics.allocations_total.get_or_create(&label).inc();
                self.metrics.allocated.get_or_create(&label).inc();
                self.metrics.available.get_or_create(&label).dec();
                Some(buffer)
            }
            // Lazy slot: allocate a fresh buffer, still counted against the
            // class cap because the slot has been consumed.
            Some(None) => {
                class.allocated.fetch_add(1, Ordering::Relaxed);
                self.metrics.allocations_total.get_or_create(&label).inc();
                self.metrics.allocated.get_or_create(&label).inc();
                Some(AlignedBuffer::new(class.size, class.alignment))
            }
            // Freelist empty: the class is exhausted.
            None => {
                self.metrics.exhausted_total.get_or_create(&label).inc();
                None
            }
        }
    }

    /// Returns a buffer to the size class matching its capacity.
    fn return_buffer(&self, buffer: AlignedBuffer) {
        if let Some(class_index) = self.config.class_index(buffer.capacity()) {
            let class = &self.classes[class_index];
            let label = SizeClassLabel {
                size_class: class.size as u64,
            };

            class.allocated.fetch_sub(1, Ordering::Relaxed);
            self.metrics.allocated.get_or_create(&label).dec();

            match class.freelist.push(Some(buffer)) {
                Ok(()) => {
                    self.metrics.available.get_or_create(&label).inc();
                }
                Err(_buffer) => {
                    // Freelist is full; drop the buffer and let it deallocate.
                }
            }
        }
    }
}

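/// A thread-safe pool of aligned buffers organized into power-of-two size
/// classes, each backed by a bounded lock-free freelist.
///
/// Cloning a `BufferPool` is cheap; clones share the same underlying pool.
/// Buffers return to the pool automatically when dropped, including after
/// [`PooledBufMut::freeze`] once every clone of the resulting [`IoBuf`] is
/// gone. Illustrative sketch (hedged: `BufferPool::new` is `pub(crate)`, so
/// this only compiles within the crate):
///
/// ```ignore
/// let mut registry = Registry::default();
/// let pool = BufferPool::new(BufferPoolConfig::for_network(), &mut registry);
/// let mut buf = pool.alloc(1024);
/// buf.put_slice(b"hello");
/// let frozen = buf.freeze(); // returned to the pool when dropped
/// ```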
#[derive(Clone)]
pub struct BufferPool {
    inner: Arc<BufferPoolInner>,
}

impl std::fmt::Debug for BufferPool {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BufferPool")
            .field("config", &self.inner.config)
            .field("num_classes", &self.inner.classes.len())
            .finish()
    }
}

impl BufferPool {
    /// Creates a pool from `config`, registering its metrics with `registry`.
    ///
    /// Panics if the configuration is invalid (see
    /// [`BufferPoolConfig::validate`]).
    pub(crate) fn new(config: BufferPoolConfig, registry: &mut Registry) -> Self {
        config.validate();

        let metrics = PoolMetrics::new(registry);

        let mut classes = Vec::with_capacity(config.num_classes());
        for i in 0..config.num_classes() {
            let size = config.class_size(i);
            let class = SizeClass::new(
                size,
                config.alignment.get(),
                config.max_per_class.get(),
                config.prefill,
            );
            classes.push(class);
        }

        // With prefill, every freelist starts full; reflect that in metrics.
        if config.prefill {
            for class in &classes {
                let label = SizeClassLabel {
                    size_class: class.size as u64,
                };
                let available = class.freelist.len() as i64;
                metrics.available.get_or_create(&label).set(available);
            }
        }

        Self {
            inner: Arc::new(BufferPoolInner {
                config,
                classes,
                metrics,
            }),
        }
    }

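    /// Allocates a buffer of at least `capacity` bytes, never failing.
    ///
    /// If the pool cannot serve the request (exhausted class or oversized
    /// capacity), an untracked buffer is allocated directly with the pool's
    /// alignment; it is simply deallocated on drop rather than returned.
    /// A minimal sketch:
    ///
    /// ```ignore
    /// let mut buf = pool.alloc(128 * 1024); // oversized: bypasses the pool
    /// assert!(!buf.is_pooled());
    /// ```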
    pub fn alloc(&self, capacity: usize) -> IoBufMut {
        self.try_alloc(capacity).unwrap_or_else(|_| {
            // Fall back to a one-off allocation. The empty Weak means the
            // buffer is untracked and will be deallocated on drop.
            let size = capacity.max(self.inner.config.min_size.get());
            let buffer = AlignedBuffer::new(size, self.inner.config.alignment.get());
            IoBufMut::from_pooled(PooledBufMut::new(buffer, Weak::new()))
        })
    }

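    /// Attempts to allocate a pooled buffer of at least `capacity` bytes.
    ///
    /// Fails with [`PoolError::Oversized`] if `capacity` exceeds the pool's
    /// maximum buffer size, or [`PoolError::Exhausted`] if the matching size
    /// class has no buffers left. For example:
    ///
    /// ```ignore
    /// match pool.try_alloc(4096) {
    ///     Ok(buf) => { /* use the pooled buffer */ }
    ///     Err(PoolError::Exhausted) => { /* apply backpressure */ }
    ///     Err(PoolError::Oversized) => { /* split the request */ }
    /// }
    /// ```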
    pub fn try_alloc(&self, capacity: usize) -> Result<IoBufMut, PoolError> {
        let class_index = match self.inner.config.class_index(capacity) {
            Some(idx) => idx,
            None => {
                self.inner.metrics.oversized_total.inc();
                return Err(PoolError::Oversized);
            }
        };

        let buffer = self
            .inner
            .try_alloc(class_index)
            .ok_or(PoolError::Exhausted)?;
        let pooled = PooledBufMut::new(buffer, Arc::downgrade(&self.inner));
        Ok(IoBufMut::from_pooled(pooled))
    }

    /// Returns the pool's configuration.
    pub fn config(&self) -> &BufferPoolConfig {
        &self.inner.config
    }
}

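/// A mutable, pooled buffer with `BytesMut`-like read/write semantics.
///
/// The buffer tracks a read `cursor` and a write position `len` within a
/// fixed-capacity allocation (a sketch of the invariant, as read from the
/// accessors below):
///
/// ```text
/// [ consumed | readable: cursor..len | writable: len..capacity ]
/// ```
///
/// `Buf` reads advance `cursor`, `BufMut` writes advance `len`, and dropping
/// the value returns the allocation to its pool if the pool still exists.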
pub struct PooledBufMut {
    buffer: ManuallyDrop<AlignedBuffer>,
    /// Read position; bytes before this are consumed.
    cursor: usize,
    /// Write position; bytes in `cursor..len` are initialized and readable.
    len: usize,
    /// Pool to return the buffer to on drop; an empty `Weak` for untracked
    /// (fallback) buffers.
    pool: Weak<BufferPoolInner>,
}

impl std::fmt::Debug for PooledBufMut {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PooledBufMut")
            .field("cursor", &self.cursor)
            .field("len", &self.len)
            .field("capacity", &self.capacity())
            .finish()
    }
}

impl PooledBufMut {
    const fn new(buffer: AlignedBuffer, pool: Weak<BufferPoolInner>) -> Self {
        Self {
            buffer: ManuallyDrop::new(buffer),
            cursor: 0,
            len: 0,
            pool,
        }
    }

    /// Whether this buffer came from a still-live pool (as opposed to an
    /// untracked fallback allocation).
    #[inline]
    pub(crate) fn is_tracked(&self) -> bool {
        self.pool.strong_count() > 0
    }

    /// Number of readable bytes (past the cursor).
    #[inline]
    pub const fn len(&self) -> usize {
        self.len - self.cursor
    }

    /// Whether there are no readable bytes.
    #[inline]
    pub const fn is_empty(&self) -> bool {
        self.cursor == self.len
    }

    /// Capacity remaining from the cursor to the end of the allocation.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.buffer.capacity() - self.cursor
    }

    /// Total capacity of the underlying allocation.
    #[inline]
    fn raw_capacity(&self) -> usize {
        self.buffer.capacity()
    }

    /// Raw pointer to the first readable byte.
    #[inline]
    pub fn as_mut_ptr(&mut self) -> *mut u8 {
        unsafe { self.buffer.as_ptr().add(self.cursor) }
    }

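    /// Sets the readable length, relative to the current cursor.
    ///
    /// # Safety
    ///
    /// The caller must ensure `cursor + len` does not exceed the buffer's
    /// capacity and that all bytes in `cursor..cursor + len` are initialized
    /// before being read.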
    #[inline]
    pub const unsafe fn set_len(&mut self, len: usize) {
        self.len = self.cursor + len;
    }

    /// Drops all readable bytes without touching the cursor.
    #[inline]
    pub const fn clear(&mut self) {
        self.len = self.cursor;
    }

    /// Shortens the readable region to `len` bytes; a no-op if `len` is not
    /// smaller than the current length (matching `BytesMut::truncate`).
    #[inline]
    pub const fn truncate(&mut self, len: usize) {
        if len < self.len() {
            self.len = self.cursor + len;
        }
    }

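    /// Converts this buffer into an immutable [`IoBuf`] without copying.
    ///
    /// The allocation is returned to the pool once the `IoBuf` and all of
    /// its clones and slices are dropped. A minimal sketch:
    ///
    /// ```ignore
    /// let mut buf = pool.try_alloc(64)?;
    /// buf.put_slice(b"abc");
    /// let frozen = buf.freeze();
    /// assert_eq!(frozen.as_ref(), b"abc");
    /// ```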
    pub fn freeze(self) -> IoBuf {
        // Suppress our own Drop; ownership of the allocation moves into the
        // PooledOwner below, which handles returning it to the pool.
        let mut me = ManuallyDrop::new(self);
        // SAFETY: `me.buffer` is never touched again after this take.
        let buffer = unsafe { ManuallyDrop::take(&mut me.buffer) };
        let cursor = me.cursor;
        let len = me.len;
        let pool = std::mem::take(&mut me.pool);

        Bytes::from_owner(PooledOwner::new(buffer, cursor, len, pool)).into()
    }
}

impl AsRef<[u8]> for PooledBufMut {
    #[inline]
    fn as_ref(&self) -> &[u8] {
        unsafe { std::slice::from_raw_parts(self.buffer.as_ptr().add(self.cursor), self.len()) }
    }
}

impl AsMut<[u8]> for PooledBufMut {
    #[inline]
    fn as_mut(&mut self) -> &mut [u8] {
        let len = self.len();
        unsafe { std::slice::from_raw_parts_mut(self.buffer.as_ptr().add(self.cursor), len) }
    }
}

impl Drop for PooledBufMut {
    fn drop(&mut self) {
        // SAFETY: drop runs at most once; `buffer` is not used afterwards.
        let buffer = unsafe { ManuallyDrop::take(&mut self.buffer) };
        // If the pool is gone (or this is an untracked fallback buffer), the
        // allocation is simply dropped and deallocated here.
        if let Some(pool) = self.pool.upgrade() {
            pool.return_buffer(buffer);
        }
    }
}

impl Buf for PooledBufMut {
    #[inline]
    fn remaining(&self) -> usize {
        self.len - self.cursor
    }

    #[inline]
    fn chunk(&self) -> &[u8] {
        unsafe {
            std::slice::from_raw_parts(
                self.buffer.as_ptr().add(self.cursor),
                self.len - self.cursor,
            )
        }
    }

    #[inline]
    fn advance(&mut self, cnt: usize) {
        let remaining = self.len - self.cursor;
        assert!(cnt <= remaining, "cannot advance past end of buffer");
        self.cursor += cnt;
    }
}

// SAFETY: `chunk_mut` always returns memory inside the allocation, and
// `advance_mut` asserts that `len` never exceeds the raw capacity.
unsafe impl BufMut for PooledBufMut {
    #[inline]
    fn remaining_mut(&self) -> usize {
        self.raw_capacity() - self.len
    }

    #[inline]
    unsafe fn advance_mut(&mut self, cnt: usize) {
        assert!(
            cnt <= self.remaining_mut(),
            "cannot advance past end of buffer"
        );
        self.len += cnt;
    }

    #[inline]
    fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
        let raw_cap = self.raw_capacity();
        let len = self.len;
        // SAFETY: `len <= raw_cap` is an invariant, so the pointer and
        // length describe memory inside the allocation.
        unsafe {
            let ptr = self.buffer.as_ptr().add(len);
            bytes::buf::UninitSlice::from_raw_parts_mut(ptr, raw_cap - len)
        }
    }
}

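/// Owner handed to [`Bytes::from_owner`] by [`PooledBufMut::freeze`]: it
/// keeps the pooled allocation alive while any `Bytes` clone references it
/// and returns the allocation to the pool when the last reference drops.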
struct PooledOwner {
    buffer: ManuallyDrop<AlignedBuffer>,
    /// Start of the frozen view within the buffer.
    cursor: usize,
    /// End of the frozen view within the buffer.
    len: usize,
    /// Pool to return the buffer to on drop.
    pool: Weak<BufferPoolInner>,
}

impl PooledOwner {
    const fn new(
        buffer: AlignedBuffer,
        cursor: usize,
        len: usize,
        pool: Weak<BufferPoolInner>,
    ) -> Self {
        Self {
            buffer: ManuallyDrop::new(buffer),
            cursor,
            len,
            pool,
        }
    }
}

impl AsRef<[u8]> for PooledOwner {
    fn as_ref(&self) -> &[u8] {
        unsafe {
            std::slice::from_raw_parts(
                self.buffer.as_ptr().add(self.cursor),
                self.len - self.cursor,
            )
        }
    }
}

impl Drop for PooledOwner {
    fn drop(&mut self) {
        // SAFETY: drop runs at most once; `buffer` is not used afterwards.
        let buffer = unsafe { ManuallyDrop::take(&mut self.buffer) };
        if let Some(pool) = self.pool.upgrade() {
            pool.return_buffer(buffer);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::IoBufs;
    use bytes::BytesMut;
    use std::{sync::mpsc, thread};

    fn test_registry() -> Registry {
        Registry::default()
    }

    /// Builds a page-aligned config with no prefill.
    fn test_config(min_size: usize, max_size: usize, max_per_class: usize) -> BufferPoolConfig {
        BufferPoolConfig {
            min_size: NZUsize!(min_size),
            max_size: NZUsize!(max_size),
            max_per_class: NZUsize!(max_per_class),
            prefill: false,
            alignment: NZUsize!(page_size()),
        }
    }

    #[test]
    fn test_page_size() {
        let size = page_size();
        assert!(size >= 4096);
        assert!(size.is_power_of_two());
    }

    #[test]
    fn test_aligned_buffer() {
        let page = page_size();
        let buf = AlignedBuffer::new(4096, page);
        assert_eq!(buf.capacity(), 4096);
        assert!((buf.as_ptr() as usize).is_multiple_of(page));

        let cache_line = cache_line_size();
        let buf2 = AlignedBuffer::new(4096, cache_line);
        assert_eq!(buf2.capacity(), 4096);
        assert!((buf2.as_ptr() as usize).is_multiple_of(cache_line));
    }

    #[test]
    fn test_config_validation() {
        let page = page_size();
        let config = test_config(page, page * 4, 10);
        config.validate();
    }

    #[test]
    #[should_panic(expected = "min_size must be a power of two")]
    fn test_config_invalid_min_size() {
        let config = BufferPoolConfig {
            min_size: NZUsize!(3000),
            max_size: NZUsize!(8192),
            max_per_class: NZUsize!(10),
            prefill: false,
            alignment: NZUsize!(page_size()),
        };
        config.validate();
    }

    #[test]
    fn test_config_class_index() {
        let page = page_size();
        let config = test_config(page, page * 8, 10);

        // Classes: page, 2*page, 4*page, 8*page.
        assert_eq!(config.num_classes(), 4);

        assert_eq!(config.class_index(1), Some(0));
        assert_eq!(config.class_index(page), Some(0));
        assert_eq!(config.class_index(page + 1), Some(1));
        assert_eq!(config.class_index(page * 2), Some(1));
        assert_eq!(config.class_index(page * 8), Some(3));
        assert_eq!(config.class_index(page * 8 + 1), None);
    }

    #[test]
    fn test_pool_alloc_and_return() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page * 4, 2), &mut registry);

        let buf = pool.try_alloc(100).unwrap();
        assert!(buf.capacity() >= page);
        assert_eq!(buf.len(), 0);

        drop(buf);

        let buf2 = pool.try_alloc(100).unwrap();
        assert!(buf2.capacity() >= page);
        assert_eq!(buf2.len(), 0);
    }

    #[test]
    fn test_pool_exhaustion() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        let _buf1 = pool.try_alloc(100).expect("first alloc should succeed");
        let _buf2 = pool.try_alloc(100).expect("second alloc should succeed");

        assert!(pool.try_alloc(100).is_err());
    }

    #[test]
    fn test_pool_oversized() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page * 2, 10), &mut registry);

        assert!(pool.try_alloc(page * 4).is_err());
    }

    #[test]
    fn test_pool_size_classes() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page * 4, 10), &mut registry);

        // Requests round up to the smallest class that fits them.
        let buf1 = pool.try_alloc(100).unwrap();
        assert_eq!(buf1.capacity(), page);

        let buf2 = pool.try_alloc(page + 1).unwrap();
        assert_eq!(buf2.capacity(), page * 2);

        let buf3 = pool.try_alloc(page * 3).unwrap();
        assert_eq!(buf3.capacity(), page * 4);
    }

    #[test]
    fn test_pooled_buf_mut_freeze() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        let mut buf = pool.try_alloc(11).unwrap();
        buf.put_slice(&[0u8; 11]);
        assert_eq!(buf.len(), 11);

        buf.as_mut()[..5].copy_from_slice(&[1, 2, 3, 4, 5]);

        let iobuf = buf.freeze();
        assert_eq!(iobuf.len(), 11);
        assert_eq!(&iobuf.as_ref()[..5], &[1, 2, 3, 4, 5]);

        let slice = iobuf.slice(0..5);
        assert_eq!(slice.len(), 5);
    }

    #[test]
    fn test_prefill() {
        let page = NZUsize!(page_size());
        let mut registry = test_registry();
        let pool = BufferPool::new(
            BufferPoolConfig {
                min_size: page,
                max_size: page,
                max_per_class: NZUsize!(5),
                prefill: true,
                alignment: page,
            },
            &mut registry,
        );

        // Drain the prefilled freelist.
        let mut bufs = Vec::new();
        for _ in 0..5 {
            bufs.push(pool.try_alloc(100).expect("alloc should succeed"));
        }

        // The class is now exhausted.
        assert!(pool.try_alloc(100).is_err());
    }

    #[test]
    fn test_config_for_network() {
        let config = BufferPoolConfig::for_network();
        config.validate();
        assert_eq!(config.min_size.get(), cache_line_size());
        assert_eq!(config.max_size.get(), 64 * 1024);
        assert_eq!(config.max_per_class.get(), 4096);
        assert!(!config.prefill);
        assert_eq!(config.alignment.get(), cache_line_size());
    }

    #[test]
    fn test_config_for_storage() {
        let config = BufferPoolConfig::for_storage();
        config.validate();
        assert_eq!(config.min_size.get(), page_size());
        assert_eq!(config.max_size.get(), 64 * 1024);
        assert_eq!(config.max_per_class.get(), 32);
        assert!(!config.prefill);
        assert_eq!(config.alignment.get(), page_size());
    }

    /// Reads the `allocated` counter for the size class serving `size`.
    fn get_allocated(pool: &BufferPool, size: usize) -> usize {
        let class_index = pool.inner.config.class_index(size).unwrap();
        pool.inner.classes[class_index]
            .allocated
            .load(Ordering::Relaxed)
    }

    /// Reads the `available` gauge for the size class serving `size`.
    fn get_available(pool: &BufferPool, size: usize) -> i64 {
        let class_index = pool.inner.config.class_index(size).unwrap();
        let label = SizeClassLabel {
            size_class: pool.inner.classes[class_index].size as u64,
        };
        pool.inner.metrics.available.get_or_create(&label).get()
    }

    #[test]
    fn test_freeze_returns_buffer_to_pool() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        assert_eq!(get_allocated(&pool, page), 0);
        assert_eq!(get_available(&pool, page), 0);

        let buf = pool.try_alloc(100).unwrap();
        assert_eq!(get_allocated(&pool, page), 1);
        assert_eq!(get_available(&pool, page), 0);

        // Freezing keeps the allocation checked out.
        let iobuf = buf.freeze();
        assert_eq!(get_allocated(&pool, page), 1);

        // Dropping the frozen buffer returns it to the pool.
        drop(iobuf);
        assert_eq!(get_allocated(&pool, page), 0);
        assert_eq!(get_available(&pool, page), 1);
    }

    #[test]
    fn test_cloned_iobuf_returns_buffer_when_all_dropped() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        let buf = pool.try_alloc(100).unwrap();
        let iobuf = buf.freeze();

        let clone1 = iobuf.clone();
        let clone2 = iobuf.clone();
        let clone3 = iobuf.clone();

        assert_eq!(get_allocated(&pool, page), 1);

        drop(iobuf);
        drop(clone1);
        assert_eq!(get_allocated(&pool, page), 1);
        assert_eq!(get_available(&pool, page), 0);

        drop(clone2);
        assert_eq!(get_allocated(&pool, page), 1);

        // Only the last clone returns the buffer.
        drop(clone3);
        assert_eq!(get_allocated(&pool, page), 0);
        assert_eq!(get_available(&pool, page), 1);
    }

    #[test]
    fn test_slice_holds_buffer_reference() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        let mut buf = pool.try_alloc(100).unwrap();
        buf.put_slice(&[0u8; 100]);
        let iobuf = buf.freeze();

        let slice = iobuf.slice(10..50);

        // The slice alone keeps the allocation checked out.
        drop(iobuf);
        assert_eq!(get_allocated(&pool, page), 1);
        assert_eq!(get_available(&pool, page), 0);

        drop(slice);
        assert_eq!(get_allocated(&pool, page), 0);
        assert_eq!(get_available(&pool, page), 1);
    }

    #[test]
    fn test_copy_to_bytes_on_pooled_buffer() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        let mut buf = pool.try_alloc(100).unwrap();
        buf.put_slice(&[0x42u8; 100]);
        let mut iobuf = buf.freeze();

        let extracted = iobuf.copy_to_bytes(50);
        assert_eq!(extracted.len(), 50);
        assert!(extracted.iter().all(|&b| b == 0x42));

        assert_eq!(get_allocated(&pool, page), 1);

        // The extracted Bytes still references the pooled allocation.
        drop(iobuf);
        assert_eq!(get_allocated(&pool, page), 1);

        drop(extracted);
        assert_eq!(get_allocated(&pool, page), 0);
        assert_eq!(get_available(&pool, page), 1);
    }

    #[test]
    fn test_concurrent_clones_and_drops() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 4), &mut registry);

        for _ in 0..100 {
            let buf = pool.try_alloc(100).unwrap();
            let iobuf = buf.freeze();

            let clones: Vec<_> = (0..10).map(|_| iobuf.clone()).collect();
            drop(iobuf);

            for clone in clones {
                drop(clone);
            }
        }

        assert_eq!(get_allocated(&pool, page), 0);
    }

    #[test]
    fn test_iobuf_to_iobufmut_conversion_returns_pooled_buffer() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        let buf = pool.try_alloc(100).unwrap();
        assert_eq!(get_allocated(&pool, page), 1);

        let iobuf = buf.freeze();
        assert_eq!(get_allocated(&pool, page), 1);

        // Converting back to a mutable buffer releases the pooled allocation.
        let iobufmut: IoBufMut = iobuf.into();

        assert_eq!(
            get_allocated(&pool, page),
            0,
            "pooled buffer should be returned after IoBuf->IoBufMut conversion"
        );
        assert_eq!(get_available(&pool, page), 1);

        drop(iobufmut);
        assert_eq!(get_allocated(&pool, page), 0);
        assert_eq!(get_available(&pool, page), 1);
    }

    #[test]
    fn test_stream_send_pattern() {
        // Simulates an encrypted-stream send path: freeze incoming
        // plaintext, copy it into a pooled ciphertext buffer with room for
        // a 16-byte tag, then release everything.
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 4), &mut registry);

        for _ in 0..100 {
            let mut incoming = pool.try_alloc(100).unwrap();
            incoming.put_slice(&[0x42u8; 100]);
            let incoming_iobuf = incoming.freeze();

            let mut bufs: IoBufs = incoming_iobuf.into();
            let plaintext_len = bufs.remaining();

            // Reserve room for a 16-byte authentication tag.
            let ciphertext_len = plaintext_len + 16;
            let mut encryption_buf = pool.try_alloc(ciphertext_len).unwrap();
            // SAFETY: every byte below `ciphertext_len` is written before
            // the buffer is read.
            unsafe { encryption_buf.set_len(ciphertext_len) };

            let mut offset = 0;
            while bufs.has_remaining() {
                let chunk = bufs.chunk();
                let chunk_len = chunk.len();
                encryption_buf.as_mut()[offset..offset + chunk_len].copy_from_slice(chunk);
                offset += chunk_len;
                bufs.advance(chunk_len);
            }

            // The plaintext buffer can return to the pool now.
            drop(bufs);

            // Stand-in for the real authentication tag.
            encryption_buf.as_mut()[plaintext_len..].fill(0xAA);

            let ciphertext = encryption_buf.freeze();

            drop(ciphertext);
        }

        assert_eq!(get_allocated(&pool, page), 0);
    }

    #[test]
    fn test_multithreaded_alloc_freeze_return() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = Arc::new(BufferPool::new(test_config(page, page, 100), &mut registry));

        let mut handles = vec![];

        // Keep Miri runs fast; use the full count elsewhere.
        cfg_if::cfg_if! {
            if #[cfg(miri)] {
                let iterations = 100;
            } else {
                let iterations = 1000;
            }
        }

        for _ in 0..10 {
            let pool = pool.clone();
            let handle = thread::spawn(move || {
                for _ in 0..iterations {
                    let buf = pool.try_alloc(100).unwrap();
                    let iobuf = buf.freeze();

                    let clones: Vec<_> = (0..5).map(|_| iobuf.clone()).collect();
                    drop(iobuf);

                    for clone in clones {
                        drop(clone);
                    }
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().unwrap();
        }

        let class_index = pool.inner.config.class_index(page).unwrap();
        let allocated = pool.inner.classes[class_index]
            .allocated
            .load(Ordering::Relaxed);
        assert_eq!(
            allocated, 0,
            "all buffers should be returned after multithreaded test"
        );
    }

    #[test]
    fn test_cross_thread_buffer_return() {
        // Buffers frozen on one thread must return cleanly when dropped on
        // another.
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 100), &mut registry);

        let (tx, rx) = mpsc::channel();

        for _ in 0..50 {
            let buf = pool.try_alloc(100).unwrap();
            let iobuf = buf.freeze();
            tx.send(iobuf).unwrap();
        }
        drop(tx);

        let handle = thread::spawn(move || {
            while let Ok(iobuf) = rx.recv() {
                drop(iobuf);
            }
        });

        handle.join().unwrap();

        assert_eq!(get_allocated(&pool, page), 0);
    }

    #[test]
    fn test_pool_dropped_before_buffer() {
        // A frozen buffer must remain valid even after its pool is gone;
        // dropping it then deallocates instead of returning to the pool.
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        let mut buf = pool.try_alloc(100).unwrap();
        buf.put_slice(&[0u8; 100]);
        let iobuf = buf.freeze();

        drop(pool);

        assert_eq!(iobuf.len(), 100);

        drop(iobuf);
    }

    #[test]
    fn test_bytesmut_parity_buf_trait() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);

        let mut bytes = BytesMut::with_capacity(100);
        bytes.put_slice(&[0xAAu8; 50]);

        let mut pooled = pool.try_alloc(100).unwrap();
        pooled.put_slice(&[0xAAu8; 50]);

        assert_eq!(Buf::remaining(&bytes), Buf::remaining(&pooled));

        assert_eq!(Buf::chunk(&bytes), Buf::chunk(&pooled));

        Buf::advance(&mut bytes, 10);
        Buf::advance(&mut pooled, 10);
        assert_eq!(Buf::remaining(&bytes), Buf::remaining(&pooled));
        assert_eq!(Buf::chunk(&bytes), Buf::chunk(&pooled));

        let remaining = Buf::remaining(&bytes);
        Buf::advance(&mut bytes, remaining);
        Buf::advance(&mut pooled, remaining);
        assert_eq!(Buf::remaining(&bytes), 0);
        assert_eq!(Buf::remaining(&pooled), 0);
        assert!(!Buf::has_remaining(&bytes));
        assert!(!Buf::has_remaining(&pooled));
    }

    #[test]
    fn test_bytesmut_parity_bufmut_trait() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);

        let mut bytes = BytesMut::with_capacity(100);
        let mut pooled = pool.try_alloc(100).unwrap();

        assert!(BufMut::remaining_mut(&bytes) >= 100);
        assert!(BufMut::remaining_mut(&pooled) >= 100);

        BufMut::put_slice(&mut bytes, b"hello");
        BufMut::put_slice(&mut pooled, b"hello");
        assert_eq!(bytes.as_ref(), pooled.as_ref());

        BufMut::put_u8(&mut bytes, 0x42);
        BufMut::put_u8(&mut pooled, 0x42);
        assert_eq!(bytes.as_ref(), pooled.as_ref());

        let bytes_chunk = BufMut::chunk_mut(&mut bytes);
        let pooled_chunk = BufMut::chunk_mut(&mut pooled);
        assert!(bytes_chunk.len() > 0);
        assert!(pooled_chunk.len() > 0);
    }

    #[test]
    fn test_bytesmut_parity_truncate_after_advance() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);

        let mut bytes = BytesMut::with_capacity(100);
        bytes.put_slice(&[0xAAu8; 50]);
        Buf::advance(&mut bytes, 10);

        let mut pooled = pool.try_alloc(100).unwrap();
        pooled.put_slice(&[0xAAu8; 50]);
        Buf::advance(&mut pooled, 10);

        assert_eq!(bytes.len(), 40);
        assert_eq!(pooled.len(), 40);

        bytes.truncate(20);
        pooled.truncate(20);

        assert_eq!(bytes.len(), pooled.len(), "len after truncate");
        assert_eq!(bytes.as_ref(), pooled.as_ref(), "content after truncate");
    }

    #[test]
    fn test_bytesmut_parity_clear_after_advance() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);

        let mut bytes = BytesMut::with_capacity(100);
        bytes.put_slice(&[0xAAu8; 50]);
        Buf::advance(&mut bytes, 10);

        let mut pooled = pool.try_alloc(100).unwrap();
        pooled.put_slice(&[0xAAu8; 50]);
        Buf::advance(&mut pooled, 10);

        bytes.clear();
        pooled.clear();

        assert_eq!(bytes.len(), 0);
        assert_eq!(pooled.len(), 0);
        assert!(bytes.is_empty());
        assert!(pooled.is_empty());
    }

    #[test]
    fn test_pool_exhaustion_and_recovery() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 3), &mut registry);

        let buf1 = pool.try_alloc(100).expect("first alloc");
        let buf2 = pool.try_alloc(100).expect("second alloc");
        let buf3 = pool.try_alloc(100).expect("third alloc");
        assert!(pool.try_alloc(100).is_err(), "pool should be exhausted");

        drop(buf1);

        let buf4 = pool.try_alloc(100).expect("alloc after return");
        assert!(pool.try_alloc(100).is_err(), "pool exhausted again");

        drop(buf2);
        drop(buf3);
        drop(buf4);

        assert_eq!(get_allocated(&pool, page), 0);
        assert_eq!(get_available(&pool, page), 3);

        let _buf5 = pool.try_alloc(100).expect("reuse from freelist");
        assert_eq!(get_available(&pool, page), 2);
    }

    #[test]
    fn test_try_alloc_errors() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        let result = pool.try_alloc(page * 10);
        assert_eq!(result.unwrap_err(), PoolError::Oversized);

        let _buf1 = pool.try_alloc(100).unwrap();
        let _buf2 = pool.try_alloc(100).unwrap();
        let result = pool.try_alloc(100);
        assert_eq!(result.unwrap_err(), PoolError::Exhausted);
    }

    #[test]
    fn test_fallback_allocation() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        // Exhaust the only size class.
        let buf1 = pool.try_alloc(100).unwrap();
        let buf2 = pool.try_alloc(100).unwrap();
        assert!(buf1.is_pooled());
        assert!(buf2.is_pooled());

        // `alloc` still succeeds, falling back to untracked (but aligned)
        // allocations.
        let mut fallback_exhausted = pool.alloc(100);
        assert!(!fallback_exhausted.is_pooled());
        assert!((fallback_exhausted.as_mut_ptr() as usize).is_multiple_of(page));

        let mut fallback_oversized = pool.alloc(page * 10);
        assert!(!fallback_oversized.is_pooled());
        assert!((fallback_oversized.as_mut_ptr() as usize).is_multiple_of(page));

        // Fallback buffers do not affect the pool's accounting.
        assert_eq!(get_allocated(&pool, page), 2);

        drop(fallback_exhausted);
        drop(fallback_oversized);
        assert_eq!(get_allocated(&pool, page), 2);

        drop(buf1);
        drop(buf2);
        assert_eq!(get_allocated(&pool, page), 0);
    }

    #[test]
    fn test_is_pooled() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);

        let pooled = pool.try_alloc(100).unwrap();
        assert!(pooled.is_pooled());

        let owned = IoBufMut::with_capacity(100);
        assert!(!owned.is_pooled());
    }

    #[test]
    fn test_bytesmut_parity_capacity_after_advance() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page * 4, 10), &mut registry);

        let mut bytes = BytesMut::with_capacity(page);
        bytes.put_slice(&[0xAAu8; 50]);

        let mut pooled = pool.try_alloc(page).unwrap();
        pooled.put_slice(&[0xAAu8; 50]);

        assert_eq!(bytes.len(), pooled.len(), "len before advance");

        Buf::advance(&mut bytes, 20);
        Buf::advance(&mut pooled, 20);

        assert_eq!(bytes.len(), pooled.len(), "len after advance");
        assert_eq!(
            bytes.capacity(),
            pooled.capacity(),
            "capacity after advance"
        );
    }

    #[test]
    fn test_bytesmut_parity_set_len_after_advance() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page * 4, 10), &mut registry);

        let mut bytes = BytesMut::with_capacity(page);
        bytes.resize(50, 0xBB);
        Buf::advance(&mut bytes, 20);

        let mut pooled = pool.try_alloc(page).unwrap();
        pooled.put_slice(&[0xBB; 50]);
        Buf::advance(&mut pooled, 20);

        // SAFETY: both buffers have at least 25 initialized bytes past
        // their cursors.
        unsafe {
            bytes.set_len(25);
            pooled.set_len(25);
        }

        assert_eq!(bytes.len(), pooled.len(), "len after set_len");
        assert_eq!(bytes.as_ref(), pooled.as_ref(), "content after set_len");
    }

    #[test]
    fn test_bytesmut_parity_clear_preserves_view() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page * 4, 10), &mut registry);

        let mut bytes = BytesMut::with_capacity(page);
        bytes.resize(50, 0xCC);
        Buf::advance(&mut bytes, 20);
        let cap_before_clear = bytes.capacity();
        bytes.clear();

        let mut pooled = pool.try_alloc(page).unwrap();
        pooled.put_slice(&[0xCC; 50]);
        Buf::advance(&mut pooled, 20);
        let pooled_cap_before = pooled.capacity();
        pooled.clear();

        assert_eq!(bytes.len(), pooled.len(), "len after clear");
        assert_eq!(bytes.capacity(), cap_before_clear, "bytes cap unchanged");
        assert_eq!(pooled.capacity(), pooled_cap_before, "pooled cap unchanged");
    }

    #[test]
    fn test_bytesmut_parity_put_after_advance() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page * 4, 10), &mut registry);

        let mut bytes = BytesMut::with_capacity(100);
        bytes.resize(30, 0xAA);
        Buf::advance(&mut bytes, 10);
        bytes.put_slice(&[0xBB; 10]);

        let mut pooled = pool.try_alloc(100).unwrap();
        pooled.put_slice(&[0xAA; 30]);
        Buf::advance(&mut pooled, 10);
        pooled.put_slice(&[0xBB; 10]);

        assert_eq!(bytes.as_ref(), pooled.as_ref(), "content after put_slice");
    }

    #[test]
    fn test_buffer_alignment() {
        let page = page_size();
        let cache_line = cache_line_size();
        let mut registry = test_registry();

        // Keep the freelists small under Miri, which is slow.
        cfg_if::cfg_if! {
            if #[cfg(miri)] {
                let storage_config = BufferPoolConfig {
                    max_per_class: NZUsize!(32),
                    ..BufferPoolConfig::for_storage()
                };
                let network_config = BufferPoolConfig {
                    max_per_class: NZUsize!(32),
                    ..BufferPoolConfig::for_network()
                };
            } else {
                let storage_config = BufferPoolConfig::for_storage();
                let network_config = BufferPoolConfig::for_network();
            }
        }

        let storage_buffer_pool = BufferPool::new(storage_config, &mut registry);
        let mut buf = storage_buffer_pool.try_alloc(100).unwrap();
        assert_eq!(
            buf.as_mut_ptr() as usize % page,
            0,
            "storage buffer not page-aligned"
        );

        let network_buffer_pool = BufferPool::new(network_config, &mut registry);
        let mut buf = network_buffer_pool.try_alloc(100).unwrap();
        assert_eq!(
            buf.as_mut_ptr() as usize % cache_line,
            0,
            "network buffer not cache-line aligned"
        );
    }

    #[test]
    fn test_freeze_after_advance_to_end() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);

        let mut buf = pool.try_alloc(100).unwrap();
        buf.put_slice(&[0x42; 100]);
        Buf::advance(&mut buf, 100);

        let frozen = buf.freeze();
        assert!(frozen.is_empty());
    }

    #[test]
    fn test_zero_capacity_allocation() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);

        let buf = pool.try_alloc(0).expect("zero capacity should succeed");
        assert_eq!(buf.capacity(), page);
        assert_eq!(buf.len(), 0);
    }

    #[test]
    fn test_exact_max_size_allocation() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);

        let buf = pool.try_alloc(page).expect("exact max size should succeed");
        assert_eq!(buf.capacity(), page);
    }

    #[test]
    fn test_freeze_after_partial_advance_mut() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);

        let mut buf = pool.try_alloc(100).unwrap();
        buf.put_slice(&[0xAA; 50]);
        Buf::advance(&mut buf, 20);
        let frozen = buf.freeze();
        assert_eq!(frozen.len(), 30);
        assert_eq!(frozen.as_ref(), &[0xAA; 30]);
    }

    #[test]
    fn test_interleaved_advance_and_write() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);

        let mut buf = pool.try_alloc(100).unwrap();
        buf.put_slice(b"hello");
        Buf::advance(&mut buf, 2);
        buf.put_slice(b"world");
        assert_eq!(buf.as_ref(), b"lloworld");
    }

    #[test]
    fn test_freeze_slice_clone_refcount() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);

        let mut buf = pool.try_alloc(100).unwrap();
        buf.put_slice(&[0x42; 100]);
        let iobuf = buf.freeze();
        let slice = iobuf.slice(10..50);
        let clone1 = slice.clone();
        let clone2 = iobuf.clone();

        drop(iobuf);
        drop(slice);
        assert_eq!(get_allocated(&pool, page), 1);

        drop(clone1);
        assert_eq!(get_allocated(&pool, page), 1);

        // Only the final clone releases the allocation.
        drop(clone2);
        assert_eq!(get_allocated(&pool, page), 0);
    }

    #[test]
    fn test_truncate_beyond_len_is_noop() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);

        // Truncating to more than the current length changes nothing.
        let mut bytes = BytesMut::with_capacity(100);
        bytes.resize(50, 0xAA);
        bytes.truncate(100);
        assert_eq!(bytes.len(), 50);

        let mut pooled = pool.try_alloc(100).unwrap();
        pooled.put_slice(&[0xAA; 50]);
        pooled.truncate(100);
        assert_eq!(pooled.len(), 50);
    }

    #[test]
    fn test_freeze_empty_after_clear() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);

        let mut buf = pool.try_alloc(100).unwrap();
        buf.put_slice(&[0xAA; 50]);
        buf.clear();

        let frozen = buf.freeze();
        assert!(frozen.is_empty());
        assert_eq!(frozen.len(), 0);

        // Even an empty frozen buffer returns to the pool on drop.
        drop(frozen);
        assert_eq!(get_available(&pool, page), 1);
    }

    #[test]
    fn test_alignment_after_advance() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(BufferPoolConfig::for_storage(), &mut registry);

        let mut buf = pool.try_alloc(100).unwrap();
        buf.put_slice(&[0; 100]);

        assert_eq!(buf.as_mut_ptr() as usize % page, 0);

        // Advancing the cursor by a non-multiple of the page size means the
        // read pointer is no longer page-aligned.
        Buf::advance(&mut buf, 7);
        assert_ne!(buf.as_mut_ptr() as usize % page, 0);
    }
}