1use super::IoBufMut;
53use crate::iobuf::aligned::{AlignedBuffer, PooledBufMut};
54use commonware_utils::NZUsize;
55use crossbeam_queue::ArrayQueue;
56use prometheus_client::{
57 encoding::EncodeLabelSet,
58 metrics::{counter::Counter, family::Family, gauge::Gauge},
59 registry::Registry,
60};
61use std::{
62 cell::UnsafeCell,
63 num::NonZeroUsize,
64 sync::{
65 atomic::{AtomicUsize, Ordering},
66 Arc,
67 },
68};
69
/// Below this thread-cache capacity, overflow spills one buffer at a time to
/// the global freelist; at or above it, spills are batched (half the cache).
const MIN_THREAD_CACHE_BATCHING_CAPACITY: usize = 4;
76
/// Reasons a pooled allocation request can fail.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PoolError {
    /// The requested capacity is larger than the biggest size class.
    Oversized,
    /// The matching size class has reached its per-class buffer cap.
    Exhausted,
}
85
86impl std::fmt::Display for PoolError {
87 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88 match self {
89 Self::Oversized => write!(f, "requested capacity exceeds maximum buffer size"),
90 Self::Exhausted => write!(f, "pool exhausted for required size class"),
91 }
92 }
93}
94
// Marker impl so `PoolError` works with `?` and `Box<dyn std::error::Error>`.
impl std::error::Error for PoolError {}
96
97#[cfg(unix)]
102fn page_size() -> usize {
103 let size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
105 if size <= 0 {
106 4096 } else {
108 size as usize
109 }
110}
111
/// Fallback page size for non-Unix targets, where `sysconf` is unavailable.
#[cfg(not(unix))]
// NOTE(review): presumably kept non-const so both cfg variants share the same
// (non-const) signature — confirm before making this `const fn`.
#[allow(clippy::missing_const_for_fn)]
fn page_size() -> usize {
    4096
}
117
/// Conservative cache-line size used as a default buffer alignment.
///
/// Returns 128 on x86_64/aarch64 — presumably to cover the adjacent-line
/// prefetcher on x86 and 128-byte lines on some ARM cores (TODO confirm) —
/// and 64 everywhere else.
const fn cache_line_size() -> usize {
    cfg_if::cfg_if! {
        if #[cfg(any(target_arch = "x86_64", target_arch = "aarch64"))] {
            128
        } else {
            64
        }
    }
}
133
/// Policy controlling per-thread caching of returned buffers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum BufferPoolThreadCacheConfig {
    /// No thread-local caching; frees go straight to the global freelist.
    Disabled,
    /// Exact per-thread cache capacity (validated to be <= `max_per_class`).
    Fixed(NonZeroUsize),
    /// Derive the capacity from an expected thread parallelism; see
    /// `BufferPoolConfig::resolve_thread_cache_capacity`.
    ForParallelism(NonZeroUsize),
}
144
/// Tuning knobs for a [`BufferPool`].
#[derive(Debug, Clone)]
pub struct BufferPoolConfig {
    /// Requests smaller than this bypass the pool and get untracked buffers.
    pub pool_min_size: usize,
    /// Smallest size class; must be a power of two and >= `alignment`.
    pub min_size: NonZeroUsize,
    /// Largest size class; must be a power of two and >= `min_size`.
    pub max_size: NonZeroUsize,
    /// Maximum number of tracked buffers per size class.
    pub max_per_class: NonZeroUsize,
    /// Allocate every buffer up front when the pool is built.
    pub prefill: bool,
    /// Byte alignment for every pooled buffer; must be a power of two.
    pub alignment: NonZeroUsize,
    /// Policy for per-thread caching of returned buffers.
    pub(crate) thread_cache_config: BufferPoolThreadCacheConfig,
}
173
impl BufferPoolConfig {
    /// Configuration tuned for network buffers: cache-line alignment, 1 KiB to
    /// 64 KiB size classes, a deep per-class cap, and no thread-local caching.
    pub const fn for_network() -> Self {
        let cache_line = NZUsize!(cache_line_size());
        Self {
            pool_min_size: 1024,
            min_size: NZUsize!(1024),
            max_size: NZUsize!(64 * 1024),
            max_per_class: NZUsize!(4096),
            prefill: false,
            alignment: cache_line,
            thread_cache_config: BufferPoolThreadCacheConfig::Disabled,
        }
    }

    /// Configuration tuned for storage I/O: page alignment, page-sized up to
    /// 8 MiB size classes, and no thread-local caching.
    pub fn for_storage() -> Self {
        let page = NZUsize!(page_size());
        Self {
            pool_min_size: 1024,
            min_size: page,
            max_size: NZUsize!(8 * 1024 * 1024),
            max_per_class: NZUsize!(64),
            prefill: false,
            alignment: page,
            thread_cache_config: BufferPoolThreadCacheConfig::Disabled,
        }
    }

    /// Sets the threshold below which requests bypass the pool entirely.
    pub const fn with_pool_min_size(mut self, pool_min_size: usize) -> Self {
        self.pool_min_size = pool_min_size;
        self
    }

    /// Sets the smallest size class (power of two, >= alignment).
    pub const fn with_min_size(mut self, min_size: NonZeroUsize) -> Self {
        self.min_size = min_size;
        self
    }

    /// Sets the largest size class (power of two, >= min_size).
    pub const fn with_max_size(mut self, max_size: NonZeroUsize) -> Self {
        self.max_size = max_size;
        self
    }

    /// Sets the maximum number of tracked buffers per size class.
    pub const fn with_max_per_class(mut self, max_per_class: NonZeroUsize) -> Self {
        self.max_per_class = max_per_class;
        self
    }

    /// Fixes the per-thread cache capacity explicitly.
    pub const fn with_thread_cache_capacity(mut self, thread_cache_capacity: NonZeroUsize) -> Self {
        self.thread_cache_config = BufferPoolThreadCacheConfig::Fixed(thread_cache_capacity);
        self
    }

    /// Derives the per-thread cache capacity from the expected number of
    /// concurrently allocating threads.
    pub const fn with_thread_cache_for_parallelism(mut self, parallelism: NonZeroUsize) -> Self {
        self.thread_cache_config = BufferPoolThreadCacheConfig::ForParallelism(parallelism);
        self
    }

    /// Disables per-thread caching entirely.
    pub const fn with_thread_cache_disabled(mut self) -> Self {
        self.thread_cache_config = BufferPoolThreadCacheConfig::Disabled;
        self
    }

    /// Enables or disables up-front allocation of all buffers.
    pub const fn with_prefill(mut self, prefill: bool) -> Self {
        self.prefill = prefill;
        self
    }

    /// Sets the byte alignment for pooled buffers (power of two).
    pub const fn with_alignment(mut self, alignment: NonZeroUsize) -> Self {
        self.alignment = alignment;
        self
    }

    /// Caps `max_per_class` so total pooled bytes stay near `budget_bytes`.
    ///
    /// A single shared cap is used for every class: the budget divided by the
    /// sum of all class sizes, rounded up (so at least 1). No-op when there
    /// are no size classes (inverted min/max range).
    pub fn with_budget_bytes(mut self, budget_bytes: NonZeroUsize) -> Self {
        // Total bytes consumed by one buffer of every class.
        let mut class_bytes = 0usize;
        for i in 0..self.num_classes() {
            class_bytes = class_bytes.saturating_add(self.class_size(i));
        }
        if class_bytes == 0 {
            return self;
        }
        self.max_per_class = NZUsize!(budget_bytes.get().div_ceil(class_bytes));
        self
    }

    /// Checks internal consistency of the configuration.
    ///
    /// # Panics
    ///
    /// Panics when alignment/min_size/max_size are not powers of two, when
    /// `min_size < alignment`, `max_size < min_size`,
    /// `pool_min_size > min_size`, or a fixed thread-cache capacity exceeds
    /// `max_per_class`.
    fn validate(&self) {
        assert!(
            self.alignment.is_power_of_two(),
            "alignment must be a power of two"
        );
        assert!(
            self.min_size.is_power_of_two(),
            "min_size must be a power of two"
        );
        assert!(
            self.max_size.is_power_of_two(),
            "max_size must be a power of two"
        );
        assert!(
            self.min_size >= self.alignment,
            "min_size ({}) must be >= alignment ({})",
            self.min_size,
            self.alignment
        );
        assert!(
            self.max_size >= self.min_size,
            "max_size must be >= min_size"
        );
        assert!(
            self.pool_min_size <= self.min_size.get(),
            "pool_min_size ({}) must be <= min_size ({})",
            self.pool_min_size,
            self.min_size
        );
        if let BufferPoolThreadCacheConfig::Fixed(thread_cache_capacity) = self.thread_cache_config
        {
            assert!(
                thread_cache_capacity <= self.max_per_class,
                "thread_cache_capacity ({}) must be <= max_per_class ({})",
                thread_cache_capacity,
                self.max_per_class
            );
        }
    }

    /// Number of power-of-two size classes from `min_size` to `max_size`
    /// inclusive; 0 when the range is inverted.
    #[inline]
    fn num_classes(&self) -> usize {
        if self.max_size < self.min_size {
            return 0;
        }
        // Both are powers of two, so the ratio's trailing zeros give log2.
        (self.max_size.get() / self.min_size.get()).trailing_zeros() as usize + 1
    }

