1use super::{IoBuf, IoBufMut};
37use bytes::{Buf, BufMut, Bytes};
38use commonware_utils::NZUsize;
39use crossbeam_queue::ArrayQueue;
40use prometheus_client::{
41 encoding::EncodeLabelSet,
42 metrics::{counter::Counter, family::Family, gauge::Gauge},
43 registry::Registry,
44};
45use std::{
46 alloc::{alloc, alloc_zeroed, dealloc, handle_alloc_error, Layout},
47 mem::ManuallyDrop,
48 num::NonZeroUsize,
49 ops::{Bound, RangeBounds},
50 ptr::NonNull,
51 sync::{
52 atomic::{AtomicUsize, Ordering},
53 Arc, Weak,
54 },
55};
56
/// Errors returned by the fallible allocation methods on [`BufferPool`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PoolError {
    /// The requested capacity is larger than the pool's biggest size class.
    Oversized,
    /// The size class that would serve the request has no free slots left.
    Exhausted,
}

impl std::fmt::Display for PoolError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::Oversized => "requested capacity exceeds maximum buffer size",
            Self::Exhausted => "pool exhausted for required size class",
        };
        f.write_str(message)
    }
}

impl std::error::Error for PoolError {}
76
77#[cfg(unix)]
82fn page_size() -> usize {
83 let size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
85 if size <= 0 {
86 4096 } else {
88 size as usize
89 }
90}
91
/// Returns the assumed memory page size on non-Unix targets, where
/// `sysconf` is unavailable; 4 KiB is the common default.
#[cfg(not(unix))]
#[allow(clippy::missing_const_for_fn)]
fn page_size() -> usize {
    4096
}
97
/// Returns the cache-line size assumed for the current target architecture.
///
/// 128 bytes is used on x86_64 and aarch64 (matching crossbeam's
/// `CachePadded`: x86_64 prefetches line pairs and Apple aarch64 cores use
/// 128-byte lines); 64 bytes elsewhere.
const fn cache_line_size() -> usize {
    if cfg!(any(target_arch = "x86_64", target_arch = "aarch64")) {
        128
    } else {
        64
    }
}
113
/// Configuration for a [`BufferPool`].
#[derive(Debug, Clone)]
pub struct BufferPoolConfig {
    /// Smallest size class, in bytes. Must be a power of two and at least
    /// `alignment` (enforced by `validate`).
    pub min_size: NonZeroUsize,
    /// Largest size class, in bytes. Must be a power of two and at least
    /// `min_size` (enforced by `validate`).
    pub max_size: NonZeroUsize,
    /// Maximum number of buffers retained per size class.
    pub max_per_class: NonZeroUsize,
    /// Whether to allocate every buffer up front when the pool is created.
    pub prefill: bool,
    /// Alignment of every buffer, in bytes. Must be a power of two.
    pub alignment: NonZeroUsize,
}
129
130impl BufferPoolConfig {
131 pub const fn for_network() -> Self {
139 let cache_line = NZUsize!(cache_line_size());
140 Self {
141 min_size: cache_line,
142 max_size: NZUsize!(64 * 1024),
143 max_per_class: NZUsize!(4096),
144 prefill: false,
145 alignment: cache_line,
146 }
147 }
148
149 pub fn for_storage() -> Self {
154 let page = NZUsize!(page_size());
155 Self {
156 min_size: page,
157 max_size: NZUsize!(8 * 1024 * 1024),
158 max_per_class: NZUsize!(32),
159 prefill: false,
160 alignment: page,
161 }
162 }
163
164 pub const fn with_min_size(mut self, min_size: NonZeroUsize) -> Self {
166 self.min_size = min_size;
167 self
168 }
169
170 pub const fn with_max_size(mut self, max_size: NonZeroUsize) -> Self {
172 self.max_size = max_size;
173 self
174 }
175
176 pub const fn with_max_per_class(mut self, max_per_class: NonZeroUsize) -> Self {
178 self.max_per_class = max_per_class;
179 self
180 }
181
182 pub const fn with_prefill(mut self, prefill: bool) -> Self {
184 self.prefill = prefill;
185 self
186 }
187
188 pub const fn with_alignment(mut self, alignment: NonZeroUsize) -> Self {
190 self.alignment = alignment;
191 self
192 }
193
194 pub fn with_budget_bytes(mut self, budget_bytes: NonZeroUsize) -> Self {
204 let mut class_bytes = 0usize;
205 for i in 0..self.num_classes() {
206 class_bytes = class_bytes.saturating_add(self.class_size(i));
207 }
208 if class_bytes == 0 {
209 return self;
210 }
211 self.max_per_class = NZUsize!(budget_bytes.get().div_ceil(class_bytes));
212 self
213 }
214
215 fn validate(&self) {
225 assert!(
226 self.alignment.is_power_of_two(),
227 "alignment must be a power of two"
228 );
229 assert!(
230 self.min_size.is_power_of_two(),
231 "min_size must be a power of two"
232 );
233 assert!(
234 self.max_size.is_power_of_two(),
235 "max_size must be a power of two"
236 );
237 assert!(
238 self.min_size >= self.alignment,
239 "min_size ({}) must be >= alignment ({})",
240 self.min_size,
241 self.alignment
242 );
243 assert!(
244 self.max_size >= self.min_size,
245 "max_size must be >= min_size"
246 );
247 }
248
249 fn num_classes(&self) -> usize {
251 if self.max_size < self.min_size {
252 return 0;
253 }
254 (self.max_size.get() / self.min_size.get()).trailing_zeros() as usize + 1
256 }
257
258 fn class_index(&self, size: usize) -> Option<usize> {
261 if size > self.max_size.get() {
262 return None;
263 }
264 if size <= self.min_size.get() {
265 return Some(0);
266 }
267 let size_class = size.next_power_of_two();
269 let index = (size_class / self.min_size.get()).trailing_zeros() as usize;
270 if index < self.num_classes() {
271 Some(index)
272 } else {
273 None
274 }
275 }
276
277 const fn class_size(&self, index: usize) -> usize {
279 self.min_size.get() << index
280 }
281}
282
/// Prometheus label identifying a size class by its buffer size in bytes.
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct SizeClassLabel {
    /// Buffer size of the class, in bytes.
    size_class: u64,
}
288
/// Prometheus metrics describing pool usage, keyed by size class.
struct PoolMetrics {
    /// Buffers currently checked out, per size class.
    allocated: Family<SizeClassLabel, Gauge>,
    /// Buffers sitting ready in a freelist, per size class.
    available: Family<SizeClassLabel, Gauge>,
    /// Total successful allocations, per size class.
    allocations_total: Family<SizeClassLabel, Counter>,
    /// Total failed allocations due to an empty freelist, per size class.
    exhausted_total: Family<SizeClassLabel, Counter>,
    /// Total requests larger than the biggest size class.
    oversized_total: Counter,
}
302
303impl PoolMetrics {
304 fn new(registry: &mut Registry) -> Self {
305 let metrics = Self {
306 allocated: Family::default(),
307 available: Family::default(),
308 allocations_total: Family::default(),
309 exhausted_total: Family::default(),
310 oversized_total: Counter::default(),
311 };
312
313 registry.register(
314 "buffer_pool_allocated",
315 "Number of buffers currently allocated from the pool",
316 metrics.allocated.clone(),
317 );
318 registry.register(
319 "buffer_pool_available",
320 "Number of buffers available in the pool",
321 metrics.available.clone(),
322 );
323 registry.register(
324 "buffer_pool_allocations_total",
325 "Total number of successful buffer allocations",
326 metrics.allocations_total.clone(),
327 );
328 registry.register(
329 "buffer_pool_exhausted_total",
330 "Total number of failed allocations due to pool exhaustion",
331 metrics.exhausted_total.clone(),
332 );
333 registry.register(
334 "buffer_pool_oversized_total",
335 "Total number of allocation requests exceeding max buffer size",
336 metrics.oversized_total.clone(),
337 );
338
339 metrics
340 }
341}
342
/// A heap allocation with an explicit size and alignment, obtained from the
/// global allocator and freed on drop.
pub(crate) struct AlignedBuffer {
    /// Start of the allocation; never null and valid while `self` lives.
    ptr: NonNull<u8>,
    /// Layout used for the allocation; required again for deallocation.
    layout: Layout,
}

// SAFETY: the buffer exclusively owns its allocation; the raw pointer is not
// aliased elsewhere, so moving it to another thread is sound.
unsafe impl Send for AlignedBuffer {}
// SAFETY: `&self` methods only read the plain-old-data `ptr`/`layout` fields;
// no interior mutability is involved.
unsafe impl Sync for AlignedBuffer {}

impl AlignedBuffer {
    /// Allocates `capacity` uninitialized bytes aligned to `alignment`.
    ///
    /// # Panics
    /// Panics if `capacity` is zero or the size/alignment pair is not a
    /// valid [`Layout`]; aborts via `handle_alloc_error` on OOM.
    fn new(capacity: usize, alignment: usize) -> Self {
        Self::allocate(capacity, alignment, false)
    }

    /// Allocates `capacity` zero-initialized bytes aligned to `alignment`.
    ///
    /// # Panics
    /// Same conditions as [`Self::new`].
    fn new_zeroed(capacity: usize, alignment: usize) -> Self {
        Self::allocate(capacity, alignment, true)
    }

    /// Shared allocation path for `new` / `new_zeroed`.
    fn allocate(capacity: usize, alignment: usize, zeroed: bool) -> Self {
        assert!(capacity > 0, "capacity must be greater than zero");
        let layout = Layout::from_size_align(capacity, alignment).expect("invalid layout");

        // SAFETY: `layout` has a non-zero size (asserted above), which is the
        // only precondition of `alloc` / `alloc_zeroed`.
        let raw = unsafe {
            if zeroed {
                alloc_zeroed(layout)
            } else {
                alloc(layout)
            }
        };
        let ptr = NonNull::new(raw).unwrap_or_else(|| handle_alloc_error(layout));

        Self { ptr, layout }
    }

    /// Size of the allocation in bytes.
    #[inline]
    const fn capacity(&self) -> usize {
        self.layout.size()
    }

