Skip to main content

typhoon/bytes/
pool.rs

1#[cfg(test)]
2#[path = "../../tests/bytes/pool.rs"]
3mod tests;
4
5use std::sync::Arc;
6
7use crossbeam::queue::ArrayQueue;
8
9use crate::bytes::dynamic::DynamicByteBuffer;
10use crate::bytes::utils::{allocate_ptr, copy_slice, free_ptr};
11
/// Shared storage for pooled buffers.
///
/// Keeps the raw pointers of buffers that are currently idle, together with
/// the byte size that is passed to `free_ptr` when a buffer has to be released.
pub(crate) struct PoolStorage {
    /// Bounded queue of idle buffer pointers; `push` fails once it is full.
    buffers: ArrayQueue<*mut u8>,
    /// Size in bytes handed to `free_ptr` for every buffer released by this storage.
    capacity: usize,
}
17
// SAFETY: the only field that blocks the auto traits is `ArrayQueue<*mut u8>`,
// and only because raw pointers are neither `Send` nor `Sync`. The queue itself
// supports concurrent `push`/`pop` through `&self`, and `capacity` is a plain
// `usize` that is never mutated after construction.
// NOTE(review): this additionally relies on a queued pointer having no other
// owner while it sits in the pool — confirm against the buffer's drop path.
unsafe impl Send for PoolStorage {}
unsafe impl Sync for PoolStorage {}
21
22impl PoolStorage {
23    /// Return buffer to pool, or free if at capacity.
24    #[inline]
25    pub(crate) fn try_return(&self, ptr: *mut u8) {
26        if self.buffers.push(ptr).is_err() {
27            free_ptr(ptr, self.capacity);
28        }
29    }
30
31    #[inline]
32    pub(crate) fn try_take(&self, size: usize) -> *mut u8 {
33        self.buffers.pop().unwrap_or_else(|| allocate_ptr(size))
34    }
35}
36
/// Shared, reference-counted handle to a [`PoolStorage`].
// NOTE(review): presumably the handle a buffer keeps so it can give its memory
// back through `PoolStorage::try_return` — confirm in `DynamicByteBuffer`.
pub(crate) type PoolReturn = Arc<PoolStorage>;
38
/// Thread-safe pool of reusable byte buffers.
pub struct BytePool {
    /// Default header space reserved before the main data.
    before_cap: usize,
    /// Default main data capacity.
    size: usize,
    /// Default trailer space reserved after the main data.
    after_cap: usize,
    /// Shared queue of idle buffers; a clone of this handle is passed to every
    /// buffer the pool hands out.
    storage: Arc<PoolStorage>,
}
46
47impl BytePool {
48    /// Pool's default leading head-room.
49    #[inline]
50    pub fn before_cap(&self) -> usize {
51        self.before_cap
52    }
53
54    /// Pool's default trailing head-room.
55    #[inline]
56    pub fn after_cap(&self) -> usize {
57        self.after_cap
58    }
59
60    /// Create a new pool.
61    /// - `before_cap`: header space before main data
62    /// - `size`: main data capacity
63    /// - `after_cap`: trailer space after main data
64    /// - `initial`: pre-allocated buffer count
65    /// - `max_pooled`: maximum buffers to keep in pool
66    pub fn new(before_cap: usize, size: usize, after_cap: usize, initial: usize, max_pooled: usize) -> Self {
67        let capacity = before_cap + size + after_cap;
68        let actual_max = max_pooled.max(initial);
69
70        let buffers = ArrayQueue::new(actual_max);
71        for _ in 0..initial {
72            buffers.push(allocate_ptr(capacity)).expect("Should never happen actually.");
73        }
74
75        BytePool {
76            before_cap,
77            size,
78            after_cap,
79            storage: Arc::new(PoolStorage {
80                buffers,
81                capacity,
82            }),
83        }
84    }
85
86    /// Get a buffer from pool or allocate new one.
87    /// - `size`: optional size limit (must be <= pool's size), None for full size
88    ///
89    /// Returns a DynamicByteBuffer that auto-returns to pool on drop.
90    #[inline]
91    pub fn allocate(&self, size: Option<usize>) -> DynamicByteBuffer {
92        match size {
93            Some(res) => self.allocate_precise(res, self.before_cap, self.after_cap),
94            None => self.allocate_precise(self.size, self.before_cap, self.after_cap),
95        }
96    }
97
98    /// Allocate a buffer sized for receiving raw packets from the network.
99    /// Uses the maximum available active view (size + after_cap) to accommodate
100    /// on-wire packets that are larger than the user-data MTU due to protocol overhead.
101    /// The before_cap headroom is preserved for subsequent send-path expand_start calls.
102    #[inline]
103    pub fn allocate_for_recv(&self) -> DynamicByteBuffer {
104        self.allocate_precise(self.size + self.after_cap, self.before_cap, 0)
105    }
106
107    #[inline]
108    pub fn allocate_precise(&self, size: usize, before_cap: usize, after_cap: usize) -> DynamicByteBuffer {
109        let requested_size = before_cap + size + after_cap;
110        assert!(requested_size <= self.storage.capacity, "Requested size greater than pool capacity ({requested_size} > {})!", self.storage.capacity);
111        let actual_after_cap = self.storage.capacity + after_cap - requested_size;
112
113        let data = self.storage.try_take(self.storage.capacity);
114        DynamicByteBuffer::new(data, self.storage.capacity, before_cap, size, actual_after_cap, Arc::clone(&self.storage))
115    }
116
117    #[inline]
118    pub fn allocate_precise_from_slice_with_capacity(&self, data: &[u8], before_cap: usize, after_cap: usize) -> DynamicByteBuffer {
119        let buff = self.allocate_precise(data.len(), before_cap, after_cap);
120        if !data.is_empty() {
121            copy_slice(unsafe { buff.data_ptr().add(before_cap) }, data);
122        }
123        buff
124    }
125
126    #[inline]
127    pub fn allocate_precise_from_array_with_capacity<const N: usize>(&self, arr: &[u8; N], before_cap: usize, after_cap: usize) -> DynamicByteBuffer {
128        self.allocate_precise_from_slice_with_capacity(arr.as_slice(), before_cap, after_cap)
129    }
130}