    /// Maps a requested size to its size-class index.
    ///
    /// Class 0 covers everything up to `min_size`; each subsequent class
    /// doubles. Returns `None` when the request exceeds `max_size`, or when it
    /// rounds up past the last class (possible for a non-power-of-two
    /// `max_size`, which `validate` would reject anyway).
    #[inline]
    fn class_index(&self, size: usize) -> Option<usize> {
        if size > self.max_size.get() {
            return None;
        }
        if size <= self.min_size.get() {
            return Some(0);
        }
        // Round up to the next power of two, then take log2 of the ratio.
        let size_class = size.next_power_of_two();
        let index = (size_class / self.min_size.get()).trailing_zeros() as usize;
        if index < self.num_classes() {
            Some(index)
        } else {
            None
        }
    }

    /// Buffer size of the class at `index` (`min_size << index`).
    const fn class_size(&self, index: usize) -> usize {
        self.min_size.get() << index
    }

    /// Resolves the per-thread cache capacity implied by the configured policy.
    ///
    /// `Disabled` yields 0; `Fixed` yields the requested value; and
    /// `ForParallelism` splits half of each class's cap across the expected
    /// threads, clamped to `1..=8`.
    fn resolve_thread_cache_capacity(&self) -> usize {
        match self.thread_cache_config {
            BufferPoolThreadCacheConfig::Disabled => 0,
            BufferPoolThreadCacheConfig::Fixed(thread_cache_capacity) => {
                thread_cache_capacity.get()
            }
            BufferPoolThreadCacheConfig::ForParallelism(parallelism) => {
                let max_per_class = self.max_per_class.get();
                let effective_threads = parallelism.get().min(max_per_class);
                (max_per_class / (2 * effective_threads)).clamp(1, 8)
            }
        }
    }
}
396
/// Prometheus label that distinguishes per-size-class metric series.
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct SizeClassLabel {
    // Buffer size in bytes of the class this series describes.
    size_class: u64,
}
402
/// Prometheus metrics exported by the buffer pool.
struct PoolMetrics {
    // Tracked buffers currently in existence, per size class.
    created: Family<SizeClassLabel, Gauge>,
    // Failed allocations due to a size class hitting its cap, per class.
    exhausted_total: Family<SizeClassLabel, Counter>,
    // Requests larger than the biggest size class.
    oversized_total: Counter,
}
412
impl PoolMetrics {
    /// Creates the metric families and registers them under stable names in
    /// the provided registry.
    fn new(registry: &mut Registry) -> Self {
        let metrics = Self {
            created: Family::default(),
            exhausted_total: Family::default(),
            oversized_total: Counter::default(),
        };

        registry.register(
            "buffer_pool_created",
            "Number of tracked buffers currently created for the pool",
            metrics.created.clone(),
        );
        registry.register(
            "buffer_pool_exhausted_total",
            "Total number of failed allocations due to pool exhaustion",
            metrics.exhausted_total.clone(),
        );
        registry.register(
            "buffer_pool_oversized_total",
            "Total number of allocation requests exceeding max buffer size",
            metrics.oversized_total.clone(),
        );

        metrics
    }
}
440
/// One power-of-two size class: a capped, lock-free freelist of equally sized
/// aligned buffers plus the bookkeeping that enforces the cap.
pub(super) struct SizeClass {
    // Process-wide unique id; indexes each thread's local cache table.
    class_id: usize,
    // Size in bytes of every buffer in this class.
    size: usize,
    // Byte alignment of every buffer in this class.
    alignment: usize,
    // Maximum number of tracked buffers allowed to exist at once.
    max: usize,
    // Global freelist of returned buffers (queue capacity == `max`).
    global: ArrayQueue<AlignedBuffer>,
    // Count of buffers created so far; never exceeds `max` (see `try_reserve`).
    created: AtomicUsize,
    // Capacity of each thread's local cache for this class (0 = disabled).
    thread_cache_capacity: usize,
}
467
// SAFETY: `SizeClass` holds only atomics, plain integers, and a concurrent
// `ArrayQueue` of buffers that are moved (never aliased) between threads.
// NOTE(review): these manual impls are presumably needed because
// `AlignedBuffer` contains raw pointers that suppress auto-traits — confirm.
unsafe impl Send for SizeClass {}
unsafe impl Sync for SizeClass {}
474
impl SizeClass {
    /// Creates a size class, optionally pre-allocating all `max` buffers onto
    /// the global freelist.
    fn new(
        class_id: usize,
        size: usize,
        alignment: usize,
        max: usize,
        thread_cache_capacity: usize,
        prefill: bool,
    ) -> Self {
        let freelist = ArrayQueue::new(max);
        let mut created = 0;
        if prefill {
            // The queue capacity is exactly `max`, so every push succeeds;
            // the result is ignored rather than unwrapped.
            for _ in 0..max {
                let _ = freelist.push(AlignedBuffer::new(size, alignment));
            }
            created = max;
        }
        Self {
            class_id,
            size,
            alignment,
            max,
            global: freelist,
            created: AtomicUsize::new(created),
            thread_cache_capacity,
        }
    }

    /// Returns a tracked buffer to the global freelist.
    ///
    /// The queue holds `max` entries and at most `max` tracked buffers exist
    /// (enforced by `try_reserve`), so the push can never fail.
    #[inline]
    fn push_global(&self, buffer: AlignedBuffer) {
        self.global.push(buffer).unwrap_or_else(|_| {
            unreachable!("tracked buffer should always fit in the global pool")
        });
    }

    /// Atomically claims the right to create one more buffer.
    ///
    /// Returns `false` when the class is already at its `max` cap; uses a CAS
    /// loop (`fetch_update`) so the count never overshoots under contention.
    fn try_reserve(&self) -> bool {
        self.created
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |created| {
                (created < self.max).then_some(created + 1)
            })
            .is_ok()
    }
}
527
/// A cached buffer paired with the size class it must be returned to.
struct TlsSizeClassCacheEntry {
    buffer: AlignedBuffer,
    // The `Arc` keeps the size class alive while the entry sits in a
    // thread-local cache, even after the owning pool handle is dropped.
    class: Arc<SizeClass>,
}
538
/// Per-thread LIFO cache of buffers for a single size class.
struct TlsSizeClassCache {
    // Cached entries; most recently freed on top.
    entries: Vec<TlsSizeClassCacheEntry>,
    // Soft limit: pushes beyond this spill to the global freelist.
    capacity: usize,
}
549
impl TlsSizeClassCache {
    /// Creates an empty cache that holds at most `capacity` entries.
    fn new(capacity: usize) -> Self {
        Self {
            entries: Vec::with_capacity(capacity),
            capacity,
        }
    }

    /// Number of buffers currently cached on this thread.
    #[inline]
    const fn len(&self) -> usize {
        self.entries.len()
    }

    /// Takes the most recently cached entry, if any.
    #[inline]
    fn pop(&mut self) -> Option<TlsSizeClassCacheEntry> {
        self.entries.pop()
    }

    /// Caches `entry`, spilling to the global freelist when full.
    ///
    /// Tiny caches (below `MIN_THREAD_CACHE_BATCHING_CAPACITY`) forward the
    /// overflowing entry directly; larger caches spill about half their
    /// contents in one batch so subsequent pushes stay local.
    fn push(&mut self, entry: TlsSizeClassCacheEntry) {
        if self.entries.len() < self.capacity {
            self.entries.push(entry);
            return;
        }

        if self.capacity < MIN_THREAD_CACHE_BATCHING_CAPACITY {
            entry.class.push_global(entry.buffer);
            return;
        }

        // Batch-spill half the cache (at least one entry) to amortize
        // contention on the global queue across many pushes.
        let spill = self.entries.len().min(self.capacity / 2).max(1);
        for _ in 0..spill {
            let spilled = self
                .entries
                .pop()
                .expect("spill count must not exceed cached entries");
            spilled.class.push_global(spilled.buffer);
        }

        self.entries.push(entry);
    }
}
601
602impl Drop for TlsSizeClassCache {
603 fn drop(&mut self) {
604 for entry in self.entries.drain(..) {
605 entry.class.push_global(entry.buffer);
606 }
607 }
608}
609
thread_local! {
    // Per-thread caches indexed by the globally unique `SizeClass::class_id`.
    // `UnsafeCell` is used instead of `RefCell` — presumably to skip the
    // borrow-flag overhead; all access goes through
    // `BufferPoolThreadCache::with_cache`, which documents the aliasing rules.
    static TLS_SIZE_CLASS_CACHES: UnsafeCell<Vec<Option<TlsSizeClassCache>>> =
        const { UnsafeCell::new(Vec::new()) };
}
625
/// Monotonic source of process-wide unique size-class ids; each id indexes a
/// slot in every thread's `TLS_SIZE_CLASS_CACHES` table.
static NEXT_SIZE_CLASS_ID: AtomicUsize = AtomicUsize::new(0);
634
/// Namespace for operations on the calling thread's buffer caches.
pub struct BufferPoolThreadCache;
647
impl BufferPoolThreadCache {
    /// Drops every cache owned by the calling thread.
    ///
    /// Cached buffers are returned to their global freelists by
    /// `TlsSizeClassCache::drop`.
    pub fn flush() {
        TLS_SIZE_CLASS_CACHES.with(|bins| {
            // SAFETY: the cell is thread-local and this exclusive borrow is
            // confined to this closure, which does not re-enter the TLS map.
            let bins = unsafe { &mut *bins.get() };
            for cache in bins.iter_mut() {
                let _ = cache.take();
            }
        });
    }