    /// Raw pointer to the first byte of the allocation.
    #[inline]
    const fn as_ptr(&self) -> *mut u8 {
        self.ptr.as_ptr()
    }
}

impl Drop for AlignedBuffer {
    fn drop(&mut self) {
        // SAFETY: `ptr` was returned by the global allocator for exactly
        // `layout` and has not been freed before (drop runs at most once).
        unsafe { dealloc(self.ptr.as_ptr(), self.layout) };
    }
}
423
/// One power-of-two size class: a bounded freelist of equally sized buffers.
struct SizeClass {
    /// Buffer size of this class, in bytes.
    size: usize,
    /// Alignment of every buffer in this class, in bytes.
    alignment: usize,
    /// Bounded lock-free queue of free slots. `Some` holds a reusable
    /// buffer; `None` marks a slot whose buffer is allocated lazily on
    /// first use (non-prefilled pools).
    freelist: ArrayQueue<Option<AlignedBuffer>>,
    /// Number of buffers currently checked out of this class.
    allocated: AtomicUsize,
}
439
440impl SizeClass {
441 fn new(size: usize, alignment: usize, max_buffers: usize, prefill: bool) -> Self {
442 let freelist = ArrayQueue::new(max_buffers);
443 for _ in 0..max_buffers {
444 let entry = if prefill {
445 Some(AlignedBuffer::new(size, alignment))
446 } else {
447 None
448 };
449 let _ = freelist.push(entry);
450 }
451 Self {
452 size,
453 alignment,
454 freelist,
455 allocated: AtomicUsize::new(0),
456 }
457 }
458}
459
/// Result of a successful allocation from a [`BufferPoolInner`] size class.
struct Allocation {
    /// The backing memory.
    buffer: AlignedBuffer,
    /// True when the memory was freshly allocated for an empty (lazy) slot
    /// rather than recycled from the freelist; fresh zeroed memory needs no
    /// re-zeroing.
    is_new: bool,
}
465
/// Shared pool state referenced strongly by [`BufferPool`] handles and
/// weakly by outstanding buffers (so buffers can outlive the pool).
pub(crate) struct BufferPoolInner {
    /// Validated configuration the pool was built with.
    config: BufferPoolConfig,
    /// One entry per size class, ordered smallest to largest.
    classes: Vec<SizeClass>,
    /// Prometheus metrics for this pool.
    metrics: PoolMetrics,
}
472
473impl BufferPoolInner {
474 fn try_alloc(&self, class_index: usize, zero_on_new: bool) -> Option<Allocation> {
479 let class = &self.classes[class_index];
480 let label = SizeClassLabel {
481 size_class: class.size as u64,
482 };
483
484 match class.freelist.pop() {
485 Some(Some(buffer)) => {
486 class.allocated.fetch_add(1, Ordering::Relaxed);
488 self.metrics.allocations_total.get_or_create(&label).inc();
489 self.metrics.allocated.get_or_create(&label).inc();
490 self.metrics.available.get_or_create(&label).dec();
491 Some(Allocation {
492 buffer,
493 is_new: false,
494 })
495 }
496 Some(None) => {
497 class.allocated.fetch_add(1, Ordering::Relaxed);
499 self.metrics.allocations_total.get_or_create(&label).inc();
500 self.metrics.allocated.get_or_create(&label).inc();
501 let buffer = if zero_on_new {
502 AlignedBuffer::new_zeroed(class.size, class.alignment)
503 } else {
504 AlignedBuffer::new(class.size, class.alignment)
505 };
506 Some(Allocation {
507 buffer,
508 is_new: true,
509 })
510 }
511 None => {
512 self.metrics.exhausted_total.get_or_create(&label).inc();
514 None
515 }
516 }
517 }
518
519 fn return_buffer(&self, buffer: AlignedBuffer) {
521 if let Some(class_index) = self.config.class_index(buffer.capacity()) {
523 let class = &self.classes[class_index];
524 let label = SizeClassLabel {
525 size_class: class.size as u64,
526 };
527
528 class.allocated.fetch_sub(1, Ordering::Relaxed);
529 self.metrics.allocated.get_or_create(&label).dec();
530
531 match class.freelist.push(Some(buffer)) {
533 Ok(()) => {
534 self.metrics.available.get_or_create(&label).inc();
535 }
536 Err(_buffer) => {
537 }
539 }
540 }
541 }
543}
544
/// Cheaply cloneable handle to a pool of aligned, size-classed buffers.
#[derive(Clone)]
pub struct BufferPool {
    // Shared state; outstanding buffers hold only a `Weak` to it, so
    // dropping the last handle does not keep the pool alive.
    inner: Arc<BufferPoolInner>,
}
561
562impl std::fmt::Debug for BufferPool {
563 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
564 f.debug_struct("BufferPool")
565 .field("config", &self.inner.config)
566 .field("num_classes", &self.inner.classes.len())
567 .finish()
568 }
569}
570
571impl BufferPool {
572 pub(crate) fn new(config: BufferPoolConfig, registry: &mut Registry) -> Self {
578 config.validate();
579
580 let metrics = PoolMetrics::new(registry);
581
582 let mut classes = Vec::with_capacity(config.num_classes());
583 for i in 0..config.num_classes() {
584 let size = config.class_size(i);
585 let class = SizeClass::new(
586 size,
587 config.alignment.get(),
588 config.max_per_class.get(),
589 config.prefill,
590 );
591 classes.push(class);
592 }
593
594 if config.prefill {
596 for class in &classes {
597 let label = SizeClassLabel {
598 size_class: class.size as u64,
599 };
600 let available = class.freelist.len() as i64;
601 metrics.available.get_or_create(&label).set(available);
602 }
603 }
604
605 Self {
606 inner: Arc::new(BufferPoolInner {
607 config,
608 classes,
609 metrics,
610 }),
611 }
612 }
613
614 fn class_index_or_record_oversized(&self, capacity: usize) -> Option<usize> {
616 let class_index = self.inner.config.class_index(capacity);
617 if class_index.is_none() {
618 self.inner.metrics.oversized_total.inc();
619 }
620 class_index
621 }
622
623 pub fn try_alloc(&self, capacity: usize) -> Result<IoBufMut, PoolError> {
640 let class_index = self
641 .class_index_or_record_oversized(capacity)
642 .ok_or(PoolError::Oversized)?;
643
644 let buffer = self
645 .inner
646 .try_alloc(class_index, false)
647 .map(|allocation| allocation.buffer)
648 .ok_or(PoolError::Exhausted)?;
649 let pooled = PooledBufMut::new(buffer, Arc::downgrade(&self.inner));
650 Ok(IoBufMut::from_pooled(pooled))
651 }
652
653 pub fn alloc(&self, capacity: usize) -> IoBufMut {
672 self.try_alloc(capacity).unwrap_or_else(|_| {
673 let size = capacity.max(self.inner.config.min_size.get());
674 let buffer = AlignedBuffer::new(size, self.inner.config.alignment.get());
675 IoBufMut::from_pooled(PooledBufMut::new(buffer, Weak::new()))
677 })
678 }
679
680 pub unsafe fn alloc_len(&self, len: usize) -> IoBufMut {
689 let mut buf = self.alloc(len);
690 unsafe { buf.set_len(len) };
692 buf
693 }
694
695 pub fn try_alloc_zeroed(&self, len: usize) -> Result<IoBufMut, PoolError> {
712 let class_index = self
713 .class_index_or_record_oversized(len)
714 .ok_or(PoolError::Oversized)?;
715 let allocation = self
716 .inner
717 .try_alloc(class_index, true)
718 .ok_or(PoolError::Exhausted)?;
719
720 let mut buf = IoBufMut::from_pooled(PooledBufMut::new(
721 allocation.buffer,
722 Arc::downgrade(&self.inner),
723 ));
724 if allocation.is_new {
725 unsafe { buf.set_len(len) };
727 } else {
728 buf.put_bytes(0, len);
730 }
731 Ok(buf)
732 }
733
734 pub fn alloc_zeroed(&self, len: usize) -> IoBufMut {
753 self.try_alloc_zeroed(len).unwrap_or_else(|_| {
754 let size = len.max(self.inner.config.min_size.get());
756 let buffer = AlignedBuffer::new_zeroed(size, self.inner.config.alignment.get());
757 let mut buf = IoBufMut::from_pooled(PooledBufMut::new(buffer, Weak::new()));
758 unsafe { buf.set_len(len) };
760 buf
761 })
762 }
763
764 pub fn config(&self) -> &BufferPoolConfig {
766 &self.inner.config
767 }
768}
769
/// Shared backing storage for pooled views: the raw buffer plus a weak
/// handle to the pool it should be returned to.
struct PooledBufInner {
    /// The raw allocation; wrapped in `ManuallyDrop` so `Drop` below can
    /// move it out and hand it back to the pool instead of deallocating
    /// in place.
    buffer: ManuallyDrop<AlignedBuffer>,
    /// Weak reference to the owning pool. A dangling `Weak::new()` marks an
    /// untracked (fallback) buffer that is simply freed on drop.
    pool: Weak<BufferPoolInner>,
}

impl PooledBufInner {
    /// Wraps `buffer` for shared use, remembering which pool (if any) it
    /// should be returned to.
    const fn new(buffer: AlignedBuffer, pool: Weak<BufferPoolInner>) -> Self {
        Self {
            buffer: ManuallyDrop::new(buffer),
            pool,
        }
    }

    /// Total capacity of the backing allocation, in bytes.
    #[inline]
    fn capacity(&self) -> usize {
        self.buffer.capacity()
    }
}

