1use crate::error::{FrozenErr, FrozenRes};
37use std::{
38 ptr,
39 sync::{self, atomic},
40};
41
/// Sentinel slot index marking the end of the free list / an empty pool.
/// Equal to `POOL_IDX_MASK` (all low 32 bits set), so it round-trips
/// through `pack_pool_idx`/`unpack_pool_idx` unchanged.
const INVALID_POOL_SLOT: usize = u32::MAX as usize;
43
/// Configuration for a [`BufPool`].
#[derive(Debug, Clone)]
pub struct BPCfg {
    /// Module id stamped onto errors raised by this pool.
    pub mid: u8,

    /// Size in bytes of every chunk handed out by the pool.
    pub chunk_size: usize,

    /// Allocation strategy; see [`BPBackend`].
    pub backend: BPBackend,
}
56
/// Backend selection for a [`BufPool`].
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum BPBackend {
    /// Every allocation gets its own fresh heap buffer, freed on drop.
    Dynamic,

    /// Chunks are carved out of one buffer allocated up front.
    Prealloc {
        /// Number of `chunk_size` chunks in the preallocated buffer.
        capacity: usize,
    },
}
94
/// Chunked buffer pool with an optional preallocated backing store.
///
/// `lock` + `wait_cv` implement blocking until chunks are freed;
/// `close_cv` + `active` let `Drop` wait for outstanding allocations
/// before reclaiming the backing buffer.
#[derive(Debug)]
pub struct BufPool {
    cfg: BPCfg,
    state: BackendState,
    // Protects only the condvar handshakes; the free list itself is
    // manipulated lock-free via atomics.
    lock: sync::Mutex<()>,
    // Signaled when chunks return to the prealloc free list.
    wait_cv: sync::Condvar,
    // Signaled when the last outstanding `Allocation` is dropped.
    close_cv: sync::Condvar,
    // Count of live `Allocation`s borrowing from this pool.
    active: atomic::AtomicUsize,
}
116
// SAFETY: the raw `base_ptr` inside `BackendState::Prealloc` is the only
// non-Send/Sync field; it points at a heap buffer owned by the pool, and
// all mutation of shared state goes through atomics or the mutex.
// NOTE(review): soundness also relies on no `Allocation` outliving the
// pool's backing buffer — `BufPool::drop` blocks on `active == 0` to
// uphold this; confirm no other escape path exists.
unsafe impl Send for BufPool {}
unsafe impl Sync for BufPool {}
119
120impl BufPool {
121 pub fn new(cfg: BPCfg) -> Self {
138 let state = BackendState::new(&cfg);
139 Self {
140 cfg,
141 state,
142 lock: sync::Mutex::new(()),
143 wait_cv: sync::Condvar::new(),
144 close_cv: sync::Condvar::new(),
145 active: atomic::AtomicUsize::new(0),
146 }
147 }
148
149 #[inline(always)]
182 pub fn allocate(&self, n: usize) -> FrozenRes<Allocation> {
183 match &self.state {
184 BackendState::Dynamic => Ok(Allocation::new_dynamic(self, n)),
185 BackendState::Prealloc(state) => {
186 if n > state.capacity {
187 return Ok(Allocation::new_dynamic(self, n));
188 }
189
190 state.allocate(n, self)
191 }
192 }
193 }
194}
195
impl Drop for BufPool {
    /// Blocks until every outstanding [`Allocation`] has been dropped,
    /// then reclaims the preallocated backing buffer (if any).
    fn drop(&mut self) {
        // A poisoned mutex means we cannot coordinate with allocation
        // guards; leak the backing buffer rather than risk a double free.
        let mut guard = match self.lock.lock() {
            Ok(g) => g,
            Err(_) => return,
        };

        // `AllocationGuard::drop` decrements `active` and notifies
        // `close_cv` while holding this same lock, so the final wakeup
        // cannot be missed between the check and the wait.
        while self.active.load(atomic::Ordering::Acquire) != 0 {
            guard = self.close_cv.wait(guard).expect("shutdown cv poisoned");
        }

        if let BackendState::Prealloc(state) = &self.state {
            let pool_size = state.capacity * self.cfg.chunk_size;

            // Rebuild and drop the Vec leaked in `PreallocState::new`.
            // NOTE(review): `Vec::from_raw_parts` requires the exact
            // original capacity; this assumes `Vec::with_capacity(pool_size)`
            // allocated exactly `pool_size` bytes, which the docs only
            // guarantee as "at least" — confirm for the global allocator.
            let _ = unsafe { Vec::from_raw_parts(state.base_ptr, pool_size, pool_size) };
        }
    }
}
216
/// Runtime state matching the configured [`BPBackend`].
#[derive(Debug)]
enum BackendState {
    /// No shared state: allocations come straight from the heap.
    Dynamic,
    /// Free-list bookkeeping over the preallocated buffer.
    Prealloc(PreallocState),
}
222
223impl BackendState {
224 fn new(cfg: &BPCfg) -> Self {
225 match cfg.backend {
226 BPBackend::Dynamic => BackendState::Dynamic,
227 BPBackend::Prealloc { capacity } => BackendState::Prealloc(PreallocState::new(capacity, cfg)),
228 }
229 }
230}
231
/// Lock-free free list over one contiguous preallocated buffer.
#[derive(Debug)]
struct PreallocState {
    // Total number of chunks in the backing buffer.
    capacity: usize,
    // Start of the leaked backing buffer; reclaimed in `BufPool::drop`.
    base_ptr: TBPPtr,
    // Packed (slot index, ABA tag) of the first free slot;
    // see `pack_pool_idx` / `unpack_pool_idx`.
    head: atomic::AtomicUsize,
    // `next[i]` is the slot following `i` in the free list, or
    // `INVALID_POOL_SLOT` at the end of the list.
    next: Box<[atomic::AtomicUsize]>,
}
239
240impl PreallocState {
241 fn new(capacity: usize, cfg: &BPCfg) -> Self {
242 let pool_size = capacity * cfg.chunk_size;
243
244 let mut pool = Vec::<u8>::with_capacity(pool_size);
245 let base_ptr = pool.as_mut_ptr();
246
247 unsafe { pool.set_len(pool_size) };
251
252 std::mem::forget(pool);
257
258 let mut next = Vec::with_capacity(capacity);
259 for i in 0..capacity {
260 let _i = 1 + i;
261 let n = if _i < capacity { _i } else { INVALID_POOL_SLOT };
262 next.push(atomic::AtomicUsize::new(n));
263 }
264
265 Self {
266 capacity,
267 base_ptr,
268 next: next.into_boxed_slice(),
269 head: atomic::AtomicUsize::new(pack_pool_idx(0, 0)),
270 }
271 }
272
273 #[inline(always)]
274 fn allocate(&self, n: usize, pool: &BufPool) -> FrozenRes<Allocation> {
275 let mut remaining = n;
276 let mut alloc = Allocation::new(pool, n);
277
278 while remaining != 0 {
279 let taken = self.alloc_batch(remaining, pool, &mut alloc);
280
281 if taken == 0 {
282 self.wait(pool)?;
283 continue;
284 }
285
286 remaining -= taken;
287 }
288
289 Ok(alloc)
290 }
291
292 #[inline(always)]
293 fn alloc_batch(&self, cap: usize, pool: &BufPool, out: &mut Allocation) -> usize {
294 let mut head = self.head.load(atomic::Ordering::Relaxed);
295 loop {
296 let (idx, tag) = unpack_pool_idx(head);
297
298 if idx == INVALID_POOL_SLOT {
307 return 0;
308 }
309
310 let mut count = 1;
315 let mut last = idx;
316
317 while count < cap {
318 let next = self.next[last].load(atomic::Ordering::Relaxed);
319 if next == INVALID_POOL_SLOT {
320 break;
321 }
322
323 last = next;
324 count += 1;
325 }
326
327 let new_head_idx = self.next[last].load(atomic::Ordering::Relaxed);
328 let new_head = pack_pool_idx(new_head_idx, tag + 1);
329
330 match self
331 .head
332 .compare_exchange(head, new_head, atomic::Ordering::AcqRel, atomic::Ordering::Acquire)
333 {
334 Err(h) => head = h,
335 Ok(_) => {
336 let base = self.base_ptr;
337 let chunk = pool.cfg.chunk_size;
338
339 let slots = out.slots.slots();
340 slots.reserve(count);
341
342 let mut cur = idx;
343 for _ in 0..count {
344 slots.push(unsafe { base.add(cur * chunk) });
345 cur = self.next[cur].load(atomic::Ordering::Relaxed);
346 }
347
348 out.count += count;
349 return count;
350 }
351 }
352 }
353 }
354
355 #[inline]
360 fn wait(&self, pool: &BufPool) -> FrozenRes<()> {
361 if self.has_free() {
362 return Ok(());
363 }
364
365 let mut guard = pool
366 .lock
367 .lock()
368 .map_err(|e| new_err_raw(BufPoolErrRes::Lpn, e, pool.cfg.mid))?;
369
370 if self.has_free() {
371 return Ok(());
372 }
373
374 while !self.has_free() {
375 guard = pool
376 .wait_cv
377 .wait(guard)
378 .map_err(|e| new_err_raw(BufPoolErrRes::Lpn, e, pool.cfg.mid))?;
379 }
380
381 Ok(())
382 }
383
384 #[inline]
385 fn has_free(&self) -> bool {
386 let (idx, _) = unpack_pool_idx(self.head.load(atomic::Ordering::Acquire));
387 idx != INVALID_POOL_SLOT
388 }
389
390 #[inline(always)]
391 fn free(&self, ptr: TBPPtr, pool: &BufPool) {
392 let offset = unsafe { ptr.offset_from(self.base_ptr) } as usize;
393 let idx = offset / pool.cfg.chunk_size;
394
395 let mut head = self.head.load(atomic::Ordering::Acquire);
396 loop {
397 let (head_idx, head_tag) = unpack_pool_idx(head);
398 self.next[idx].store(head_idx, atomic::Ordering::Relaxed);
399 let new = pack_pool_idx(idx, 1 + head_tag);
400
401 match self
402 .head
403 .compare_exchange(head, new, atomic::Ordering::AcqRel, atomic::Ordering::Acquire)
404 {
405 Ok(_) => {
406 pool.wait_cv.notify_one();
407 return;
408 }
409 Err(h) => head = h,
410 }
411 }
412 }
413}
414
/// Error-domain discriminator stamped onto buffer-pool errors.
const ERRDOMAIN: u8 = 0x13;
417
/// Error codes raised by the buffer pool, numbered in the 0x300 range.
///
/// Derives added so callers can debug-print, copy, and compare codes —
/// public error types should be `Debug` at minimum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u16)]
pub enum BufPoolErrRes {
    /// A mutex/condvar was poisoned while waiting for free chunks.
    Lpn = 0x300,
}