    /// Takes a cached buffer for `class` from the calling thread, if any.
    #[inline]
    fn pop(class: &Arc<SizeClass>) -> Option<TlsSizeClassCacheEntry> {
        Self::with_cache(class.class_id, class.thread_cache_capacity, |cache| {
            cache.pop()
        })
    }

    /// Caches `buffer` on the calling thread (spilling to the global freelist
    /// when the local cache is full; see `TlsSizeClassCache::push`).
    #[inline]
    pub(super) fn push(class: Arc<SizeClass>, buffer: AlignedBuffer) {
        let class_id = class.class_id;
        let thread_cache_capacity = class.thread_cache_capacity;
        Self::with_cache(class_id, thread_cache_capacity, |cache| {
            cache.push(TlsSizeClassCacheEntry { buffer, class });
        });
    }

    /// Pulls buffers from the class's global freelist into the local cache
    /// until it holds `target - 1` entries (the caller is assumed to already
    /// hold one buffer popped from the global queue) or the freelist is empty.
    #[inline]
    fn refill(class: &Arc<SizeClass>, target: usize) {
        Self::with_cache(class.class_id, class.thread_cache_capacity, |cache| {
            while cache.len() + 1 < target {
                let Some(buffer) = class.global.pop() else {
                    break;
                };
                cache.push(TlsSizeClassCacheEntry {
                    buffer,
                    class: class.clone(),
                });
            }
        });
    }

    /// Runs `f` with exclusive access to this thread's cache for `class_id`,
    /// creating the cache (with `capacity`) on first use.
    #[inline]
    fn with_cache<R>(
        class_id: usize,
        capacity: usize,
        f: impl FnOnce(&mut TlsSizeClassCache) -> R,
    ) -> R {
        TLS_SIZE_CLASS_CACHES.with(|bins| {
            // SAFETY: the vector is thread-local and none of the closures
            // passed to `with_cache` touch the TLS map again (spilling goes
            // through `SizeClass::push_global`), so this is the only live
            // mutable borrow.
            let bins = unsafe { &mut *bins.get() };
            // Grow the per-thread table on demand; ids are assigned globally
            // across pools, so the table can be sparse.
            if class_id >= bins.len() {
                bins.resize_with(class_id + 1, || None);
            }
            let cache = bins[class_id].get_or_insert_with(|| TlsSizeClassCache::new(capacity));
            f(cache)
        })
    }
}
720
/// Result of a successful class-level allocation.
struct Allocation {
    buffer: AlignedBuffer,
    // True when freshly created; false when reused (contents may be stale).
    is_new: bool,
    class: Arc<SizeClass>,
}
727
/// Shared state behind a [`BufferPool`] handle.
pub(crate) struct BufferPoolInner {
    config: BufferPoolConfig,
    // One entry per size class, smallest class first.
    classes: Vec<Arc<SizeClass>>,
    metrics: PoolMetrics,
}
734
735impl Drop for BufferPoolInner {
736 fn drop(&mut self) {
737 for class in &self.classes {
738 while let Some(buffer) = class.global.pop() {
739 class.created.fetch_sub(1, Ordering::Relaxed);
740 drop(buffer);
741 }
742 }
743 }
744}
745
impl BufferPoolInner {
    /// Attempts to obtain a buffer for `class_index`, trying in order:
    ///
    /// 1. the calling thread's local cache,
    /// 2. the global freelist (also refilling the local cache),
    /// 3. a fresh allocation, if the class is still under its cap.
    ///
    /// Returns `None` (and bumps the exhausted counter) when the class is at
    /// its cap. `zero_on_new` controls whether a freshly created buffer is
    /// zeroed; reused buffers are returned as-is (`is_new == false`) and may
    /// contain stale data.
    fn try_alloc(&self, class_index: usize, zero_on_new: bool) -> Option<Allocation> {
        let class = &self.classes[class_index];

        // Fast path: reuse a buffer cached by this thread.
        if let Some(entry) = BufferPoolThreadCache::pop(class) {
            return Some(Allocation {
                buffer: entry.buffer,
                is_new: false,
                class: entry.class,
            });
        }

        // Global-freelist path: also top up the local cache to roughly half
        // its capacity so the next allocations stay on the fast path.
        let target = (class.thread_cache_capacity / 2).max(1);
        if let Some(buffer) = class.global.pop() {
            BufferPoolThreadCache::refill(class, target);
            return Some(Allocation {
                buffer,
                is_new: false,
                class: class.clone(),
            });
        }

        // Slow path: create a new buffer if the class cap allows it.
        let label = SizeClassLabel {
            size_class: class.size as u64,
        };
        if !class.try_reserve() {
            self.metrics.exhausted_total.get_or_create(&label).inc();
            return None;
        }

        self.metrics.created.get_or_create(&label).inc();
        let buffer = if zero_on_new {
            AlignedBuffer::new_zeroed(class.size, class.alignment)
        } else {
            AlignedBuffer::new(class.size, class.alignment)
        };
        Some(Allocation {
            buffer,
            is_new: true,
            class: class.clone(),
        })
    }
}
802
/// Cheaply clonable handle to a size-classed, metrics-instrumented buffer pool.
#[derive(Clone)]
pub struct BufferPool {
    inner: Arc<BufferPoolInner>,
}
819
820impl std::fmt::Debug for BufferPool {
821 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
822 f.debug_struct("BufferPool")
823 .field("config", &self.inner.config)
824 .field("num_classes", &self.inner.classes.len())
825 .finish()
826 }
827}
828
impl BufferPool {
    /// Builds a pool from `config`, registering its metrics in `registry`.
    ///
    /// # Panics
    ///
    /// Panics when `config` fails [`BufferPoolConfig::validate`].
    pub(crate) fn new(config: BufferPoolConfig, registry: &mut Registry) -> Self {
        config.validate();
        let metrics = PoolMetrics::new(registry);
        let mut classes = Vec::with_capacity(config.num_classes());
        let thread_cache_capacity = config.resolve_thread_cache_capacity();
        for i in 0..config.num_classes() {
            let size = config.class_size(i);
            // Ids are process-wide so thread-local cache tables can be shared
            // across pools without collisions.
            let class_id = NEXT_SIZE_CLASS_ID.fetch_add(1, Ordering::Relaxed);
            let class = Arc::new(SizeClass::new(
                class_id,
                size,
                config.alignment.get(),
                config.max_per_class.get(),
                thread_cache_capacity,
                config.prefill,
            ));
            classes.push(class);
        }

        if config.prefill {
            // Seed the `created` gauge with the buffers allocated up front.
            for class in &classes {
                let label = SizeClassLabel {
                    size_class: class.size as u64,
                };
                let created = class.global.len() as i64;
                metrics.created.get_or_create(&label).set(created);
            }
        }

        Self {
            inner: Arc::new(BufferPoolInner {
                config,
                classes,
                metrics,
            }),
        }
    }

    /// Resolves the size class for `capacity`, bumping the oversized counter
    /// when no class is large enough.
    #[inline]
    fn class_index_or_record_oversized(&self, capacity: usize) -> Option<usize> {
        let class_index = self.inner.config.class_index(capacity);
        if class_index.is_none() {
            self.inner.metrics.oversized_total.inc();
        }
        class_index
    }

    /// Tries to allocate a buffer with at least `capacity` bytes.
    ///
    /// Requests below `pool_min_size` bypass the pool and always succeed with
    /// an untracked aligned buffer. The returned buffer has length 0.
    ///
    /// # Errors
    ///
    /// - [`PoolError::Oversized`] when `capacity` exceeds the largest class.
    /// - [`PoolError::Exhausted`] when the matching class is at its cap.
    pub fn try_alloc(&self, capacity: usize) -> Result<IoBufMut, PoolError> {
        if capacity < self.inner.config.pool_min_size {
            // Avoid a zero-capacity allocation for `capacity == 0`.
            let size = capacity.max(1);
            return Ok(IoBufMut::with_alignment(size, self.inner.config.alignment));
        }

        let class_index = self
            .class_index_or_record_oversized(capacity)
            .ok_or(PoolError::Oversized)?;

        let buffer = self
            .inner
            .try_alloc(class_index, false)
            .map(|allocation| PooledBufMut::new(allocation.buffer, allocation.class))
            .ok_or(PoolError::Exhausted)?;
        Ok(IoBufMut::from_pooled(buffer))
    }

    /// Infallible variant of [`Self::try_alloc`]: falls back to an untracked
    /// aligned buffer (of at least `min_size` bytes) when the pool cannot
    /// serve the request.
    pub fn alloc(&self, capacity: usize) -> IoBufMut {
        self.try_alloc(capacity).unwrap_or_else(|_| {
            let size = capacity.max(self.inner.config.min_size.get());
            IoBufMut::with_alignment(size, self.inner.config.alignment)
        })
    }

    /// Allocates via [`Self::alloc`] and sets the length to `len` without
    /// initializing the contents.
    ///
    /// # Safety
    ///
    /// The first `len` bytes are uninitialized (or stale, for reused pooled
    /// buffers); the caller must write them before reading.
    pub unsafe fn alloc_len(&self, len: usize) -> IoBufMut {
        let mut buf = self.alloc(len);
        unsafe { buf.set_len(len) };
        buf
    }