impl Drop for PooledBufInner {
    fn drop(&mut self) {
        // SAFETY: drop runs at most once and `buffer` is never touched
        // afterwards, so taking it out of the `ManuallyDrop` is sound.
        let buffer = unsafe { ManuallyDrop::take(&mut self.buffer) };
        // Return the buffer to its pool if the pool is still alive;
        // otherwise `buffer` goes out of scope here and is deallocated.
        if let Some(pool) = self.pool.upgrade() {
            pool.return_buffer(buffer);
        }
    }
}
802
/// Immutable, cheaply cloneable view of the `[offset, offset + len)` window
/// of a pooled buffer; the buffer returns to its pool when the last view
/// (and any `Bytes` owner) drops.
#[derive(Clone)]
pub(crate) struct PooledBuf {
    /// Shared backing storage.
    inner: Arc<PooledBufInner>,
    /// Start of this view within the backing buffer, in bytes.
    offset: usize,
    /// Length of this view, in bytes.
    len: usize,
}
835
836impl std::fmt::Debug for PooledBuf {
837 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
838 f.debug_struct("PooledBuf")
839 .field("offset", &self.offset)
840 .field("len", &self.len)
841 .field("capacity", &self.inner.capacity())
842 .finish()
843 }
844}
845
impl PooledBuf {
    /// Whether this buffer will be returned to a live pool on drop (as
    /// opposed to an untracked fallback allocation).
    #[inline]
    pub fn is_tracked(&self) -> bool {
        self.inner.pool.strong_count() > 0
    }

    /// Pointer to the first byte of this view.
    #[inline]
    pub fn as_ptr(&self) -> *const u8 {
        // SAFETY: constructors and slicing keep `offset` within the backing
        // allocation.
        unsafe { self.inner.buffer.as_ptr().add(self.offset) }
    }

    /// Returns a sub-view over `range` (relative to this view), sharing the
    /// same backing storage. Returns `None` when the range is empty.
    ///
    /// # Panics
    /// Panics if the range is inverted, extends past `self.len`, or a bound
    /// overflows `usize`.
    pub fn slice(&self, range: impl RangeBounds<usize>) -> Option<Self> {
        let start = match range.start_bound() {
            Bound::Included(&n) => n,
            Bound::Excluded(&n) => n.checked_add(1).expect("range start overflow"),
            Bound::Unbounded => 0,
        };
        let end = match range.end_bound() {
            Bound::Included(&n) => n.checked_add(1).expect("range end overflow"),
            Bound::Excluded(&n) => n,
            Bound::Unbounded => self.len,
        };
        assert!(start <= end, "slice start must be <= end");
        assert!(end <= self.len, "slice out of bounds");

        if start == end {
            return None;
        }

        Some(Self {
            inner: self.inner.clone(),
            offset: self.offset + start,
            len: end - start,
        })
    }

    /// Splits off and returns the first `at` bytes; `self` keeps the rest.
    ///
    /// # Panics
    /// Panics if `at > self.len`.
    #[inline]
    pub fn split_to(&mut self, at: usize) -> Self {
        assert!(
            at <= self.len,
            "split_to out of bounds: {:?} <= {:?}",
            at,
            self.len,
        );

        let prefix = Self {
            inner: self.inner.clone(),
            offset: self.offset,
            len: at,
        };

        self.offset += at;
        self.len -= at;
        prefix
    }

    /// Converts back into a writable buffer when this is the only view of
    /// the storage; otherwise returns `self` unchanged in `Err`.
    ///
    /// On success the writer's fields are rebuilt in absolute coordinates
    /// (`cursor = offset`, `len = offset + len`) so the readable window is
    /// preserved exactly.
    pub fn try_into_mut(self) -> Result<PooledBufMut, Self> {
        let Self { inner, offset, len } = self;
        match Arc::try_unwrap(inner) {
            Ok(inner) => Ok(PooledBufMut {
                inner: ManuallyDrop::new(inner),
                cursor: offset,
                len: offset.checked_add(len).expect("slice end overflow"),
            }),
            Err(inner) => Err(Self { inner, offset, len }),
        }
    }

    /// Converts into `Bytes` without copying; the pooled storage is kept
    /// alive (and eventually returned to the pool) by the `Bytes` owner.
    pub fn into_bytes(self) -> Bytes {
        if self.len == 0 {
            return Bytes::new();
        }
        Bytes::from_owner(self)
    }
}
964
impl AsRef<[u8]> for PooledBuf {
    #[inline]
    fn as_ref(&self) -> &[u8] {
        // SAFETY: `offset..offset + len` stays within the backing allocation
        // (maintained by the constructors, `slice`, `split_to`, `advance`).
        // The bytes are assumed initialized: frozen views are produced from
        // writers whose unsafe `set_len`/`advance_mut` callers uphold that.
        unsafe { std::slice::from_raw_parts(self.inner.buffer.as_ptr().add(self.offset), self.len) }
    }
}
972
973impl Buf for PooledBuf {
974 #[inline]
975 fn remaining(&self) -> usize {
976 self.len
977 }
978
979 #[inline]
980 fn chunk(&self) -> &[u8] {
981 self.as_ref()
982 }
983
984 #[inline]
985 fn advance(&mut self, cnt: usize) {
986 assert!(cnt <= self.len, "cannot advance past end of buffer");
987 self.offset += cnt;
988 self.len -= cnt;
989 }
990
991 #[inline]
992 fn copy_to_bytes(&mut self, len: usize) -> Bytes {
993 assert!(len <= self.len, "copy_to_bytes out of bounds");
994 if len == 0 {
995 return Bytes::new();
996 }
997 let slice = Self {
998 inner: self.inner.clone(),
999 offset: self.offset,
1000 len,
1001 };
1002 self.advance(len);
1003 slice.into_bytes()
1004 }
1005}
1006
/// Unique, writable handle to a pooled buffer.
///
/// Invariant: `cursor <= len <= capacity`. Bytes `..cursor` have been
/// consumed, `cursor..len` are the readable contents, and `len..capacity`
/// are spare (possibly uninitialized) capacity.
pub(crate) struct PooledBufMut {
    /// Backing storage; `ManuallyDrop` so `into_pooled` can move it out
    /// without running this type's `Drop`.
    inner: ManuallyDrop<PooledBufInner>,
    /// Absolute read position within the backing buffer.
    cursor: usize,
    /// Absolute end of readable data within the backing buffer.
    len: usize,
}
1056
1057impl std::fmt::Debug for PooledBufMut {
1058 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1059 f.debug_struct("PooledBufMut")
1060 .field("cursor", &self.cursor)
1061 .field("len", &self.len)
1062 .field("capacity", &self.capacity())
1063 .finish()
1064 }
1065}
1066
impl PooledBufMut {
    /// Wraps a fresh buffer with an empty readable window
    /// (`cursor == len == 0`).
    const fn new(buffer: AlignedBuffer, pool: Weak<BufferPoolInner>) -> Self {
        Self {
            inner: ManuallyDrop::new(PooledBufInner::new(buffer, pool)),
            cursor: 0,
            len: 0,
        }
    }

    /// Whether this buffer will be returned to a live pool on drop.
    #[inline]
    pub fn is_tracked(&self) -> bool {
        self.inner.pool.strong_count() > 0
    }

    /// Number of readable bytes (`cursor..len`).
    #[inline]
    pub const fn len(&self) -> usize {
        self.len - self.cursor
    }

    /// True when there are no readable bytes.
    #[inline]
    pub const fn is_empty(&self) -> bool {
        self.cursor == self.len
    }

    /// Bytes usable from the cursor to the end of the allocation
    /// (readable plus spare).
    #[inline]
    pub fn capacity(&self) -> usize {
        self.inner.capacity() - self.cursor
    }

    /// Total capacity of the backing allocation, ignoring the cursor.
    #[inline]
    fn raw_capacity(&self) -> usize {
        self.inner.capacity()
    }

    /// Mutable pointer to the first readable byte.
    #[inline]
    pub fn as_mut_ptr(&mut self) -> *mut u8 {
        // SAFETY: `cursor` never exceeds the allocation size (invariant).
        unsafe { self.inner.buffer.as_ptr().add(self.cursor) }
    }

    /// Declares `len` bytes after the cursor as readable.
    ///
    /// # Safety
    /// The caller must ensure those bytes have been initialized and that
    /// `cursor + len` does not exceed the capacity.
    #[inline]
    pub const unsafe fn set_len(&mut self, len: usize) {
        self.len = self.cursor + len;
    }

    /// Empties the readable window without touching the cursor.
    #[inline]
    pub const fn clear(&mut self) {
        self.len = self.cursor;
    }

    /// Shrinks the readable window to at most `len` bytes; a no-op when
    /// `len` is not smaller than the current length.
    #[inline]
    pub const fn truncate(&mut self, len: usize) {
        if len < self.len() {
            self.len = self.cursor + len;
        }
    }

    /// Converts into an immutable view of the readable window, transferring
    /// ownership of the backing storage.
    fn into_pooled(self) -> PooledBuf {
        // Wrapping `self` in ManuallyDrop suppresses `PooledBufMut::drop`,
        // so `inner` is not dropped here and ownership moves to the view.
        let mut me = ManuallyDrop::new(self);
        // SAFETY: `me.inner` is never used again after being taken.
        let inner = unsafe { ManuallyDrop::take(&mut me.inner) };
        PooledBuf {
            inner: Arc::new(inner),
            offset: me.cursor,
            len: me.len - me.cursor,
        }
    }

    /// Freezes into an immutable [`IoBuf`] over the readable window.
    pub fn freeze(self) -> IoBuf {
        IoBuf::from_pooled(self.into_pooled())
    }