impl BufPoolErrRes {
    /// Static human-readable detail text for each error code.
    #[inline]
    fn default_message(&self) -> &'static [u8] {
        match self {
            Self::Lpn => b"lock poisoned while waiting for BufPool",
        }
    }
}
433
434#[inline]
435fn new_err_raw<E: std::fmt::Display>(res: BufPoolErrRes, error: E, mid: u8) -> FrozenErr {
436 let detail = res.default_message();
437 FrozenErr::new(
438 mid,
439 ERRDOMAIN,
440 res as u16,
441 detail,
442 error.to_string().as_bytes().to_vec(),
443 )
444}
445
/// Number of low bits reserved for the slot index in a packed head word.
const POOL_IDX_BITS: usize = 0x20;
/// Mask selecting the index half of a packed head word.
const POOL_IDX_MASK: usize = (1 << POOL_IDX_BITS) - 1;

/// Packs a slot index and an ABA tag into one word: the tag occupies the
/// high bits, the index the low `POOL_IDX_BITS`.
#[inline]
fn pack_pool_idx(idx: usize, tag: usize) -> usize {
    let low = idx & POOL_IDX_MASK;
    let high = tag << POOL_IDX_BITS;
    high | low
}

/// Inverse of `pack_pool_idx`; returns `(idx, tag)`.
#[inline]
fn unpack_pool_idx(id: usize) -> (usize, usize) {
    let idx = id & POOL_IDX_MASK;
    let tag = id >> POOL_IDX_BITS;
    (idx, tag)
}
458
/// Raw pointer to the start of one pool chunk.
pub type TBPPtr = *mut u8;
461
/// RAII handle over a batch of chunks; the chunks return to their pool
/// (or the heap) when this handle is dropped.
#[derive(Debug)]
pub struct Allocation {
    /// Number of chunks actually held.
    pub count: usize,