    /// Like [`Self::try_alloc`], but the returned buffer has length `len` and
    /// is filled with zeros.
    ///
    /// Fresh buffers are allocated zeroed; reused pooled buffers are
    /// explicitly re-zeroed since they may contain stale data.
    ///
    /// # Errors
    ///
    /// Same as [`Self::try_alloc`].
    pub fn try_alloc_zeroed(&self, len: usize) -> Result<IoBufMut, PoolError> {
        if len < self.inner.config.pool_min_size {
            let size = len.max(1);
            let mut buf = IoBufMut::zeroed_with_alignment(size, self.inner.config.alignment);
            // `size` may exceed `len` (for `len == 0`); keep the contract.
            buf.truncate(len);
            return Ok(buf);
        }

        let class_index = self
            .class_index_or_record_oversized(len)
            .ok_or(PoolError::Oversized)?;
        let allocation = self
            .inner
            .try_alloc(class_index, true)
            .ok_or(PoolError::Exhausted)?;

        let mut buf = IoBufMut::from_pooled(PooledBufMut::new(allocation.buffer, allocation.class));
        if allocation.is_new {
            // Freshly created via `new_zeroed`; just expose `len` bytes.
            unsafe { buf.set_len(len) };
        } else {
            // Reused buffer: zero the exposed prefix before making it visible.
            unsafe {
                std::ptr::write_bytes(buf.as_mut_ptr(), 0, len);
                buf.set_len(len);
            }
        }
        Ok(buf)
    }

    /// Infallible variant of [`Self::try_alloc_zeroed`]: falls back to an
    /// untracked zeroed buffer when the pool cannot serve the request.
    pub fn alloc_zeroed(&self, len: usize) -> IoBufMut {
        self.try_alloc_zeroed(len).unwrap_or_else(|_| {
            let size = len.max(self.inner.config.min_size.get());
            let mut buf = IoBufMut::zeroed_with_alignment(size, self.inner.config.alignment);
            buf.truncate(len);
            buf
        })
    }

    /// The configuration this pool was built with.
    pub fn config(&self) -> &BufferPoolConfig {
        &self.inner.config
    }
}
1046
1047#[cfg(test)]
1048mod tests {
1049 use super::*;
1050 use crate::iobuf::IoBuf;
1051 use bytes::{Buf, BufMut};
1052 use std::{
1053 sync::{mpsc, Arc},
1054 thread,
1055 };
1056
    // Standalone size class: cap of 8 buffers, thread-cache capacity 4.
    fn test_size_class(size: usize, alignment: usize) -> Arc<SizeClass> {
        Arc::new(SizeClass::new(
            NEXT_SIZE_CLASS_ID.fetch_add(1, Ordering::Relaxed),
            size,
            alignment,
            8,
            4,
            false,
        ))
    }
1067
    // Fresh metrics registry per test so registrations never collide.
    fn test_registry() -> Registry {
        Registry::default()
    }
1071
    // Config with pooling for all sizes (pool_min_size = 0) and a
    // single-thread parallelism cache policy.
    fn test_config(min_size: usize, max_size: usize, max_per_class: usize) -> BufferPoolConfig {
        BufferPoolConfig {
            pool_min_size: 0,
            min_size: NZUsize!(min_size),
            max_size: NZUsize!(max_size),
            max_per_class: NZUsize!(max_per_class),
            thread_cache_config: BufferPoolThreadCacheConfig::ForParallelism(NZUsize!(1)),
            prefill: false,
            alignment: NZUsize!(page_size()),
        }
    }
1084
    // Buffers currently handed out for `size`'s class: created minus those
    // parked globally or in this thread's local cache.
    fn get_allocated(pool: &BufferPool, size: usize) -> usize {
        let class_index = pool.inner.config.class_index(size).unwrap();
        let class = &pool.inner.classes[class_index];
        class.created.load(Ordering::Relaxed) - class.global.len() - get_local_len(class)
    }
1094
    // Buffers immediately reusable for `size`'s class (global + local cache).
    fn get_available(pool: &BufferPool, size: usize) -> i64 {
        let class_index = pool.inner.config.class_index(size).unwrap();
        let class = &pool.inner.classes[class_index];
        (class.global.len() + get_local_len(class)) as i64
    }
1101
    // Number of buffers this thread caches locally for `class` (0 when the
    // cache slot was never created).
    fn get_local_len(class: &SizeClass) -> usize {
        TLS_SIZE_CLASS_CACHES.with(|bins| {
            // SAFETY: thread-local, shared borrow confined to this closure.
            let bins = unsafe { &*bins.get() };
            bins.get(class.class_id)
                .and_then(Option::as_ref)
                .map_or(0, TlsSizeClassCache::len)
        })
    }
1113
    // The OS page size should be a power of two and at least 4 KiB.
    #[test]
    fn test_page_size() {
        let size = page_size();
        assert!(size >= 4096);
        assert!(size.is_power_of_two());
    }
1120
    // A well-formed config passes validation without panicking.
    #[test]
    fn test_config_validation() {
        let page = page_size();
        let config = test_config(page, page * 4, 10);
        config.validate();
    }
1127
    // A fixed thread-cache capacity above max_per_class must be rejected.
    #[test]
    #[should_panic(expected = "thread_cache_capacity (11) must be <= max_per_class (10)")]
    fn test_config_invalid_thread_cache_capacity() {
        let page = page_size();
        let config = test_config(page, page * 4, 10).with_thread_cache_capacity(NZUsize!(11));
        config.validate();
    }
1135
    // A non-power-of-two min_size must be rejected by validation.
    #[test]
    #[should_panic(expected = "min_size must be a power of two")]
    fn test_config_invalid_min_size() {
        let config = BufferPoolConfig {
            pool_min_size: 0,
            min_size: NZUsize!(3000),
            max_size: NZUsize!(8192),
            max_per_class: NZUsize!(10),
            thread_cache_config: BufferPoolThreadCacheConfig::ForParallelism(NZUsize!(1)),
            prefill: false,
            alignment: NZUsize!(page_size()),
        };
        config.validate();
    }
1150
    // Class 0 covers everything up to min_size; each class doubles; requests
    // past max_size map to no class.
    #[test]
    fn test_config_class_index() {
        let page = page_size();
        let config = test_config(page, page * 8, 10);

        assert_eq!(config.num_classes(), 4);

        assert_eq!(config.class_index(1), Some(0));
        assert_eq!(config.class_index(page), Some(0));
        assert_eq!(config.class_index(page + 1), Some(1));
        assert_eq!(config.class_index(page * 2), Some(1));
        assert_eq!(config.class_index(page * 8), Some(3));
        assert_eq!(config.class_index(page * 8 + 1), None);
    }
1166
    // Buffers come back empty and can be re-allocated after being dropped.
    #[test]
    fn test_pool_alloc_and_return() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page * 4, 2), &mut registry);

        let buf = pool.try_alloc(page).unwrap();
        assert!(buf.capacity() >= page);
        assert_eq!(buf.len(), 0);

        drop(buf);

        let buf2 = pool.try_alloc(page).unwrap();
        assert!(buf2.capacity() >= page);
        assert_eq!(buf2.len(), 0);
    }
1186
    // alloc_len exposes `len` writable bytes that round-trip through freeze.
    #[test]
    fn test_alloc_len_sets_len() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page * 4, 2), &mut registry);

        let mut buf = unsafe { pool.alloc_len(100) };
        assert_eq!(buf.len(), 100);
        buf.as_mut().fill(0xAB);
        let frozen = buf.freeze();
        assert_eq!(frozen.as_ref(), &[0xAB; 100]);
    }
1200
    // alloc_zeroed returns a buffer of exactly `len` zero bytes.
    #[test]
    fn test_alloc_zeroed_sets_len_and_zeros() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page * 4, 2), &mut registry);

        let buf = pool.alloc_zeroed(100);
        assert_eq!(buf.len(), 100);
        assert!(buf.as_ref().iter().all(|&b| b == 0));
    }
1211
    // try_alloc_zeroed serves a pooled, fully zeroed buffer of the exact length.
    #[test]
    fn test_try_alloc_zeroed_sets_len_and_zeros() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page * 4, 2), &mut registry);

        let buf = pool.try_alloc_zeroed(page).unwrap();
        assert!(buf.is_pooled());
        assert_eq!(buf.len(), page);
        assert!(buf.as_ref().iter().all(|&b| b == 0));
    }
1223
    // When the single-class pool is exhausted, alloc_zeroed falls back to an
    // untracked buffer that is still zeroed.
    #[test]
    fn test_alloc_zeroed_fallback_uses_untracked_zeroed_buffer() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 1), &mut registry);

        // Hold the only tracked buffer so the pool is exhausted.
        let _pooled = pool.try_alloc(page).unwrap();

        let buf = pool.alloc_zeroed(100);
        assert!(!buf.is_pooled());
        assert_eq!(buf.len(), 100);
        assert!(buf.as_ref().iter().all(|&b| b == 0));
    }
1238
    // A reused (dirty) pooled buffer must be re-zeroed by alloc_zeroed.
    #[test]
    fn test_alloc_zeroed_reuses_dirty_pooled_buffer() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 1), &mut registry);

        let mut first = pool.alloc_zeroed(page);
        assert!(first.is_pooled());
        assert!(first.as_ref().iter().all(|&b| b == 0));

        // Dirty the buffer, then return it to the pool.
        first.as_mut().fill(0xAB);
        drop(first);

        let second = pool.alloc_zeroed(page);
        assert!(second.is_pooled());
        assert_eq!(second.len(), page);
        assert!(second.as_ref().iter().all(|&b| b == 0));
    }