    /// Converts into `Bytes` without copying; an empty window yields the
    /// static empty `Bytes`.
    pub fn into_bytes(self) -> Bytes {
        if self.is_empty() {
            return Bytes::new();
        }
        Bytes::from_owner(self.into_pooled())
    }
}
1192
impl AsRef<[u8]> for PooledBufMut {
    #[inline]
    fn as_ref(&self) -> &[u8] {
        // SAFETY: `cursor..len` is within the allocation (invariant), and
        // those bytes are readable: callers of the unsafe writer APIs
        // (`set_len`, `advance_mut`) are responsible for initializing them.
        unsafe {
            std::slice::from_raw_parts(self.inner.buffer.as_ptr().add(self.cursor), self.len())
        }
    }
}
1202
impl AsMut<[u8]> for PooledBufMut {
    #[inline]
    fn as_mut(&mut self) -> &mut [u8] {
        let len = self.len();
        // SAFETY: same bounds/initialization argument as `as_ref`; `&mut
        // self` guarantees the slice is not aliased.
        unsafe { std::slice::from_raw_parts_mut(self.inner.buffer.as_ptr().add(self.cursor), len) }
    }
}
1211
impl Drop for PooledBufMut {
    fn drop(&mut self) {
        // SAFETY: `inner` is never used after this point; dropping it runs
        // `PooledBufInner::drop`, which returns the buffer to its pool (or
        // frees it when untracked). `into_pooled` skips this drop entirely.
        unsafe { ManuallyDrop::drop(&mut self.inner) };
    }
}
1219
1220impl Buf for PooledBufMut {
1221 #[inline]
1222 fn remaining(&self) -> usize {
1223 self.len - self.cursor
1224 }
1225
1226 #[inline]
1227 fn chunk(&self) -> &[u8] {
1228 unsafe {
1230 std::slice::from_raw_parts(
1231 self.inner.buffer.as_ptr().add(self.cursor),
1232 self.len - self.cursor,
1233 )
1234 }
1235 }
1236
1237 #[inline]
1238 fn advance(&mut self, cnt: usize) {
1239 let remaining = self.len - self.cursor;
1240 assert!(cnt <= remaining, "cannot advance past end of buffer");
1241 self.cursor += cnt;
1242 }
1243}
1244
1245unsafe impl BufMut for PooledBufMut {
1250 #[inline]
1251 fn remaining_mut(&self) -> usize {
1252 self.raw_capacity() - self.len
1253 }
1254
1255 #[inline]
1256 unsafe fn advance_mut(&mut self, cnt: usize) {
1257 assert!(
1258 cnt <= self.remaining_mut(),
1259 "cannot advance past end of buffer"
1260 );
1261 self.len += cnt;
1262 }
1263
1264 #[inline]
1265 fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
1266 let raw_cap = self.raw_capacity();
1267 let len = self.len;
1268 unsafe {
1270 let ptr = self.inner.buffer.as_ptr().add(len);
1271 bytes::buf::UninitSlice::from_raw_parts_mut(ptr, raw_cap - len)
1272 }
1273 }
1274}
1275
1276#[cfg(test)]
1277mod tests {
1278 use super::*;
1279 use bytes::BytesMut;
1280 use std::{sync::mpsc, thread};
1281
1282 fn test_registry() -> Registry {
1283 Registry::default()
1284 }
1285
1286 fn test_config(min_size: usize, max_size: usize, max_per_class: usize) -> BufferPoolConfig {
1288 BufferPoolConfig {
1289 min_size: NZUsize!(min_size),
1290 max_size: NZUsize!(max_size),
1291 max_per_class: NZUsize!(max_per_class),
1292 prefill: false,
1293 alignment: NZUsize!(page_size()),
1294 }
1295 }
1296
1297 #[test]
1298 fn test_page_size() {
1299 let size = page_size();
1300 assert!(size >= 4096);
1301 assert!(size.is_power_of_two());
1302 }
1303
1304 #[test]
1305 fn test_aligned_buffer() {
1306 let page = page_size();
1307 let buf = AlignedBuffer::new(4096, page);
1308 assert_eq!(buf.capacity(), 4096);
1309 assert!((buf.as_ptr() as usize).is_multiple_of(page));
1310
1311 let cache_line = cache_line_size();
1313 let buf2 = AlignedBuffer::new(4096, cache_line);
1314 assert_eq!(buf2.capacity(), 4096);
1315 assert!((buf2.as_ptr() as usize).is_multiple_of(cache_line));
1316 }
1317
1318 #[test]
1319 #[should_panic(expected = "capacity must be greater than zero")]
1320 fn test_aligned_buffer_zero_capacity_panics() {
1321 let _ = AlignedBuffer::new(0, page_size());
1322 }
1323
1324 #[test]
1325 #[should_panic(expected = "capacity must be greater than zero")]
1326 fn test_aligned_buffer_zeroed_zero_capacity_panics() {
1327 let _ = AlignedBuffer::new_zeroed(0, page_size());
1328 }
1329
1330 #[test]
1331 fn test_config_validation() {
1332 let page = page_size();
1333 let config = test_config(page, page * 4, 10);
1334 config.validate();
1335 }
1336
1337 #[test]
1338 #[should_panic(expected = "min_size must be a power of two")]
1339 fn test_config_invalid_min_size() {
1340 let config = BufferPoolConfig {
1341 min_size: NZUsize!(3000),
1342 max_size: NZUsize!(8192),
1343 max_per_class: NZUsize!(10),
1344 prefill: false,
1345 alignment: NZUsize!(page_size()),
1346 };
1347 config.validate();
1348 }
1349
1350 #[test]
1351 fn test_config_class_index() {
1352 let page = page_size();
1353 let config = test_config(page, page * 8, 10);
1354
1355 assert_eq!(config.num_classes(), 4);
1357
1358 assert_eq!(config.class_index(1), Some(0));
1359 assert_eq!(config.class_index(page), Some(0));
1360 assert_eq!(config.class_index(page + 1), Some(1));
1361 assert_eq!(config.class_index(page * 2), Some(1));
1362 assert_eq!(config.class_index(page * 8), Some(3));
1363 assert_eq!(config.class_index(page * 8 + 1), None);
1364 }
1365
1366 #[test]
1367 fn test_pool_alloc_and_return() {
1368 let page = page_size();
1369 let mut registry = test_registry();
1370 let pool = BufferPool::new(test_config(page, page * 4, 2), &mut registry);
1371
1372 let buf = pool.try_alloc(100).unwrap();
1374 assert!(buf.capacity() >= page);
1375 assert_eq!(buf.len(), 0);
1376
1377 drop(buf);
1379
1380 let buf2 = pool.try_alloc(100).unwrap();
1382 assert!(buf2.capacity() >= page);
1383 assert_eq!(buf2.len(), 0);
1384 }
1385
1386 #[test]
1387 fn test_alloc_len_sets_len() {
1388 let page = page_size();
1389 let mut registry = test_registry();
1390 let pool = BufferPool::new(test_config(page, page * 4, 2), &mut registry);
1391
1392 let mut buf = unsafe { pool.alloc_len(100) };
1394 assert_eq!(buf.len(), 100);
1395 buf.as_mut().fill(0xAB);
1396 let frozen = buf.freeze();
1397 assert_eq!(frozen.as_ref(), &[0xAB; 100]);
1398 }
1399
1400 #[test]
1401 fn test_alloc_zeroed_sets_len_and_zeros() {
1402 let page = page_size();
1403 let mut registry = test_registry();
1404 let pool = BufferPool::new(test_config(page, page * 4, 2), &mut registry);
1405
1406 let buf = pool.alloc_zeroed(100);
1407 assert_eq!(buf.len(), 100);
1408 assert!(buf.as_ref().iter().all(|&b| b == 0));
1409 }
1410
1411 #[test]
1412 fn test_try_alloc_zeroed_sets_len_and_zeros() {
1413 let page = page_size();
1414 let mut registry = test_registry();
1415 let pool = BufferPool::new(test_config(page, page * 4, 2), &mut registry);
1416
1417 let buf = pool.try_alloc_zeroed(100).unwrap();
1418 assert!(buf.is_pooled());
1419 assert_eq!(buf.len(), 100);
1420 assert!(buf.as_ref().iter().all(|&b| b == 0));
1421 }
1422
1423 #[test]
1424 fn test_alloc_zeroed_fallback_uses_untracked_zeroed_buffer() {
1425 let page = page_size();
1426 let mut registry = test_registry();
1427 let pool = BufferPool::new(test_config(page, page, 1), &mut registry);
1428
1429 let _pooled = pool.try_alloc(100).unwrap();
1431
1432 let buf = pool.alloc_zeroed(100);
1433 assert!(!buf.is_pooled());
1434 assert_eq!(buf.len(), 100);
1435 assert!(buf.as_ref().iter().all(|&b| b == 0));
1436 }
1437
1438 #[test]
1439 fn test_alloc_zeroed_reuses_dirty_pooled_buffer() {
1440 let page = page_size();
1441 let mut registry = test_registry();
1442 let pool = BufferPool::new(test_config(page, page, 1), &mut registry);
1443
1444 let mut first = pool.alloc_zeroed(100);
1445 assert!(first.is_pooled());
1446 assert!(first.as_ref().iter().all(|&b| b == 0));
1447
1448 first.as_mut().fill(0xAB);
1450 drop(first);
1451
1452 let second = pool.alloc_zeroed(100);
1453 assert!(second.is_pooled());
1454 assert_eq!(second.len(), 100);
1455 assert!(second.as_ref().iter().all(|&b| b == 0));
1456 }
1457
1458 #[test]
1459 fn test_pool_exhaustion() {
1460 let page = page_size();
1461 let mut registry = test_registry();
1462 let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
1463
1464 let _buf1 = pool.try_alloc(100).expect("first alloc should succeed");
1466 let _buf2 = pool.try_alloc(100).expect("second alloc should succeed");
1467
1468 assert!(pool.try_alloc(100).is_err());
1470 }
1471
1472 #[test]
1473 fn test_pool_oversized() {
1474 let page = page_size();
1475 let mut registry = test_registry();
1476 let pool = BufferPool::new(test_config(page, page * 2, 10), &mut registry);
1477
1478 assert!(pool.try_alloc(page * 4).is_err());
1480 }
1481
1482 #[test]
1483 fn test_pool_size_classes() {
1484 let page = page_size();
1485 let mut registry = test_registry();
1486 let pool = BufferPool::new(test_config(page, page * 4, 10), &mut registry);
1487
1488 let buf1 = pool.try_alloc(100).unwrap();
1490 assert_eq!(buf1.capacity(), page);
1491
1492 let buf2 = pool.try_alloc(page + 1).unwrap();
1494 assert_eq!(buf2.capacity(), page * 2);
1495
1496 let buf3 = pool.try_alloc(page * 3).unwrap();
1497 assert_eq!(buf3.capacity(), page * 4);
1498 }
1499
1500 #[test]
1501 fn test_pooled_buf_mut_freeze() {
1502 let page = page_size();
1503 let mut registry = test_registry();
1504 let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
1505
1506 let mut buf = pool.try_alloc(11).unwrap();
1508 buf.put_slice(&[0u8; 11]);
1509 assert_eq!(buf.len(), 11);
1510
1511 buf.as_mut()[..5].copy_from_slice(&[1, 2, 3, 4, 5]);
1513
1514 let iobuf = buf.freeze();
1516 assert_eq!(iobuf.len(), 11);
1517 assert_eq!(&iobuf.as_ref()[..5], &[1, 2, 3, 4, 5]);
1518
1519 let slice = iobuf.slice(0..5);
1521 assert_eq!(slice.len(), 5);
1522 }
1523
1524 #[test]
1525 fn test_prefill() {
1526 let page = NZUsize!(page_size());
1527 let mut registry = test_registry();
1528 let pool = BufferPool::new(
1529 BufferPoolConfig {
1530 min_size: page,
1531 max_size: page,
1532 max_per_class: NZUsize!(5),
1533 prefill: true,
1534 alignment: page,
1535 },
1536 &mut registry,
1537 );
1538
1539 let mut bufs = Vec::new();
1541 for _ in 0..5 {
1542 bufs.push(pool.try_alloc(100).expect("alloc should succeed"));
1543 }
1544
1545 assert!(pool.try_alloc(100).is_err());
1547 }
1548
1549 #[test]
1550 fn test_config_for_network() {
1551 let config = BufferPoolConfig::for_network();
1552 config.validate();
1553 assert_eq!(config.min_size.get(), cache_line_size());
1554 assert_eq!(config.max_size.get(), 64 * 1024);
1555 assert_eq!(config.max_per_class.get(), 4096);
1556 assert!(!config.prefill);
1557 assert_eq!(config.alignment.get(), cache_line_size());
1558 }
1559
1560 #[test]
1561 fn test_config_for_storage() {
1562 let config = BufferPoolConfig::for_storage();
1563 config.validate();
1564 assert_eq!(config.min_size.get(), page_size());
1565 assert_eq!(config.max_size.get(), 8 * 1024 * 1024);
1566 assert_eq!(config.max_per_class.get(), 32);
1567 assert!(!config.prefill);
1568 assert_eq!(config.alignment.get(), page_size());
1569 }
1570
1571 #[test]
1572 fn test_storage_config_supports_default_allocations() {
1573 let mut registry = test_registry();
1574 let pool = BufferPool::new(BufferPoolConfig::for_storage(), &mut registry);
1575
1576 let buf = pool.try_alloc(8 * 1024 * 1024).unwrap();
1577 assert_eq!(buf.capacity(), 8 * 1024 * 1024);
1578 }
1579
1580 #[test]
1581 fn test_config_builders() {
1582 let page = NZUsize!(page_size());
1583 let config = BufferPoolConfig::for_storage()
1584 .with_max_per_class(NZUsize!(64))
1585 .with_prefill(true)
1586 .with_min_size(page)
1587 .with_max_size(NZUsize!(128 * 1024));
1588
1589 config.validate();
1590 assert_eq!(config.min_size, page);
1591 assert_eq!(config.max_size.get(), 128 * 1024);
1592 assert_eq!(config.max_per_class.get(), 64);
1593 assert!(config.prefill);
1594
1595 assert_eq!(config.alignment.get(), page_size());
1597
1598 let aligned = BufferPoolConfig::for_network()
1600 .with_alignment(NZUsize!(256))
1601 .with_min_size(NZUsize!(256));
1602 aligned.validate();
1603 assert_eq!(aligned.alignment.get(), 256);
1604 assert_eq!(aligned.min_size.get(), 256);
1605 }
1606
1607 #[test]
1608 fn test_config_with_budget_bytes() {
1609 let config = BufferPoolConfig {
1611 min_size: NZUsize!(4),
1612 max_size: NZUsize!(16),
1613 max_per_class: NZUsize!(1),
1614 prefill: false,
1615 alignment: NZUsize!(4),
1616 }
1617 .with_budget_bytes(NZUsize!(280));
1618 assert_eq!(config.max_per_class.get(), 10);
1619
1620 let small_budget = BufferPoolConfig {
1622 min_size: NZUsize!(4),
1623 max_size: NZUsize!(16),
1624 max_per_class: NZUsize!(1),
1625 prefill: false,
1626 alignment: NZUsize!(4),
1627 }
1628 .with_budget_bytes(NZUsize!(10));
1629 assert_eq!(small_budget.max_per_class.get(), 1);
1630 }
1631
1632 #[test]
1633 fn test_pool_error_display() {
1634 assert_eq!(
1635 PoolError::Oversized.to_string(),
1636 "requested capacity exceeds maximum buffer size"
1637 );
1638 assert_eq!(
1639 PoolError::Exhausted.to_string(),
1640 "pool exhausted for required size class"
1641 );
1642 }
1643
1644 #[test]
1645 fn test_config_invalid_range_edge_paths() {
1646 let invalid_order = BufferPoolConfig {
1647 min_size: NZUsize!(8),
1648 max_size: NZUsize!(4),
1649 max_per_class: NZUsize!(1),
1650 prefill: false,
1651 alignment: NZUsize!(4),
1652 };
1653 assert_eq!(invalid_order.num_classes(), 0);
1654 let unchanged = invalid_order.clone().with_budget_bytes(NZUsize!(128));
1655 assert_eq!(unchanged.max_per_class, invalid_order.max_per_class);
1656
1657 let non_power_two_max = BufferPoolConfig {
1658 min_size: NZUsize!(8),
1659 max_size: NZUsize!(12),
1660 max_per_class: NZUsize!(1),
1661 prefill: false,
1662 alignment: NZUsize!(4),
1663 };
1664 assert_eq!(non_power_two_max.class_index(12), None);
1665 }
1666
1667 #[test]
1668 fn test_pool_debug_and_config_accessor() {
1669 let page = page_size();
1670 let mut registry = test_registry();
1671 let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
1672
1673 let debug = format!("{pool:?}");
1674 assert!(debug.contains("BufferPool"));
1675 assert!(debug.contains("num_classes"));
1676 assert_eq!(pool.config().min_size.get(), page);
1677 }
1678
    #[test]
    fn test_return_buffer_freelist_full_drops_extra() {
        let page = page_size();
        let mut registry = test_registry();
        // Freelist capacity of exactly 1 per class.
        let pool = BufferPool::new(test_config(page, page, 1), &mut registry);

        // Allocate and drop so the single freelist slot is already occupied.
        let tracked = pool.try_alloc(page).expect("tracked allocation");
        drop(tracked);

        let class_index = pool
            .inner
            .config
            .class_index(page)
            .expect("class exists for page-sized buffer");
        // Pretend one buffer is outstanding, then return an extra buffer of the
        // same class. With the freelist full, the buffer must be dropped and the
        // allocated counter decremented back to zero rather than leaking.
        pool.inner.classes[class_index]
            .allocated
            .store(1, Ordering::Relaxed);
        pool.inner
            .return_buffer(AlignedBuffer::new(page, page_size()));
        assert_eq!(
            pool.inner.classes[class_index]
                .allocated
                .load(Ordering::Relaxed),
            0
        );
    }
1708
1709 #[test]
1710 fn test_return_buffer_ignores_unmatched_class() {
1711 let page = page_size();
1712 let mut registry = test_registry();
1713 let pool = BufferPool::new(test_config(page, page, 1), &mut registry);
1714
1715 pool.inner
1717 .return_buffer(AlignedBuffer::new(page * 2, page_size()));
1718 assert_eq!(get_allocated(&pool, page), 0);
1719 }
1720
    #[test]
    fn test_pooled_debug_and_empty_into_bytes_paths() {
        let page = page_size();

        // Debug for PooledBufMut names the type and exposes the cursor field.
        let pooled_mut_debug = {
            let pooled_mut = PooledBufMut::new(AlignedBuffer::new(page, page), Weak::new());
            format!("{pooled_mut:?}")
        };
        assert!(pooled_mut_debug.contains("PooledBufMut"));
        assert!(pooled_mut_debug.contains("cursor"));

        // An unwritten PooledBufMut converts to empty bytes.
        let empty_from_mut = PooledBufMut::new(AlignedBuffer::new(page, page), Weak::new());
        assert!(empty_from_mut.into_bytes().is_empty());

        // Same checks for the frozen PooledBuf form, whose Debug exposes capacity.
        let pooled = PooledBufMut::new(AlignedBuffer::new(page, page), Weak::new()).into_pooled();
        let pooled_debug = format!("{pooled:?}");
        assert!(pooled_debug.contains("PooledBuf"));
        assert!(pooled_debug.contains("capacity"));
        assert!(pooled.into_bytes().is_empty());
    }
1741
1742 #[test]
1743 #[should_panic(expected = "range start overflow")]
1744 fn test_pooled_slice_excluded_start_overflow() {
1745 let page = page_size();
1746 let pooled = PooledBufMut::new(AlignedBuffer::new(page, page), Weak::new()).into_pooled();
1747 let _ = pooled.slice((Bound::Excluded(usize::MAX), Bound::<usize>::Unbounded));
1748 }
1749
1750 fn get_allocated(pool: &BufferPool, size: usize) -> usize {
1752 let class_index = pool.inner.config.class_index(size).unwrap();
1753 pool.inner.classes[class_index]
1754 .allocated
1755 .load(Ordering::Relaxed)
1756 }
1757
1758 fn get_available(pool: &BufferPool, size: usize) -> i64 {
1760 let class_index = pool.inner.config.class_index(size).unwrap();
1761 let label = SizeClassLabel {
1762 size_class: pool.inner.classes[class_index].size as u64,
1763 };
1764 pool.inner.metrics.available.get_or_create(&label).get()
1765 }
1766
    #[test]
    fn test_freeze_returns_buffer_to_pool() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        // Fresh pool: nothing allocated, freelist empty.
        assert_eq!(get_allocated(&pool, page), 0);
        assert_eq!(get_available(&pool, page), 0);

        // An allocation takes a buffer out of the class.
        let buf = pool.try_alloc(100).unwrap();
        assert_eq!(get_allocated(&pool, page), 1);
        assert_eq!(get_available(&pool, page), 0);

        // Freezing alone does not return the buffer; it is still in use.
        let iobuf = buf.freeze();
        assert_eq!(get_allocated(&pool, page), 1);

        // Dropping the last handle returns the buffer to the freelist.
        drop(iobuf);
        assert_eq!(get_allocated(&pool, page), 0);
        assert_eq!(get_available(&pool, page), 1);
    }