    // Chunk pointers plus the strategy used to free them on drop.
    slots: AllocSlotType,

    // Balances the pool's `active` counter; it fires after `Drop for
    // Allocation` has already released the chunks.
    guard: AllocationGuard,
}
474
// SAFETY: the chunk pointers are exclusively owned by this allocation and
// the pool they reference is Sync, so moving the handle across threads is
// sound. NOTE(review): also relies on the originating pool outliving the
// allocation (enforced by `BufPool::drop` waiting on `active`) — confirm.
unsafe impl Send for Allocation {}
476
impl Allocation {
    /// Read-only view of the chunk addresses held by this allocation.
    #[inline]
    pub fn slots(&self) -> &Vec<TBPPtr> {
        match &self.slots {
            AllocSlotType::Pool(slots) => slots,
            AllocSlotType::Dynamic(slots) => slots,
        }
    }

    /// Empty pool-backed allocation; chunks are appended afterwards by
    /// `PreallocState::alloc_batch`. Registers itself in `pool.active`.
    #[inline]
    fn new(pool: &BufPool, cap: usize) -> Self {
        pool.active.fetch_add(1, atomic::Ordering::Relaxed);

        Self {
            count: 0,
            guard: AllocationGuard(ptr::NonNull::from(pool)),
            slots: AllocSlotType::Pool(Vec::<TBPPtr>::with_capacity(cap)),
        }
    }