1258
    // Requests below pool_min_size get untracked buffers; at or above it,
    // buffers come from the pool.
    #[test]
    fn test_requests_smaller_than_pool_min_size_bypass_pool() {
        let mut registry = test_registry();
        let pool = BufferPool::new(
            BufferPoolConfig {
                pool_min_size: 512,
                min_size: NZUsize!(512),
                max_size: NZUsize!(1024),
                max_per_class: NZUsize!(2),
                thread_cache_config: BufferPoolThreadCacheConfig::ForParallelism(NZUsize!(1)),
                prefill: false,
                alignment: NZUsize!(128),
            },
            &mut registry,
        );

        let buf = pool.try_alloc(200).unwrap();
        assert!(!buf.is_pooled());
        assert_eq!(buf.capacity(), 200);

        let zeroed = pool.try_alloc_zeroed(200).unwrap();
        assert!(!zeroed.is_pooled());
        assert_eq!(zeroed.len(), 200);
        assert!(zeroed.as_ref().iter().all(|&b| b == 0));

        let pooled = pool.try_alloc(512).unwrap();
        assert!(pooled.is_pooled());
        assert_eq!(pooled.capacity(), 512);
    }
1288
    // Requests round up to the smallest class that fits them.
    #[test]
    fn test_pool_size_classes() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page * 4, 10), &mut registry);

        let buf1 = pool.try_alloc(page).unwrap();
        assert_eq!(buf1.capacity(), page);

        let buf2 = pool.try_alloc(page + 1).unwrap();
        assert_eq!(buf2.capacity(), page * 2);

        let buf3 = pool.try_alloc(page * 3).unwrap();
        assert_eq!(buf3.capacity(), page * 4);
    }
1306
    // Prefill creates max_per_class buffers up front; the pool serves exactly
    // that many and then reports exhaustion.
    #[test]
    fn test_prefill() {
        let page = NZUsize!(page_size());
        let mut registry = test_registry();
        let pool = BufferPool::new(
            BufferPoolConfig {
                pool_min_size: 0,
                min_size: page,
                max_size: page,
                max_per_class: NZUsize!(5),
                thread_cache_config: BufferPoolThreadCacheConfig::ForParallelism(NZUsize!(1)),
                prefill: true,
                alignment: page,
            },
            &mut registry,
        );

        let mut bufs = Vec::new();
        for _ in 0..5 {
            bufs.push(pool.try_alloc(page.get()).expect("alloc should succeed"));
        }

        assert!(pool.try_alloc(page.get()).is_err());
    }
1333
    // The network preset validates and matches its documented defaults.
    #[test]
    fn test_config_for_network() {
        let config = BufferPoolConfig::for_network();
        config.validate();
        assert_eq!(config.pool_min_size, 1024);
        assert_eq!(config.min_size.get(), 1024);
        assert_eq!(config.max_size.get(), 64 * 1024);
        assert_eq!(config.max_per_class.get(), 4096);
        assert_eq!(
            config.thread_cache_config,
            BufferPoolThreadCacheConfig::Disabled
        );
        assert!(!config.prefill);
        assert_eq!(config.alignment.get(), cache_line_size());
    }
1349
    // The storage preset validates and matches its documented defaults.
    #[test]
    fn test_config_for_storage() {
        let config = BufferPoolConfig::for_storage();
        config.validate();
        assert_eq!(config.pool_min_size, 1024);
        assert_eq!(config.min_size.get(), page_size());
        assert_eq!(config.max_size.get(), 8 * 1024 * 1024);
        assert_eq!(config.max_per_class.get(), 64);
        assert_eq!(
            config.thread_cache_config,
            BufferPoolThreadCacheConfig::Disabled
        );
        assert!(!config.prefill);
        assert_eq!(config.alignment.get(), page_size());
    }
1365
    // The storage preset can serve its own largest class (8 MiB).
    #[test]
    fn test_storage_config_supports_default_allocations() {
        let mut registry = test_registry();
        let pool = BufferPool::new(BufferPoolConfig::for_storage(), &mut registry);

        let buf = pool.try_alloc(8 * 1024 * 1024).unwrap();
        assert_eq!(buf.capacity(), 8 * 1024 * 1024);
    }
1375
    // Each builder method updates exactly its own field.
    #[test]
    fn test_config_builders() {
        let page = NZUsize!(page_size());
        let config = BufferPoolConfig::for_storage()
            .with_pool_min_size(1024)
            .with_max_per_class(NZUsize!(64))
            .with_thread_cache_capacity(NZUsize!(8))
            .with_prefill(true)
            .with_min_size(page)
            .with_max_size(NZUsize!(128 * 1024));

        config.validate();
        assert_eq!(config.pool_min_size, 1024);
        assert_eq!(config.min_size, page);
        assert_eq!(config.max_size.get(), 128 * 1024);
        assert_eq!(config.max_per_class.get(), 64);
        assert_eq!(
            config.thread_cache_config,
            BufferPoolThreadCacheConfig::Fixed(NZUsize!(8))
        );
        assert!(config.prefill);
        assert_eq!(config.alignment.get(), page_size());

        let aligned = BufferPoolConfig::for_network()
            .with_pool_min_size(256)
            .with_thread_cache_for_parallelism(NZUsize!(4))
            .with_alignment(NZUsize!(256))
            .with_min_size(NZUsize!(256));
        aligned.validate();
        assert_eq!(
            aligned.thread_cache_config,
            BufferPoolThreadCacheConfig::ForParallelism(NZUsize!(4))
        );
        assert_eq!(aligned.alignment.get(), 256);
        assert_eq!(aligned.min_size.get(), 256);
    }
1414
    // ForParallelism(8) with max_per_class 64 resolves to 64 / (2 * 8) = 4.
    #[test]
    fn test_parallelism_policy_resolves_thread_cache_capacity() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(
            test_config(page, page, 64).with_thread_cache_for_parallelism(NZUsize!(8)),
            &mut registry,
        );
        let class_index = pool.inner.config.class_index(page).unwrap();
        assert_eq!(pool.inner.classes[class_index].thread_cache_capacity, 4);
    }
1426
    // A Fixed policy uses the requested capacity verbatim.
    #[test]
    fn test_fixed_thread_cache_capacity_overrides_runtime_parallelism() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(
            test_config(page, page, 64).with_thread_cache_capacity(NZUsize!(7)),
            &mut registry,
        );
        let class_index = pool.inner.config.class_index(page).unwrap();

        assert_eq!(pool.inner.classes[class_index].thread_cache_capacity, 7);
    }
1440
    // With caching disabled, a returned buffer goes straight to the global
    // freelist instead of the thread-local cache.
    #[test]
    fn test_disabled_thread_cache_does_not_retain_buffers_locally() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(
            test_config(page, page, 2).with_thread_cache_disabled(),
            &mut registry,
        );
        let class_index = pool.inner.config.class_index(page).unwrap();
        let class = &pool.inner.classes[class_index];

        let tracked = pool.try_alloc(page).expect("tracked allocation");
        drop(tracked);

        assert_eq!(class.thread_cache_capacity, 0);
        assert_eq!(get_local_len(class), 0);
        assert_eq!(class.global.len(), 1);
    }
1461
    // flush() moves every locally cached buffer (across all classes) back to
    // the global freelists.
    #[test]
    fn test_thread_cache_flush_moves_local_entries_to_global() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(
            test_config(page, page * 2, 8).with_thread_cache_capacity(NZUsize!(4)),
            &mut registry,
        );

        let small_index = pool.inner.config.class_index(page).unwrap();
        let large_index = pool.inner.config.class_index(page + 1).unwrap();
        let small_class = &pool.inner.classes[small_index];
        let large_class = &pool.inner.classes[large_index];

        let small = pool.try_alloc(page).expect("tracked allocation");
        let large = pool.try_alloc(page + 1).expect("tracked allocation");
        drop(small);
        drop(large);

        // Both returns land in this thread's caches, not the global queues.
        assert_eq!(get_local_len(small_class), 1);
        assert_eq!(get_local_len(large_class), 1);
        assert_eq!(small_class.global.len(), 0);
        assert_eq!(large_class.global.len(), 0);

        BufferPoolThreadCache::flush();

        assert_eq!(get_local_len(small_class), 0);
        assert_eq!(get_local_len(large_class), 0);
        assert_eq!(small_class.global.len(), 1);
        assert_eq!(large_class.global.len(), 1);
    }
1504
    // Budget 280 over class sizes 4+8+16=28 yields a cap of 10; a tiny budget
    // still yields at least 1.
    #[test]
    fn test_config_with_budget_bytes() {
        let config = BufferPoolConfig {
            pool_min_size: 0,
            min_size: NZUsize!(4),
            max_size: NZUsize!(16),
            max_per_class: NZUsize!(1),
            thread_cache_config: BufferPoolThreadCacheConfig::ForParallelism(NZUsize!(1)),
            prefill: false,
            alignment: NZUsize!(4),
        }
        .with_budget_bytes(NZUsize!(280));
        assert_eq!(config.max_per_class.get(), 10);

        let small_budget = BufferPoolConfig {
            pool_min_size: 0,
            min_size: NZUsize!(4),
            max_size: NZUsize!(16),
            max_per_class: NZUsize!(1),
            thread_cache_config: BufferPoolThreadCacheConfig::ForParallelism(NZUsize!(1)),
            prefill: false,
            alignment: NZUsize!(4),
        }
        .with_budget_bytes(NZUsize!(10));
        assert_eq!(small_budget.max_per_class.get(), 1);
    }