1791
    #[test]
    fn test_refcount_and_copy_to_bytes_paths() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        // Clones and non-empty slices share the backing buffer: it is only
        // returned to the pool once the last referencing handle is dropped.
        {
            let mut buf = pool.try_alloc(100).unwrap();
            buf.put_slice(&[0xAA; 100]);
            let iobuf = buf.freeze();
            let clone = iobuf.clone();
            let slice = iobuf.slice(10..40);
            let empty = iobuf.slice(10..10);
            assert!(empty.is_empty());
            drop(iobuf);
            assert_eq!(get_allocated(&pool, page), 1);
            drop(slice);
            assert_eq!(get_allocated(&pool, page), 1);
            // `empty` is still alive here, yet the buffer is released —
            // empty slices evidently hold no reference to the backing buffer.
            drop(clone);
            assert_eq!(get_allocated(&pool, page), 0);
        }

        // copy_to_bytes on a frozen (IoBuf) pooled buffer.
        {
            let mut buf = pool.try_alloc(100).unwrap();
            buf.put_slice(&[0x42; 100]);
            let mut iobuf = buf.freeze();

            // Zero-length copy is empty and does not advance the cursor.
            let zero = iobuf.copy_to_bytes(0);
            assert!(zero.is_empty());
            assert_eq!(iobuf.remaining(), 100);

            // Partial copies advance the cursor by exactly the copied amount.
            let partial = iobuf.copy_to_bytes(30);
            assert_eq!(&partial[..], &[0x42; 30]);
            assert_eq!(iobuf.remaining(), 70);

            let rest = iobuf.copy_to_bytes(70);
            assert_eq!(&rest[..], &[0x42; 70]);
            assert_eq!(iobuf.remaining(), 0);

            // Zero-length copy from an exhausted buffer is still fine.
            let empty = iobuf.copy_to_bytes(0);
            assert!(empty.is_empty());

            // The buffer is only released once `rest` (the final handle that
            // still references the backing storage) is dropped.
            drop(iobuf);
            assert_eq!(get_allocated(&pool, page), 1);
            drop(zero);
            drop(partial);
            assert_eq!(get_allocated(&pool, page), 1);
            drop(rest);
            assert_eq!(get_allocated(&pool, page), 0);
        }

        // Same copy_to_bytes sequence, but on the mutable (IoBufMut) form.
        {
            let buf = pool.try_alloc(100).unwrap();
            let mut iobufmut = buf;
            iobufmut.put_slice(&[0x7E; 100]);

            let zero = iobufmut.copy_to_bytes(0);
            assert!(zero.is_empty());
            assert_eq!(iobufmut.remaining(), 100);

            let partial = iobufmut.copy_to_bytes(30);
            assert_eq!(&partial[..], &[0x7E; 30]);
            assert_eq!(iobufmut.remaining(), 70);

            let rest = iobufmut.copy_to_bytes(70);
            assert_eq!(&rest[..], &[0x7E; 70]);
            assert_eq!(iobufmut.remaining(), 0);

            // As above: `rest` keeps the buffer alive until dropped.
            drop(iobufmut);
            assert_eq!(get_allocated(&pool, page), 1);
            drop(zero);
            drop(partial);
            assert_eq!(get_allocated(&pool, page), 1);
            drop(rest);
            assert_eq!(get_allocated(&pool, page), 0);
        }
    }