    /// Heap-backed allocation: one buffer of `count * chunk_size` bytes,
    /// leaked here and reclaimed in `Drop for Allocation`.
    #[inline]
    fn new_dynamic(pool: &BufPool, count: usize) -> Self {
        let total = pool.cfg.chunk_size * count;
        pool.active.fetch_add(1, atomic::Ordering::Relaxed);

        let mut slice = Vec::<u8>::with_capacity(total);
        let base_ptr = slice.as_mut_ptr();

        // SAFETY: bytes stay uninitialized; consumers write before reading.
        // NOTE(review): `Drop for Allocation` rebuilds this Vec with
        // capacity == `total`; relies on `with_capacity` allocating exactly
        // `total` bytes — confirm.
        unsafe { slice.set_len(total) };

        std::mem::forget(slice);

        // Chunk addresses at fixed `chunk_size` strides into the buffer.
        let mut ptrs = Vec::with_capacity(count);
        for i in 0..count {
            let p = unsafe { base_ptr.add(i * pool.cfg.chunk_size) };
            ptrs.push(p);
        }

        Self {
            count,
            slots: AllocSlotType::Dynamic(ptrs),
            guard: AllocationGuard(ptr::NonNull::from(pool)),
        }
    }
}
550
impl Drop for Allocation {
    /// Returns chunks to their source: pool slots go back to the free list
    /// one by one; a dynamic buffer is rebuilt into a `Vec` and dropped.
    fn drop(&mut self) {
        // SAFETY: the pool outlives all allocations — `BufPool::drop`
        // blocks until `active` reaches zero before freeing anything.
        let pool = unsafe { self.guard.0.as_ref() };

        match &self.slots {
            AllocSlotType::Pool(slots) => match &pool.state {
                BackendState::Prealloc(state) => {
                    for ptr in slots {
                        state.free(*ptr, pool);
                    }
                }

                // Pool-variant slots are only ever created by the prealloc
                // backend (`Allocation::new`), so this cannot be reached.
                _ => unreachable!(),
            },

            AllocSlotType::Dynamic(slots) => {
                // Empty dynamic allocations never allocated a buffer.
                if slots.is_empty() {
                    return;
                }

                let buf_size = pool.cfg.chunk_size;
                let total = buf_size * slots.len();
                let base = slots[0];

                // Rebuild the Vec leaked in `Allocation::new_dynamic` and
                // let it free the buffer.
                // NOTE(review): assumes the original allocation capacity
                // was exactly `total` — confirm.
                let _ = unsafe { Vec::from_raw_parts(base, total, total) };
            }
        }
    }
}
584
585#[derive(Debug)]
586enum AllocSlotType {
587 Pool(Vec<TBPPtr>),
588 Dynamic(Vec<TBPPtr>),
589}
590
591impl AllocSlotType {
592 fn slots(&mut self) -> &mut Vec<TBPPtr> {
593 match self {
594 Self::Pool(slots) => slots,
595 Self::Dynamic(slots) => slots,
596 }
597 }
598}
599
/// Decrements the owning pool's `active` count on drop and wakes a
/// `BufPool::drop` that may be waiting for the pool to quiesce.
#[derive(Debug)]
struct AllocationGuard(ptr::NonNull<BufPool>);