1533
    // Display renders the documented message for each error variant.
    #[test]
    fn test_pool_error_display() {
        assert_eq!(
            PoolError::Oversized.to_string(),
            "requested capacity exceeds maximum buffer size"
        );
        assert_eq!(
            PoolError::Exhausted.to_string(),
            "pool exhausted for required size class"
        );
    }
1545
    // Degenerate ranges: an inverted min/max has zero classes (and budget is a
    // no-op); a non-power-of-two max_size rejects requests that round past it.
    #[test]
    fn test_config_invalid_range_edge_paths() {
        let invalid_order = BufferPoolConfig {
            pool_min_size: 0,
            min_size: NZUsize!(8),
            max_size: NZUsize!(4),
            max_per_class: NZUsize!(1),
            thread_cache_config: BufferPoolThreadCacheConfig::ForParallelism(NZUsize!(1)),
            prefill: false,
            alignment: NZUsize!(4),
        };
        assert_eq!(invalid_order.num_classes(), 0);
        let unchanged = invalid_order.clone().with_budget_bytes(NZUsize!(128));
        assert_eq!(unchanged.max_per_class, invalid_order.max_per_class);

        let non_power_two_max = BufferPoolConfig {
            pool_min_size: 0,
            min_size: NZUsize!(8),
            max_size: NZUsize!(12),
            max_per_class: NZUsize!(1),
            thread_cache_config: BufferPoolThreadCacheConfig::ForParallelism(NZUsize!(1)),
            prefill: false,
            alignment: NZUsize!(4),
        };
        assert_eq!(non_power_two_max.class_index(12), None);
    }
1575
1576 #[test]
1577 fn test_pool_debug_and_config_accessor() {
1578 let page = page_size();
1580 let mut registry = test_registry();
1581 let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
1582
1583 let debug = format!("{pool:?}");
1584 assert!(debug.contains("BufferPool"));
1585 assert!(debug.contains("num_classes"));
1586 assert_eq!(pool.config().min_size.get(), page);
1587 }
1588
1589 #[test]
1590 fn test_return_buffer_local_overflow_spills_to_global() {
1591 let page = page_size();
1592 let mut registry = test_registry();
1593 let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
1594 let class_index = pool
1595 .inner
1596 .config
1597 .class_index(page)
1598 .expect("class exists for page-sized buffer");
1599
1600 let tracked1 = pool.try_alloc(page).expect("first tracked allocation");
1601 let tracked2 = pool.try_alloc(page).expect("second tracked allocation");
1602
1603 drop(tracked1);
1605 assert_eq!(pool.inner.classes[class_index].global.len(), 0);
1606 assert_eq!(get_local_len(&pool.inner.classes[class_index]), 1);
1607
1608 drop(tracked2);
1611 assert_eq!(pool.inner.classes[class_index].global.len(), 1);
1612 assert_eq!(get_local_len(&pool.inner.classes[class_index]), 1);
1613 assert_eq!(get_available(&pool, page), 2);
1614 }
1615
1616 #[test]
1617 fn test_small_local_cache_overflow_preserves_locality() {
1618 let page = page_size();
1619 let mut registry = test_registry();
1620 let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
1621
1622 let mut tracked1 = pool.try_alloc(page).expect("first tracked allocation");
1626 let ptr1 = tracked1.as_mut_ptr();
1627 let mut tracked2 = pool.try_alloc(page).expect("second tracked allocation");
1628 let ptr2 = tracked2.as_mut_ptr();
1629
1630 drop(tracked1);
1631 drop(tracked2);
1632
1633 let mut reused_local = pool.try_alloc(page).expect("reuse from local cache");
1634 assert_eq!(reused_local.as_mut_ptr(), ptr1);
1635
1636 let mut reused_global = pool.try_alloc(page).expect("reuse from global freelist");
1637 assert_eq!(reused_global.as_mut_ptr(), ptr2);
1638 }
1639
    #[test]
    fn test_large_local_cache_batches_overflow_and_refill() {
        let page = page_size();
        let mut registry = test_registry();
        // Scale the per-class capacity with host parallelism so the per-thread
        // cache is large enough to exercise batched overflow/refill behavior.
        let threads = std::thread::available_parallelism().map_or(1, NonZeroUsize::get);
        let max_per_class = threads * 8;
        let pool = BufferPool::new(test_config(page, page, max_per_class), &mut registry);
        let class_index = pool
            .inner
            .config
            .class_index(page)
            .expect("class exists for page-sized buffer");
        let class = &pool.inner.classes[class_index];

        // Batching only applies once the thread cache reaches this threshold.
        assert!(class.thread_cache_capacity >= MIN_THREAD_CACHE_BATCHING_CAPACITY);

        // Allocate one more buffer than the local cache can hold, then return
        // them all; the overflowing return spills half the cache to the global
        // queue in one batch.
        let mut bufs = Vec::new();
        for _ in 0..class.thread_cache_capacity + 1 {
            bufs.push(pool.try_alloc(page).expect("tracked allocation"));
        }
        for buf in bufs {
            drop(buf);
        }

        // Half the capacity (plus the extra buffer) stays local; half spilled.
        assert_eq!(get_local_len(class), class.thread_cache_capacity / 2 + 1);
        assert_eq!(class.global.len(), class.thread_cache_capacity / 2);

        // Drain the local cache completely via allocations...
        let mut reused = Vec::new();
        for _ in 0..class.thread_cache_capacity / 2 + 1 {
            reused.push(pool.try_alloc(page).expect("local reuse"));
        }
        assert_eq!(get_local_len(class), 0);
        assert_eq!(class.global.len(), class.thread_cache_capacity / 2);

        // ...so the next allocation refills the local cache from the global
        // queue in a batch, taking one buffer for the caller itself.
        let _global = pool.try_alloc(page).expect("global reuse with refill");
        assert_eq!(get_local_len(class), class.thread_cache_capacity / 2 - 1);
        assert_eq!(class.global.len(), 0);
    }
1683
1684 #[test]
1685 fn test_tls_refill_stops_when_global_runs_empty() {
1686 let class = test_size_class(64, 64);
1687
1688 class.push_global(AlignedBuffer::new(class.size, class.alignment));
1690 BufferPoolThreadCache::refill(&class, MIN_THREAD_CACHE_BATCHING_CAPACITY);
1691
1692 assert_eq!(get_local_len(&class), 1);
1693 assert_eq!(class.global.len(), 0);
1694 }
1695
1696 #[test]
1697 fn test_tls_size_class_cache_push_tolerates_empty_spill() {
1698 let class = test_size_class(64, 64);
1699 let mut cache = TlsSizeClassCache {
1700 entries: Vec::new(),
1701 capacity: 0,
1702 };
1703
1704 cache.push(TlsSizeClassCacheEntry {
1706 buffer: AlignedBuffer::new(class.size, class.alignment),
1707 class,
1708 });
1709 drop(cache);
1710 }
1711
    #[test]
    #[should_panic(expected = "tracked buffer should always fit in the global pool")]
    fn test_push_global_panics_when_global_queue_is_inconsistently_full() {
        // Build a size class whose global queue holds exactly one buffer.
        let class = Arc::new(SizeClass::new(
            NEXT_SIZE_CLASS_ID.fetch_add(1, Ordering::Relaxed),
            64,
            64,
            1,
            1,
            false,
        ));

        // The first push fills the queue; the second violates the accounting
        // invariant (a tracked buffer always has a reserved global slot) and
        // must panic with the message asserted above.
        class.push_global(AlignedBuffer::new(64, 64));
        class.push_global(AlignedBuffer::new(64, 64));
    }