1879
    #[test]
    fn test_iobuf_to_iobufmut_conversion_reuses_pool_for_non_full_unique_view() {
        // A partially-filled, uniquely-held pooled IoBuf converted to IoBufMut
        // should keep the pooled buffer alive (zero-copy) rather than copying
        // out and returning it early.
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        let buf = pool.try_alloc(100).unwrap();
        assert_eq!(get_allocated(&pool, page), 1);

        let iobuf = buf.freeze();
        assert_eq!(get_allocated(&pool, page), 1);

        let iobufmut: IoBufMut = iobuf.into();

        // The conversion must not have returned the buffer to the pool.
        assert_eq!(
            get_allocated(&pool, page),
            1,
            "pooled buffer should remain allocated after zero-copy IoBuf->IoBufMut conversion"
        );
        assert_eq!(get_available(&pool, page), 0);

        // Dropping the converted handle returns the buffer to the freelist.
        drop(iobufmut);
        assert_eq!(get_allocated(&pool, page), 0);
        assert_eq!(get_available(&pool, page), 1);
    }
1908
    #[test]
    fn test_iobuf_to_iobufmut_conversion_preserves_full_unique_view() {
        // Converting a completely-filled, uniquely-held pooled IoBuf back to
        // IoBufMut must preserve its contents and keep the pooled allocation.
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        let mut buf = pool.try_alloc(page).unwrap();
        buf.put_slice(&vec![0xEE; page]);
        let iobuf = buf.freeze();

        let iobufmut: IoBufMut = iobuf.into();
        assert_eq!(iobufmut.len(), page);
        assert!(iobufmut.as_ref().iter().all(|&b| b == 0xEE));
        assert_eq!(get_allocated(&pool, page), 1);
        assert_eq!(get_available(&pool, page), 0);

        // Final drop returns the buffer to the freelist.
        drop(iobufmut);
        assert_eq!(get_allocated(&pool, page), 0);
        assert_eq!(get_available(&pool, page), 1);
    }
1929
    #[test]
    fn test_iobuf_try_into_mut_recycles_full_unique_view() {
        // try_into_mut on a uniquely-held, full-view pooled IoBuf succeeds and
        // hands back the same pooled storage with contents intact.
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        let mut buf = pool.try_alloc(page).unwrap();
        buf.put_slice(&vec![0xAB; page]);
        let iobuf = buf.freeze();
        assert_eq!(get_allocated(&pool, page), 1);

        let recycled = iobuf
            .try_into_mut()
            .expect("unique full-view pooled buffer should recycle");
        assert_eq!(recycled.len(), page);
        assert!(recycled.as_ref().iter().all(|&b| b == 0xAB));
        assert_eq!(recycled.capacity(), page);
        // Still counted as allocated: the storage was recycled, not returned.
        assert_eq!(get_allocated(&pool, page), 1);

        drop(recycled);
        assert_eq!(get_allocated(&pool, page), 0);
        assert_eq!(get_available(&pool, page), 1);
    }
1953
    #[test]
    fn test_iobuf_try_into_mut_succeeds_for_unique_slice_and_fails_for_shared() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        // A slice that becomes the sole handle can be converted to mutable;
        // length and capacity shrink to the slice's view.
        let mut buf = pool.try_alloc(page).unwrap();
        buf.put_slice(&vec![0xCD; page]);
        let iobuf = buf.freeze();
        let sliced = iobuf.slice(1..page);
        drop(iobuf);
        let recycled = sliced
            .try_into_mut()
            .expect("unique sliced pooled buffer should recycle");
        assert_eq!(recycled.len(), page - 1);
        assert!(recycled.as_ref().iter().all(|&b| b == 0xCD));
        assert_eq!(recycled.capacity(), page - 1);
        assert_eq!(get_allocated(&pool, page), 1);
        drop(recycled);
        assert_eq!(get_allocated(&pool, page), 0);
        assert_eq!(get_available(&pool, page), 1);

        // While a clone exists the buffer is shared, so the conversion must
        // fail and hand the IoBuf back via Err.
        let mut buf = pool.try_alloc(page).unwrap();
        buf.put_slice(&vec![0xEF; page]);
        let iobuf = buf.freeze();
        let cloned = iobuf.clone();
        let iobuf = iobuf
            .try_into_mut()
            .expect_err("shared pooled buffer must not convert to mutable");

        // Dropping both handles releases the buffer back to the pool.
        drop(cloned);
        drop(iobuf);
        assert_eq!(get_allocated(&pool, page), 0);
        assert!(get_available(&pool, page) >= 1);
    }
1991
    #[test]
    fn test_multithreaded_alloc_freeze_return() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = Arc::new(BufferPool::new(test_config(page, page, 100), &mut registry));

        let mut handles = vec![];

        // Miri interprets every memory access; keep its iteration count low.
        cfg_if::cfg_if! {
            if #[cfg(miri)] {
                let iterations = 100;
            } else {
                let iterations = 1000;
            }
        }

        // Ten threads repeatedly allocate, freeze, clone, and drop buffers to
        // stress concurrent refcounting and buffer-return paths.
        for _ in 0..10 {
            let pool = pool.clone();
            let handle = thread::spawn(move || {
                for _ in 0..iterations {
                    let buf = pool.try_alloc(100).unwrap();
                    let iobuf = buf.freeze();

                    // Create several clones, then drop the original first.
                    let clones: Vec<_> = (0..5).map(|_| iobuf.clone()).collect();
                    drop(iobuf);

                    for clone in clones {
                        drop(clone);
                    }
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().unwrap();
        }

        // After all threads finish, every buffer must have been returned.
        let class_index = pool.inner.config.class_index(page).unwrap();
        let allocated = pool.inner.classes[class_index]
            .allocated
            .load(Ordering::Relaxed);
        assert_eq!(
            allocated, 0,
            "all buffers should be returned after multithreaded test"
        );
    }
2045
    #[test]
    fn test_cross_thread_buffer_return() {
        // Buffers frozen on one thread and dropped on another must still be
        // returned to the pool.
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 100), &mut registry);

        let (tx, rx) = mpsc::channel();

        // Allocate on the main thread and ship the frozen buffers away.
        for _ in 0..50 {
            let buf = pool.try_alloc(100).unwrap();
            let iobuf = buf.freeze();
            tx.send(iobuf).unwrap();
        }
        drop(tx);

        // Drop them all on a separate thread.
        let handle = thread::spawn(move || {
            while let Ok(iobuf) = rx.recv() {
                drop(iobuf);
            }
        });

        handle.join().unwrap();

        // Every cross-thread drop must have decremented the allocated counter.
        assert_eq!(get_allocated(&pool, page), 0);
    }