impl Drop for AllocationGuard {
    fn drop(&mut self) {
        // SAFETY: the pool stays alive because `BufPool::drop` blocks
        // until `active` reaches zero before tearing anything down.
        let pool = unsafe { self.0.as_ref() };

        // If this was the last live allocation, take the lock before
        // notifying so the wakeup cannot race `BufPool::drop`'s locked
        // check of `active`.
        if pool.active.fetch_sub(1, atomic::Ordering::Release) == 1 {
            if let Ok(_g) = pool.lock.lock() {
                pool.close_cv.notify_one();
            }
        }
    }
}
615
616#[cfg(test)]
617mod tests {
618 use super::*;
619 use crate::error::TEST_MID;
620
621 const CAP: usize = 0x20;
622 const SIZE: usize = 0x0A;
623
624 fn new_pool_prealloc(capacity: usize) -> BufPool {
625 BufPool::new(BPCfg {
626 mid: TEST_MID,
627 chunk_size: SIZE,
628 backend: BPBackend::Prealloc { capacity },
629 })
630 }
631
632 fn new_pool_dynamic() -> BufPool {
633 BufPool::new(BPCfg {
634 mid: TEST_MID,
635 chunk_size: SIZE,
636 backend: BPBackend::Dynamic,
637 })
638 }
639
640 mod utils {
641 use super::*;
642
643 #[test]
644 fn pack_unpack_cycle() {
645 let pack_id = pack_pool_idx(0x20, 0x0A);
646 let (idx, tag) = unpack_pool_idx(pack_id);
647
648 assert_eq!(idx, 0x20);
649 assert_eq!(tag, 0x0A);
650 }
651 }
652
653 mod pre_allocs {
654 use super::*;
655
656 #[test]
657 fn ok_alloc_works() {
658 let pool = new_pool_prealloc(CAP);
659 let alloc = pool.allocate(1).unwrap();
660
661 assert_eq!(alloc.count, 1);
662 assert_eq!(alloc.slots().len(), 1);
663 }
664
665 #[test]
666 fn ok_alloc_exact_cap_as_requested() {
667 let pool = new_pool_prealloc(CAP);
668 let alloc = pool.allocate(CAP).unwrap();
669
670 assert_eq!(alloc.count, CAP);
671 assert_eq!(alloc.slots().len(), CAP);
672 }
673
674 #[test]
675 fn ok_alloc_all_even_when_exhausted() {
676 let pool = new_pool_prealloc(CAP);
677
678 let a1 = pool.allocate(CAP - 1).unwrap();
679 assert_eq!(a1.count, CAP - 1);
680 drop(a1);
681
682 let a2 = pool.allocate(CAP).unwrap();
683 assert_eq!(a2.count, CAP);
684 drop(a2);
685
686 let a3 = pool.allocate(1).unwrap();
687 assert_eq!(a3.count, 1);
688 }
689
690 #[test]
691 fn ok_alloc_all_when_requested_larger_then_cap() {
692 let pool = new_pool_prealloc(CAP);
693
694 let a1 = pool.allocate(CAP * 2).unwrap();
695 assert_eq!(a1.count, CAP * 2);
696 }
697
698 #[test]
699 fn ok_no_duplicate_slots_in_single_alloc() {
700 let pool = new_pool_prealloc(CAP);
701
702 let alloc = pool.allocate(CAP).unwrap();
703 let mut ptrs: Vec<TBPPtr> = alloc.slots().to_vec();
704
705 ptrs.sort();
707 ptrs.dedup();
708
709 assert_eq!(ptrs.len(), CAP);
710 }
711
712 #[test]
713 fn ok_large_allocation_with_pre_alloc() {
714 let pool = new_pool_prealloc(0x100);
715
716 for i in 0..0x100 {
717 let a = pool.allocate(i).unwrap();
718 assert_eq!(a.slots().len(), i);
719 }
720
721 let final_alloc = pool.allocate(0x10).unwrap();
722 assert_eq!(final_alloc.count, 0x10);
723 }
724 }
725
726 mod dynamic_allocs {
727 use super::*;
728
729 #[test]
730 fn ok_dynamic_alloc() {
731 let pool = new_pool_dynamic();
732 let alloc = pool.allocate(CAP).unwrap();
733
734 assert_eq!(alloc.count, CAP);
735 assert_eq!(alloc.slots().len(), CAP);
736 }
737
738 #[test]
739 fn ok_no_duplicate_slots_in_single_dynamic_alloc() {
740 let pool = new_pool_dynamic();
741
742 let alloc = pool.allocate(CAP).unwrap();
743 let mut ptrs: Vec<TBPPtr> = alloc.slots().to_vec();
744
745 ptrs.sort();
747 ptrs.dedup();
748
749 assert_eq!(ptrs.len(), CAP);
750 }
751
752 #[test]
753 fn ok_large_allocation_with_dynamic_alloc() {
754 let pool = new_pool_dynamic();
755 let alloc = pool.allocate(0x400).unwrap();
756
757 assert_eq!(alloc.count, 0x400);
758 assert_eq!(alloc.slots().len(), 0x400);
759 }
760 }
761
762 mod raii_safety {
763 use super::*;
764
765 #[test]
766 fn ok_pre_alloc_auto_free_on_drop() {
767 let pool = new_pool_prealloc(CAP);
768
769 {
770 let alloc = pool.allocate(CAP).unwrap();
771 assert_eq!(alloc.count, CAP);
772 }
773
774 assert_eq!(pool.active.load(atomic::Ordering::Acquire), 0);
776
777 let alloc2 = pool.allocate(CAP).unwrap();
778 assert_eq!(alloc2.count, CAP);
779 }
780
781 #[test]
782 fn ok_dynamic_alloc_auto_free_on_drop() {
783 let pool = new_pool_dynamic();
784
785 {
786 let alloc = pool.allocate(CAP).unwrap();
787 assert_eq!(alloc.count, CAP);
788 }
789
790 assert_eq!(pool.active.load(atomic::Ordering::Acquire), 0);
792 }
793 }
794
    mod concurrency {
        use super::*;
        use std::sync::{Arc, Barrier};
        use std::thread;

        // Hammer the prealloc free list from many threads; each iteration
        // keeps requesting until it has obtained CAP / 2 chunks in total.
        #[test]
        fn ok_concurrent_alloc() {
            const THREADS: usize = 8;
            const ITERS: usize = 0x1000;

            let barrier = Arc::new(Barrier::new(THREADS));
            let pool = Arc::new(new_pool_prealloc(CAP * 0x0A));

            let mut handles = Vec::new();
            for _ in 0..THREADS {
                let pool = pool.clone();
                let barrier = barrier.clone();

                handles.push(thread::spawn(move || {
                    // Line all threads up for maximum contention.
                    barrier.wait();

                    for _ in 0..ITERS {
                        let mut n = CAP / 2;
                        while n != 0 {
                            let alloc = pool.allocate(n).unwrap();
                            n -= alloc.count;
                        }
                    }
                }));
            }

            for h in handles {
                assert!(h.join().is_ok());
            }

            // Every chunk must have been returned: a further allocation
            // of CAP chunks still succeeds without blocking forever.
            let final_alloc = pool.allocate(CAP).unwrap();
            assert_eq!(final_alloc.count, CAP);
        }

        #[test]
        fn ok_concurrent_dynamic_alloc() {
            const THREADS: usize = 8;
            const ITERS: usize = 0x200;

            let pool = Arc::new(new_pool_dynamic());

            let mut handles = Vec::new();
            for _ in 0..THREADS {
                let pool = pool.clone();

                handles.push(thread::spawn(move || {
                    for _ in 0..ITERS {
                        let alloc = pool.allocate(0x10).unwrap();
                        assert_eq!(alloc.count, 0x10);
                    }
                }));
            }

            for h in handles {
                assert!(h.join().is_ok());
            }

            // All allocation guards must have unregistered themselves.
            assert_eq!(pool.active.load(atomic::Ordering::Acquire), 0);

            let alloc = pool.allocate(CAP).unwrap();
            assert_eq!(alloc.count, CAP);
        }
    }
864
    mod polling {
        use super::*;
        use std::sync::Arc;
        use std::thread;
        use std::time::{Duration, Instant};

        // One-chunk pool: the second allocation must block until the first
        // is dropped ~30ms later, so it should observe at least ~20ms of
        // waiting (slack allows for scheduler jitter).
        #[test]
        fn ok_pre_alloc_blocks_until_buffers_freed() {
            let pool = Arc::new(new_pool_prealloc(1));
            let a = pool.allocate(1).unwrap();

            let pool2 = pool.clone();
            let h1 = thread::spawn(move || {
                let start = Instant::now();
                let alloc = pool2.allocate(1).expect("alloc failed");
                let elapsed = start.elapsed();

                assert!(elapsed >= Duration::from_millis(20));
                assert_eq!(alloc.count, 1);
            });

            let h2 = thread::spawn(move || {
                thread::sleep(Duration::from_millis(30));
                drop(a);
            });

            assert!(h1.join().is_ok());
            assert!(h2.join().is_ok());
        }
    }
895
896 mod shutdown_safety {
897 use super::*;
898 use std::sync::Arc;
899
900 #[test]
901 fn drop_waits_for_active_pre_allocations() {
902 let pool = Arc::new(new_pool_prealloc(CAP * 0x0A));
903 let pool2 = pool.clone();
904
905 let handle = std::thread::spawn(move || {
906 let alloc = pool2.allocate(4).unwrap();
907 std::thread::sleep(std::time::Duration::from_millis(0x32));
908 drop(alloc);
909 });
910
911 std::thread::sleep(std::time::Duration::from_millis(0x0A)); drop(pool); assert!(handle.join().is_ok());
915 }
916
917 #[test]
918 fn drop_waits_for_active_dynamic_allocations() {
919 let pool = Arc::new(new_pool_prealloc(CAP * 0x0A));
920 let pool2 = pool.clone();
921
922 let handle = std::thread::spawn(move || {
923 let alloc = pool2.allocate(4).unwrap();
924 std::thread::sleep(std::time::Duration::from_millis(0x32));
925 drop(alloc);
926 });
927
928 std::thread::sleep(std::time::Duration::from_millis(0x0A)); drop(pool); assert!(handle.join().is_ok());
932 }
933
934 #[test]
935 fn ok_cross_thread_drop() {
936 let pool = Arc::new(new_pool_prealloc(0x0C));
937 let alloc = pool.allocate(0x0C).unwrap();
938
939 let h1 = {
940 std::thread::spawn(move || {
941 drop(alloc);
942 })
943 };
944
945 let h2 = {
946 let pool = pool.clone();
947
948 std::thread::spawn(move || {
949 let a = pool.allocate(8).unwrap();
950 assert_eq!(a.count, 8);
951 })
952 };
953
954 assert!(h1.join().is_ok());
955 assert!(h2.join().is_ok());
956 }
957 }
958}