1728
1729 #[test]
1730 fn test_pooled_debug_and_empty_into_bytes_paths() {
1731 let page = page_size();
1734 let class = test_size_class(page, page);
1735
1736 let pooled_mut_debug = {
1738 let pooled_mut = PooledBufMut::new(AlignedBuffer::new(page, page), Arc::clone(&class));
1739 format!("{pooled_mut:?}")
1740 };
1741 assert!(pooled_mut_debug.contains("PooledBufMut"));
1742 assert!(pooled_mut_debug.contains("cursor"));
1743
1744 let empty_from_mut = PooledBufMut::new(AlignedBuffer::new(page, page), Arc::clone(&class));
1746 assert!(empty_from_mut.into_bytes().is_empty());
1747
1748 let pooled = PooledBufMut::new(AlignedBuffer::new(page, page), class).into_pooled();
1750 let pooled_debug = format!("{pooled:?}");
1751 assert!(pooled_debug.contains("PooledBuf"));
1752 assert!(pooled_debug.contains("capacity"));
1753 assert!(pooled.into_bytes().is_empty());
1754 }
1755
1756 #[test]
1757 fn test_freeze_returns_buffer_to_pool() {
1758 let page = page_size();
1759 let mut registry = test_registry();
1760 let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
1761
1762 assert_eq!(get_allocated(&pool, page), 0);
1764 assert_eq!(get_available(&pool, page), 0);
1765
1766 let buf = pool.try_alloc(page).unwrap();
1768 assert_eq!(get_allocated(&pool, page), 1);
1769 assert_eq!(get_available(&pool, page), 0);
1770
1771 let iobuf = buf.freeze();
1772 assert_eq!(get_allocated(&pool, page), 1);
1774
1775 drop(iobuf);
1777 assert_eq!(get_allocated(&pool, page), 0);
1778 assert_eq!(get_available(&pool, page), 1);
1779 }
1780
    #[test]
    fn test_refcount_and_copy_to_bytes_paths() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        // Case 1: clones and non-empty slices share the refcounted allocation;
        // the buffer returns to the pool only when the last of them drops.
        // (The zero-length slice evidently holds no reference: the count hits
        // zero below while `empty` is still alive.)
        {
            let mut buf = pool.try_alloc(page).unwrap();
            buf.put_slice(&[0xAA; 100]);
            let iobuf = buf.freeze();
            let clone = iobuf.clone();
            let slice = iobuf.slice(10..40);
            let empty = iobuf.slice(10..10);
            assert!(empty.is_empty());
            drop(iobuf);
            assert_eq!(get_allocated(&pool, page), 1);
            drop(slice);
            assert_eq!(get_allocated(&pool, page), 1);
            drop(clone);
            assert_eq!(get_allocated(&pool, page), 0);
        }

        // Case 2: copy_to_bytes on a frozen buffer. Zero-length copies leave
        // the cursor alone; partial copies advance it; the allocation stays
        // accounted until the copied-out views are all dropped.
        {
            let mut buf = pool.try_alloc(page).unwrap();
            buf.put_slice(&[0x42; 100]);
            let mut iobuf = buf.freeze();

            let zero = iobuf.copy_to_bytes(0);
            assert!(zero.is_empty());
            assert_eq!(iobuf.remaining(), 100);

            let partial = iobuf.copy_to_bytes(30);
            assert_eq!(&partial[..], &[0x42; 30]);
            assert_eq!(iobuf.remaining(), 70);

            let rest = iobuf.copy_to_bytes(70);
            assert_eq!(&rest[..], &[0x42; 70]);
            assert_eq!(iobuf.remaining(), 0);

            // Copying zero bytes from a fully-consumed buffer is still valid.
            let empty = iobuf.copy_to_bytes(0);
            assert!(empty.is_empty());

            drop(iobuf);
            assert_eq!(get_allocated(&pool, page), 1);
            drop(zero);
            drop(partial);
            assert_eq!(get_allocated(&pool, page), 1);
            drop(rest);
            assert_eq!(get_allocated(&pool, page), 0);
        }

        // Case 3: the same copy_to_bytes behavior on the mutable handle
        // (no freeze), with identical accounting.
        {
            let buf = pool.try_alloc(page).unwrap();
            let mut iobufmut = buf;
            iobufmut.put_slice(&[0x7E; 100]);

            let zero = iobufmut.copy_to_bytes(0);
            assert!(zero.is_empty());
            assert_eq!(iobufmut.remaining(), 100);

            let partial = iobufmut.copy_to_bytes(30);
            assert_eq!(&partial[..], &[0x7E; 30]);
            assert_eq!(iobufmut.remaining(), 70);

            let rest = iobufmut.copy_to_bytes(70);
            assert_eq!(&rest[..], &[0x7E; 70]);
            assert_eq!(iobufmut.remaining(), 0);

            drop(iobufmut);
            assert_eq!(get_allocated(&pool, page), 1);
            drop(zero);
            drop(partial);
            assert_eq!(get_allocated(&pool, page), 1);
            drop(rest);
            assert_eq!(get_allocated(&pool, page), 0);
        }
    }
1868
1869 #[test]
1870 fn test_iobuf_to_iobufmut_conversion_reuses_pool_for_non_full_unique_view() {
1871 let page = page_size();
1873 let mut registry = test_registry();
1874 let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
1875
1876 let buf = pool.try_alloc(page).unwrap();
1877 assert_eq!(get_allocated(&pool, page), 1);
1878
1879 let iobuf = buf.freeze();
1880 assert_eq!(get_allocated(&pool, page), 1);
1881
1882 let iobufmut: IoBufMut = iobuf.into();
1883
1884 assert_eq!(
1886 get_allocated(&pool, page),
1887 1,
1888 "pooled buffer should remain allocated after zero-copy IoBuf->IoBufMut conversion"
1889 );
1890 assert_eq!(get_available(&pool, page), 0);
1891
1892 drop(iobufmut);
1894 assert_eq!(get_allocated(&pool, page), 0);
1895 assert_eq!(get_available(&pool, page), 1);
1896 }
1897
1898 #[test]
1899 fn test_iobuf_to_iobufmut_conversion_preserves_full_unique_view() {
1900 let page = page_size();
1903 let mut registry = test_registry();
1904 let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
1905
1906 let mut buf = pool.try_alloc(page).unwrap();
1908 buf.put_slice(&vec![0xEE; page]);
1909 let iobuf = buf.freeze();
1910
1911 let iobufmut: IoBufMut = iobuf.into();
1913 assert_eq!(iobufmut.len(), page);
1914 assert!(iobufmut.as_ref().iter().all(|&b| b == 0xEE));
1915 assert_eq!(get_allocated(&pool, page), 1);
1916 assert_eq!(get_available(&pool, page), 0);
1917
1918 drop(iobufmut);
1920 assert_eq!(get_allocated(&pool, page), 0);
1921 assert_eq!(get_available(&pool, page), 1);
1922 }
1923
1924 #[test]
1925 fn test_iobuf_try_into_mut_recycles_full_unique_view() {
1926 let page = page_size();
1929 let mut registry = test_registry();
1930 let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
1931
1932 let mut buf = pool.try_alloc(page).unwrap();
1933 buf.put_slice(&vec![0xAB; page]);
1934 let iobuf = buf.freeze();
1935 assert_eq!(get_allocated(&pool, page), 1);
1936
1937 let recycled = iobuf
1939 .try_into_mut()
1940 .expect("unique full-view pooled buffer should recycle");
1941 assert_eq!(recycled.len(), page);
1942 assert!(recycled.as_ref().iter().all(|&b| b == 0xAB));
1943 assert_eq!(recycled.capacity(), page);
1944 assert_eq!(get_allocated(&pool, page), 1);
1945
1946 drop(recycled);
1947 assert_eq!(get_allocated(&pool, page), 0);
1948 assert_eq!(get_available(&pool, page), 1);
1949 }
1950
    #[test]
    fn test_iobuf_try_into_mut_succeeds_for_unique_slice_and_fails_for_shared() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        // Case 1: a slice that is the sole remaining reference converts back
        // to mutable, with its shortened contents and the pool accounting
        // preserved.
        let mut buf = pool.try_alloc(page).unwrap();
        buf.put_slice(&vec![0xCD; page]);
        let iobuf = buf.freeze();
        let sliced = iobuf.slice(1..page);
        drop(iobuf);
        let recycled = sliced
            .try_into_mut()
            .expect("unique sliced pooled buffer should recycle");
        assert_eq!(recycled.len(), page - 1);
        assert!(recycled.as_ref().iter().all(|&b| b == 0xCD));
        assert_eq!(recycled.capacity(), page - 1);
        assert_eq!(get_allocated(&pool, page), 1);
        drop(recycled);
        assert_eq!(get_allocated(&pool, page), 0);
        assert_eq!(get_available(&pool, page), 1);

        // Case 2: while a clone exists the view is shared, so the conversion
        // must fail; the error carries the original buffer back (rebound here
        // via shadowing so it can still be dropped and accounted for).
        let mut buf = pool.try_alloc(page).unwrap();
        buf.put_slice(&vec![0xEF; page]);
        let iobuf = buf.freeze();
        let cloned = iobuf.clone();
        let iobuf = iobuf
            .try_into_mut()
            .expect_err("shared pooled buffer must not convert to mutable");

        drop(cloned);
        drop(iobuf);
        assert_eq!(get_allocated(&pool, page), 0);
        assert!(get_available(&pool, page) >= 1);
    }