2075
    #[test]
    fn test_pool_dropped_before_buffer() {
        // Outstanding buffers must remain usable after the pool itself is
        // dropped, and their final drop must not crash even though there is no
        // pool left to return to.
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        let mut buf = pool.try_alloc(100).unwrap();
        buf.put_slice(&[0u8; 100]);
        let iobuf = buf.freeze();

        // Destroy the pool while the buffer is still live.
        drop(pool);

        // The buffer's contents are still accessible.
        assert_eq!(iobuf.len(), 100);

        // Final drop with no pool must free the buffer cleanly.
        drop(iobuf);
    }
2099
    #[test]
    fn test_bytes_parity_iobuf_buf_trait() {
        // A frozen pooled buffer must behave identically to `Bytes` under the
        // `Buf` trait through advance and copy_to_bytes sequences.
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);

        let data: Vec<u8> = (0..100u8).collect();

        let mut pooled_mut = pool.try_alloc(data.len()).unwrap();
        pooled_mut.put_slice(&data);
        let mut pooled = pooled_mut.freeze();
        let mut bytes = Bytes::from(data);

        // Same initial view.
        assert_eq!(Buf::remaining(&bytes), Buf::remaining(&pooled));
        assert_eq!(Buf::chunk(&bytes), Buf::chunk(&pooled));

        // Same view after a partial advance.
        Buf::advance(&mut bytes, 13);
        Buf::advance(&mut pooled, 13);
        assert_eq!(Buf::remaining(&bytes), Buf::remaining(&pooled));
        assert_eq!(Buf::chunk(&bytes), Buf::chunk(&pooled));

        // Zero-length copy matches and consumes nothing.
        let bytes_zero = Buf::copy_to_bytes(&mut bytes, 0);
        let pooled_zero = Buf::copy_to_bytes(&mut pooled, 0);
        assert_eq!(bytes_zero, pooled_zero);
        assert_eq!(Buf::remaining(&bytes), Buf::remaining(&pooled));
        assert_eq!(Buf::chunk(&bytes), Buf::chunk(&pooled));

        // Mid-buffer copy matches and consumes the copied amount.
        let bytes_mid = Buf::copy_to_bytes(&mut bytes, 17);
        let pooled_mid = Buf::copy_to_bytes(&mut pooled, 17);
        assert_eq!(bytes_mid, pooled_mid);
        assert_eq!(Buf::remaining(&bytes), Buf::remaining(&pooled));
        assert_eq!(Buf::chunk(&bytes), Buf::chunk(&pooled));

        // Copying the full remainder drains both sides.
        let remaining = Buf::remaining(&bytes);
        let bytes_rest = Buf::copy_to_bytes(&mut bytes, remaining);
        let pooled_rest = Buf::copy_to_bytes(&mut pooled, remaining);
        assert_eq!(bytes_rest, pooled_rest);
        assert_eq!(Buf::remaining(&bytes), 0);
        assert_eq!(Buf::remaining(&pooled), 0);
        assert!(!Buf::has_remaining(&bytes));
        assert!(!Buf::has_remaining(&pooled));
    }
2148
2149 #[test]
2151 fn test_bytes_parity_iobuf_slice() {
2152 let page = page_size();
2153 let mut registry = test_registry();
2154 let pool = BufferPool::new(test_config(page, page, 10), &mut registry);
2155
2156 let data: Vec<u8> = (0..32u8).collect();
2157 let mut pooled_mut = pool.try_alloc(data.len()).unwrap();
2158 pooled_mut.put_slice(&data);
2159 let pooled = pooled_mut.freeze();
2160 let bytes = Bytes::from(data);
2161
2162 assert_eq!(pooled.slice(..5).as_ref(), bytes.slice(..5).as_ref());
2163 assert_eq!(pooled.slice(6..).as_ref(), bytes.slice(6..).as_ref());
2164 assert_eq!(pooled.slice(3..8).as_ref(), bytes.slice(3..8).as_ref());
2165 assert_eq!(pooled.slice(..=7).as_ref(), bytes.slice(..=7).as_ref());
2166 assert_eq!(pooled.slice(10..10).as_ref(), bytes.slice(10..10).as_ref());
2167 }
2168
    #[test]
    fn test_bytes_parity_iobuf_split_to() {
        // split_to on a pooled buffer must match Bytes::split_to step for step.
        let page = page_size();
        let mut pooled_mut = PooledBufMut::new(AlignedBuffer::new(page, page), Weak::new());
        pooled_mut.put_slice(b"abcdefgh");
        let mut pooled = pooled_mut.into_pooled();
        let mut bytes = Bytes::from_static(b"abcdefgh");

        // split_to(0) yields an empty prefix and leaves the buffer untouched.
        assert_eq!(pooled.split_to(0).as_ref(), bytes.split_to(0).as_ref());
        assert_eq!(pooled.as_ref(), bytes.as_ref());

        // A mid-buffer split matches on both the prefix and the remainder.
        assert_eq!(pooled.split_to(3).as_ref(), bytes.split_to(3).as_ref());
        assert_eq!(pooled.as_ref(), bytes.as_ref());

        // Splitting off everything drains the buffer on both sides.
        let remaining = bytes.remaining();
        assert_eq!(
            pooled.split_to(remaining).as_ref(),
            bytes.split_to(remaining).as_ref()
        );
        assert_eq!(pooled.as_ref(), bytes.as_ref());
    }
2193
2194 #[test]
2195 #[should_panic(expected = "split_to out of bounds")]
2196 fn test_iobuf_split_to_out_of_bounds() {
2197 let page = page_size();
2198 let mut pooled_mut = PooledBufMut::new(AlignedBuffer::new(page, page), Weak::new());
2199 pooled_mut.put_slice(b"abc");
2200 let mut pooled = pooled_mut.into_pooled();
2201 let _ = pooled.split_to(4);
2202 }
2203
    #[test]
    fn test_bytesmut_parity_buf_trait() {
        // A pooled mutable buffer must mirror BytesMut under the Buf trait.
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);

        let mut bytes = BytesMut::with_capacity(100);
        bytes.put_slice(&[0xAAu8; 50]);

        let mut pooled = pool.try_alloc(100).unwrap();
        pooled.put_slice(&[0xAAu8; 50]);

        // Identical remaining and chunk views after identical writes.
        assert_eq!(Buf::remaining(&bytes), Buf::remaining(&pooled));

        assert_eq!(Buf::chunk(&bytes), Buf::chunk(&pooled));

        // Identical views after a partial advance.
        Buf::advance(&mut bytes, 10);
        Buf::advance(&mut pooled, 10);
        assert_eq!(Buf::remaining(&bytes), Buf::remaining(&pooled));
        assert_eq!(Buf::chunk(&bytes), Buf::chunk(&pooled));

        // Consuming the full remainder empties both.
        let remaining = Buf::remaining(&bytes);
        Buf::advance(&mut bytes, remaining);
        Buf::advance(&mut pooled, remaining);
        assert_eq!(Buf::remaining(&bytes), 0);
        assert_eq!(Buf::remaining(&pooled), 0);
        assert!(!Buf::has_remaining(&bytes));
        assert!(!Buf::has_remaining(&pooled));
    }
2238
    #[test]
    fn test_bytesmut_parity_bufmut_trait() {
        // A pooled mutable buffer must mirror BytesMut under the BufMut trait.
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);

        let mut bytes = BytesMut::with_capacity(100);
        let mut pooled = pool.try_alloc(100).unwrap();

        // Both report at least the requested writable capacity.
        assert!(BufMut::remaining_mut(&bytes) >= 100);
        assert!(BufMut::remaining_mut(&pooled) >= 100);

        // Slice writes produce identical contents.
        BufMut::put_slice(&mut bytes, b"hello");
        BufMut::put_slice(&mut pooled, b"hello");
        assert_eq!(bytes.as_ref(), pooled.as_ref());

        // Single-byte writes append identically.
        BufMut::put_u8(&mut bytes, 0x42);
        BufMut::put_u8(&mut pooled, 0x42);
        assert_eq!(bytes.as_ref(), pooled.as_ref());

        // Both expose a non-empty uninitialized tail for further writes.
        let bytes_chunk = BufMut::chunk_mut(&mut bytes);
        let pooled_chunk = BufMut::chunk_mut(&mut pooled);
        assert!(bytes_chunk.len() > 0);
        assert!(pooled_chunk.len() > 0);
    }