1988
1989 #[test]
1990 fn test_multithreaded_alloc_freeze_return() {
1991 let page = page_size();
1992 let mut registry = test_registry();
1993 let pool = Arc::new(BufferPool::new(test_config(page, page, 100), &mut registry));
1994
1995 let mut handles = vec![];
1996
1997 cfg_if::cfg_if! {
1999 if #[cfg(miri)] {
2000 let iterations = 100;
2001 } else {
2002 let iterations = 1000;
2003 }
2004 }
2005
2006 for _ in 0..10 {
2008 let pool = pool.clone();
2009 let handle = thread::spawn(move || {
2010 for _ in 0..iterations {
2011 let buf = pool.try_alloc(page).unwrap();
2012 let iobuf = buf.freeze();
2013
2014 let clones: Vec<_> = (0..5).map(|_| iobuf.clone()).collect();
2016 drop(iobuf);
2017
2018 for clone in clones {
2020 drop(clone);
2021 }
2022 }
2023 });
2024 handles.push(handle);
2025 }
2026
2027 for handle in handles {
2029 handle.join().unwrap();
2030 }
2031
2032 let _buf = pool
2036 .try_alloc(page)
2037 .expect("pool should remain usable after multithreaded test");
2038 }
2039
2040 #[test]
2041 fn test_cross_thread_buffer_return() {
2042 let page = page_size();
2044 let mut registry = test_registry();
2045 let pool = BufferPool::new(test_config(page, page, 100), &mut registry);
2046
2047 let (tx, rx) = mpsc::channel();
2048
2049 for _ in 0..50 {
2051 let buf = pool.try_alloc(page).unwrap();
2052 let iobuf = buf.freeze();
2053 tx.send(iobuf).unwrap();
2054 }
2055 drop(tx);
2056
2057 let handle = thread::spawn(move || {
2061 while let Ok(iobuf) = rx.recv() {
2062 drop(iobuf);
2063 }
2064
2065 let class_index = pool
2066 .inner
2067 .config
2068 .class_index(page)
2069 .expect("class exists for page-sized buffer");
2070 assert!(
2071 get_local_len(&pool.inner.classes[class_index]) >= 1,
2072 "dropping thread should retain returned buffers in its local cache"
2073 );
2074
2075 for _ in 0..50 {
2076 let _buf = pool
2077 .try_alloc(page)
2078 .expect("dropping thread should be able to reuse returned buffers");
2079 }
2080 });
2081
2082 handle.join().unwrap();
2083 }
2084
2085 #[test]
2086 fn test_thread_exit_flushes_local_bin() {
2087 let page = page_size();
2090 let mut registry = test_registry();
2091 let pool = Arc::new(BufferPool::new(test_config(page, page, 1), &mut registry));
2092
2093 let worker_pool = pool.clone();
2095 thread::spawn(move || {
2096 let buf = worker_pool
2097 .try_alloc(page)
2098 .expect("worker should allocate tracked buffer");
2099 drop(buf);
2100 })
2101 .join()
2102 .expect("worker thread should exit cleanly");
2103
2104 let class_index = pool
2107 .inner
2108 .config
2109 .class_index(page)
2110 .expect("class exists for page-sized buffer");
2111 assert_eq!(pool.inner.classes[class_index].global.len(), 1);
2112 assert_eq!(get_local_len(&pool.inner.classes[class_index]), 0);
2113
2114 let _buf = pool
2116 .try_alloc(page)
2117 .expect("thread-exited local buffer should be reusable");
2118 }
2119
2120 #[test]
2121 fn test_pool_drop_drains_global_freelist() {
2122 let page = page_size();
2125 let mut registry = test_registry();
2126 let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
2127 let class_index = pool
2128 .inner
2129 .config
2130 .class_index(page)
2131 .expect("class exists for page-sized buffer");
2132 let class = Arc::clone(&pool.inner.classes[class_index]);
2133
2134 let buf1 = pool.try_alloc(page).unwrap();
2137 let buf2 = pool.try_alloc(page).unwrap();
2138 drop(buf1);
2139 drop(buf2);
2140
2141 assert_eq!(class.global.len(), 1);
2142 assert_eq!(get_local_len(&class), 1);
2143
2144 drop(pool);
2147
2148 assert_eq!(class.global.len(), 0);
2149 assert_eq!(get_local_len(&class), 1);
2150 assert_eq!(class.created.load(Ordering::Relaxed), 1);
2151 }
2152
2153 #[test]
2154 fn test_pool_dropped_before_buffer() {
2155 let page = page_size();
2159 let mut registry = test_registry();
2160 let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
2161
2162 let mut buf = pool.try_alloc(page).unwrap();
2163 buf.put_slice(&[0u8; 100]);
2164 let iobuf = buf.freeze();
2165
2166 drop(pool);
2168
2169 assert_eq!(iobuf.len(), 100);
2171
2172 drop(iobuf);
2174 }
2176
2177 #[test]
2178 fn test_pool_exhaustion_and_recovery() {
2179 let page = page_size();
2181 let mut registry = test_registry();
2182 let pool = BufferPool::new(test_config(page, page, 3), &mut registry);
2183
2184 let buf1 = pool.try_alloc(page).expect("first alloc");
2186 let buf2 = pool.try_alloc(page).expect("second alloc");
2187 let buf3 = pool.try_alloc(page).expect("third alloc");
2188 assert!(pool.try_alloc(page).is_err(), "pool should be exhausted");
2189
2190 drop(buf1);
2192
2193 let buf4 = pool.try_alloc(page).expect("alloc after return");
2195 assert!(pool.try_alloc(page).is_err(), "pool exhausted again");
2196
2197 drop(buf2);
2199 drop(buf3);
2200 drop(buf4);
2201
2202 assert_eq!(get_allocated(&pool, page), 0);
2203 assert_eq!(get_available(&pool, page), 3);
2204
2205 let _buf5 = pool.try_alloc(page).expect("reuse from freelist");
2207 assert_eq!(get_available(&pool, page), 2);
2208 }
2209
2210 #[test]
2211 fn test_try_alloc_errors() {
2212 let page = page_size();
2214 let mut registry = test_registry();
2215 let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
2216
2217 let result = pool.try_alloc(page * 10);
2219 assert_eq!(result.unwrap_err(), PoolError::Oversized);
2220
2221 let _buf1 = pool.try_alloc(page).unwrap();
2223 let _buf2 = pool.try_alloc(page).unwrap();
2224 let result = pool.try_alloc(page);
2225 assert_eq!(result.unwrap_err(), PoolError::Exhausted);
2226 }
2227
2228 #[test]
2229 fn test_try_alloc_zeroed_errors() {
2230 let page = page_size();
2232 let mut registry = test_registry();
2233 let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
2234
2235 let result = pool.try_alloc_zeroed(page * 10);
2237 assert_eq!(result.unwrap_err(), PoolError::Oversized);
2238
2239 let _buf1 = pool.try_alloc_zeroed(page).unwrap();
2241 let _buf2 = pool.try_alloc_zeroed(page).unwrap();
2242 let result = pool.try_alloc_zeroed(page);
2243 assert_eq!(result.unwrap_err(), PoolError::Exhausted);
2244 }
2245
2246 #[test]
2247 fn test_fallback_allocation() {
2248 let page = page_size();
2250 let mut registry = test_registry();
2251 let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
2252
2253 let buf1 = pool.try_alloc(page).unwrap();
2255 let buf2 = pool.try_alloc(page).unwrap();
2256 assert!(buf1.is_pooled());
2257 assert!(buf2.is_pooled());
2258
2259 let mut fallback_exhausted = pool.alloc(page);
2261 assert!(!fallback_exhausted.is_pooled());
2262 assert!((fallback_exhausted.as_mut_ptr() as usize).is_multiple_of(page));
2263
2264 let mut fallback_oversized = pool.alloc(page * 10);
2266 assert!(!fallback_oversized.is_pooled());
2267 assert!((fallback_oversized.as_mut_ptr() as usize).is_multiple_of(page));
2268
2269 assert_eq!(get_allocated(&pool, page), 2);
2271
2272 drop(fallback_exhausted);
2274 drop(fallback_oversized);
2275 assert_eq!(get_allocated(&pool, page), 2);
2276
2277 drop(buf1);
2279 drop(buf2);
2280 assert_eq!(get_allocated(&pool, page), 0);
2281 }
2282
2283 #[test]
2284 fn test_is_pooled() {
2285 let page = page_size();
2288 let mut registry = test_registry();
2289 let pool = BufferPool::new(test_config(page, page, 10), &mut registry);
2290
2291 let pooled = pool.try_alloc(page).unwrap();
2292 assert!(pooled.is_pooled());
2293
2294 let owned = IoBufMut::with_capacity(100);
2295 assert!(!owned.is_pooled());
2296 }
2297
2298 #[test]
2299 fn test_iobuf_is_pooled() {
2300 let page = page_size();
2301 let mut registry = test_registry();
2302 let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
2303
2304 let pooled = pool.try_alloc(page).unwrap().freeze();
2305 assert!(pooled.is_pooled());
2306
2307 let fallback = pool.alloc(page * 10).freeze();
2309 assert!(!fallback.is_pooled());
2310
2311 let bytes = IoBuf::copy_from_slice(b"hello");
2312 assert!(!bytes.is_pooled());
2313 }
2314
2315 #[test]
2316 fn test_buffer_alignment() {
2317 let page = page_size();
2318 let cache_line = cache_line_size();
2319 let mut registry = test_registry();
2320
2321 cfg_if::cfg_if! {
2323 if #[cfg(miri)] {
2324 let storage_config = BufferPoolConfig {
2325 max_per_class: NZUsize!(32),
2326 ..BufferPoolConfig::for_storage()
2327 };
2328 let network_config = BufferPoolConfig {
2329 max_per_class: NZUsize!(32),
2330 ..BufferPoolConfig::for_network()
2331 };
2332 } else {
2333 let storage_config = BufferPoolConfig::for_storage();
2334 let network_config = BufferPoolConfig::for_network();
2335 }
2336 }
2337
2338 let storage_buffer_pool = BufferPool::new(storage_config, &mut registry);
2340 let mut buf = storage_buffer_pool.try_alloc(100).unwrap();
2341 assert_eq!(
2342 buf.as_mut_ptr() as usize % page,
2343 0,
2344 "storage buffer not page-aligned"
2345 );
2346
2347 let network_buffer_pool = BufferPool::new(network_config, &mut registry);
2349 let mut buf = network_buffer_pool.try_alloc(100).unwrap();
2350 assert_eq!(
2351 buf.as_mut_ptr() as usize % cache_line,
2352 0,
2353 "network buffer not cache-line aligned"
2354 );
2355 }
2356}