2269
    #[test]
    fn test_bytesmut_parity_after_advance_paths() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page * 4, 10), &mut registry);

        // truncate() after an advance measures from the read cursor, matching
        // BytesMut semantics.
        {
            let mut bytes = BytesMut::with_capacity(100);
            bytes.put_slice(&[0xAAu8; 50]);
            Buf::advance(&mut bytes, 10);
            let mut pooled = pool.try_alloc(100).unwrap();
            pooled.put_slice(&[0xAAu8; 50]);
            Buf::advance(&mut pooled, 10);
            bytes.truncate(20);
            pooled.truncate(20);
            assert_eq!(bytes.as_ref(), pooled.as_ref());
        }

        // clear() after an advance empties both buffers.
        {
            let mut bytes = BytesMut::with_capacity(100);
            bytes.put_slice(&[0xAAu8; 50]);
            Buf::advance(&mut bytes, 10);
            let mut pooled = pool.try_alloc(100).unwrap();
            pooled.put_slice(&[0xAAu8; 50]);
            Buf::advance(&mut pooled, 10);
            bytes.clear();
            pooled.clear();
            assert_eq!(bytes.len(), 0);
            assert_eq!(pooled.len(), 0);
        }

        // Capacity reporting after an advance, unsafe set_len parity, and
        // clear() preserving the reported capacity.
        {
            let mut bytes = BytesMut::with_capacity(page);
            bytes.resize(50, 0xBB);
            Buf::advance(&mut bytes, 20);
            let mut pooled = pool.try_alloc(page).unwrap();
            pooled.put_slice(&[0xBB; 50]);
            Buf::advance(&mut pooled, 20);
            assert_eq!(bytes.capacity(), pooled.capacity());
            // SAFETY: 25 is within the initialized region written above.
            unsafe {
                bytes.set_len(25);
                pooled.set_len(25);
            }
            assert_eq!(bytes.as_ref(), pooled.as_ref());
            let bytes_cap = bytes.capacity();
            let pooled_cap = pooled.capacity();
            bytes.clear();
            pooled.clear();
            assert_eq!(bytes.capacity(), bytes_cap);
            assert_eq!(pooled.capacity(), pooled_cap);
        }

        // Interleaved advance + write, then truncate larger than the current
        // length (a no-op in BytesMut) must leave identical contents.
        {
            let mut bytes = BytesMut::with_capacity(100);
            bytes.resize(30, 0xAA);
            Buf::advance(&mut bytes, 10);
            bytes.put_slice(&[0xBB; 10]);
            bytes.truncate(100);

            let mut pooled = pool.try_alloc(100).unwrap();
            pooled.put_slice(&[0xAA; 30]);
            Buf::advance(&mut pooled, 10);
            pooled.put_slice(&[0xBB; 10]);
            pooled.truncate(100);
            assert_eq!(bytes.as_ref(), pooled.as_ref());
        }
    }
2342
    #[test]
    fn test_pool_exhaustion_and_recovery() {
        let page = page_size();
        let mut registry = test_registry();
        // Pool with exactly three buffers in the single class.
        let pool = BufferPool::new(test_config(page, page, 3), &mut registry);

        // Take all three slots; the fourth request must fail.
        let buf1 = pool.try_alloc(100).expect("first alloc");
        let buf2 = pool.try_alloc(100).expect("second alloc");
        let buf3 = pool.try_alloc(100).expect("third alloc");
        assert!(pool.try_alloc(100).is_err(), "pool should be exhausted");

        // Returning one buffer frees exactly one slot.
        drop(buf1);

        let buf4 = pool.try_alloc(100).expect("alloc after return");
        assert!(pool.try_alloc(100).is_err(), "pool exhausted again");

        // Returning everything restores the full freelist.
        drop(buf2);
        drop(buf3);
        drop(buf4);

        assert_eq!(get_allocated(&pool, page), 0);
        assert_eq!(get_available(&pool, page), 3);

        // A fresh allocation is served from the freelist, shrinking it by one.
        let _buf5 = pool.try_alloc(100).expect("reuse from freelist");
        assert_eq!(get_available(&pool, page), 2);
    }
2375
2376 #[test]
2378 fn test_try_alloc_errors() {
2379 let page = page_size();
2380 let mut registry = test_registry();
2381 let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
2382
2383 let result = pool.try_alloc(page * 10);
2385 assert_eq!(result.unwrap_err(), PoolError::Oversized);
2386
2387 let _buf1 = pool.try_alloc(100).unwrap();
2389 let _buf2 = pool.try_alloc(100).unwrap();
2390 let result = pool.try_alloc(100);
2391 assert_eq!(result.unwrap_err(), PoolError::Exhausted);
2392 }
2393
2394 #[test]
2395 fn test_try_alloc_zeroed_errors() {
2396 let page = page_size();
2397 let mut registry = test_registry();
2398 let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
2399
2400 let result = pool.try_alloc_zeroed(page * 10);
2402 assert_eq!(result.unwrap_err(), PoolError::Oversized);
2403
2404 let _buf1 = pool.try_alloc_zeroed(100).unwrap();
2406 let _buf2 = pool.try_alloc_zeroed(100).unwrap();
2407 let result = pool.try_alloc_zeroed(100);
2408 assert_eq!(result.unwrap_err(), PoolError::Exhausted);
2409 }
2410
    #[test]
    fn test_fallback_allocation() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        // Fill the class so pooled allocation can no longer succeed.
        let buf1 = pool.try_alloc(100).unwrap();
        let buf2 = pool.try_alloc(100).unwrap();
        assert!(buf1.is_pooled());
        assert!(buf2.is_pooled());

        // alloc() on an exhausted pool falls back to an unpooled buffer that
        // still honors the configured alignment.
        let mut fallback_exhausted = pool.alloc(100);
        assert!(!fallback_exhausted.is_pooled());
        assert!((fallback_exhausted.as_mut_ptr() as usize).is_multiple_of(page));

        // Oversized requests also fall back, with the same alignment guarantee.
        let mut fallback_oversized = pool.alloc(page * 10);
        assert!(!fallback_oversized.is_pooled());
        assert!((fallback_oversized.as_mut_ptr() as usize).is_multiple_of(page));

        // Fallback buffers are not tracked by the class counters...
        assert_eq!(get_allocated(&pool, page), 2);

        // ...and dropping them leaves the pooled count untouched.
        drop(fallback_exhausted);
        drop(fallback_oversized);
        assert_eq!(get_allocated(&pool, page), 2);

        // Only the pooled buffers decrement the counter on drop.
        drop(buf1);
        drop(buf2);
        assert_eq!(get_allocated(&pool, page), 0);
    }
2447
2448 #[test]
2450 fn test_is_pooled() {
2451 let page = page_size();
2452 let mut registry = test_registry();
2453 let pool = BufferPool::new(test_config(page, page, 10), &mut registry);
2454
2455 let pooled = pool.try_alloc(100).unwrap();
2456 assert!(pooled.is_pooled());
2457
2458 let owned = IoBufMut::with_capacity(100);
2459 assert!(!owned.is_pooled());
2460 }
2461
2462 #[test]
2463 fn test_iobuf_is_pooled() {
2464 let page = page_size();
2465 let mut registry = test_registry();
2466 let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
2467
2468 let pooled = pool.try_alloc(100).unwrap().freeze();
2469 assert!(pooled.is_pooled());
2470
2471 let fallback = pool.alloc(page * 10).freeze();
2473 assert!(!fallback.is_pooled());
2474
2475 let bytes = IoBuf::copy_from_slice(b"hello");
2476 assert!(!bytes.is_pooled());
2477 }
2478
    #[test]
    fn test_buffer_alignment() {
        let page = page_size();
        let cache_line = cache_line_size();
        let mut registry = test_registry();

        // Under Miri, shrink per-class capacity to keep the interpreted run fast;
        // otherwise use the stock presets.
        cfg_if::cfg_if! {
            if #[cfg(miri)] {
                let storage_config = BufferPoolConfig {
                    max_per_class: NZUsize!(32),
                    ..BufferPoolConfig::for_storage()
                };
                let network_config = BufferPoolConfig {
                    max_per_class: NZUsize!(32),
                    ..BufferPoolConfig::for_network()
                };
            } else {
                let storage_config = BufferPoolConfig::for_storage();
                let network_config = BufferPoolConfig::for_network();
            }
        }

        // Storage preset: buffers must be page-aligned.
        let storage_buffer_pool = BufferPool::new(storage_config, &mut registry);
        let mut buf = storage_buffer_pool.try_alloc(100).unwrap();
        assert_eq!(
            buf.as_mut_ptr() as usize % page,
            0,
            "storage buffer not page-aligned"
        );

        // Network preset: buffers must be cache-line aligned.
        let network_buffer_pool = BufferPool::new(network_config, &mut registry);
        let mut buf = network_buffer_pool.try_alloc(100).unwrap();
        assert_eq!(
            buf.as_mut_ptr() as usize % cache_line,
            0,
            "network buffer not cache-line aligned"
        );
    }
2520
    #[test]
    fn test_alloc_and_freeze_view_paths() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);

        // Zero-capacity and exact-max requests both round to a full class buffer.
        let buf = pool.try_alloc(0).expect("zero capacity should succeed");
        assert_eq!(buf.capacity(), page);
        assert_eq!(buf.len(), 0);
        let buf = pool.try_alloc(page).expect("exact max size should succeed");
        assert_eq!(buf.capacity(), page);

        // Freezing after consuming everything yields an empty view.
        let mut buf = pool.try_alloc(100).unwrap();
        buf.put_slice(&[0x42; 100]);
        Buf::advance(&mut buf, 100);
        assert!(buf.freeze().is_empty());

        // Freezing after a partial advance keeps only the unread suffix.
        let mut buf = pool.try_alloc(100).unwrap();
        buf.put_slice(&[0xAA; 50]);
        Buf::advance(&mut buf, 20);
        let frozen = buf.freeze();
        assert_eq!(frozen.len(), 30);
        assert_eq!(frozen.as_ref(), &[0xAA; 30]);

        // clear() resets the view, so freezing produces an empty buffer.
        let mut buf = pool.try_alloc(100).unwrap();
        buf.put_slice(&[0xAA; 50]);
        buf.clear();
        let frozen = buf.freeze();
        assert!(frozen.is_empty());
    }
2555
2556 #[test]
2557 fn test_interleaved_advance_and_write() {
2558 let page = page_size();
2559 let mut registry = test_registry();
2560 let pool = BufferPool::new(test_config(page, page, 10), &mut registry);
2561
2562 let mut buf = pool.try_alloc(100).unwrap();
2563 buf.put_slice(b"hello");
2564 Buf::advance(&mut buf, 2);
2565 buf.put_slice(b"world");
2566 assert_eq!(buf.as_ref(), b"lloworld");
2567 }
2568
2569 #[test]
2570 fn test_alignment_after_advance() {
2571 let page = page_size();
2572 let mut registry = test_registry();
2573 let pool = BufferPool::new(BufferPoolConfig::for_storage(), &mut registry);
2574
2575 let mut buf = pool.try_alloc(100).unwrap();
2576 buf.put_slice(&[0; 100]);
2577
2578 assert_eq!(buf.as_mut_ptr() as usize % page, 0);
2580
2581 Buf::advance(&mut buf, 7);
2583 assert_ne!(buf.as_mut_ptr() as usize % page, 0);
2585 }
2586}