use std::{
    mem,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc, Weak,
    },
};

use crossbeam_queue::SegQueue;

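/// A fixed-capacity byte buffer handed out by a [`BufferPool`]; when the last
/// reference to it is dropped, its backing storage is returned to the pool for
/// reuse.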
pub struct Buffer {
    data: Box<[u8]>,
    available: usize,
    pool: Weak<BufferPool>,
}

impl Buffer {
    fn new(data: Box<[u8]>, pool: &Arc<BufferPool>) -> Self {
        let available = data.len();

        Self {
            data,
            available,
            pool: Arc::downgrade(pool),
        }
    }

    fn new_with_available(data: Box<[u8]>, pool: &Arc<BufferPool>, available: usize) -> Self {
        // Clamp the requested length to the backing storage's capacity.
        let available = available.min(data.len());

        Self {
            data,
            available,
            pool: Arc::downgrade(pool),
        }
    }

    /// Total size of the backing storage in bytes.
    pub fn capacity(&self) -> usize {
        self.data.len()
    }

    /// Number of bytes currently visible through [`Self::data`].
    pub fn len(&self) -> usize {
        self.available
    }

    pub fn is_empty(&self) -> bool {
        self.available == 0
    }

    /// The visible portion of the buffer.
    pub fn data(&self) -> &[u8] {
        &self.data[..self.available]
    }

    /// The visible portion of the buffer, mutably.
    pub fn data_mut(&mut self) -> &mut [u8] {
        &mut self.data[..self.available]
    }

    /// Sets the visible length, clamped to the buffer's capacity; the backing
    /// storage itself is never reallocated.
    pub fn resize(&mut self, len: usize) {
        self.available = len.min(self.capacity());
    }
}

impl Drop for Buffer {
    fn drop(&mut self) {
        // Hand the backing storage back to the pool, if the pool is still alive.
        if let Some(pool) = self.pool.upgrade() {
            // Only rebuild a buffer whose capacity still matches the pool's current
            // setting. Recycling a stale-capacity buffer would be rejected by
            // `recycle_buffer`, and dropping the rejected replacement would re-enter
            // this destructor, recursing until the stack overflows.
            if self.data.len() == pool.get_buffer_capacity() {
                pool.recycle_buffer(Arc::new(Buffer::new(mem::take(&mut self.data), &pool)));
            }
        }
    }
}

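/// A lock-free pool of reusable, fixed-capacity byte buffers. Buffers are
/// handed out as `Arc<Buffer>` and return themselves to the pool when their
/// last reference is dropped.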
pub struct BufferPool {
    queue: SegQueue<Arc<Buffer>>,
    buffer_capacity: AtomicUsize,
}

impl BufferPool {
    pub fn new(buffer_capacity: usize) -> Arc<Self> {
        Arc::new(Self {
            queue: SegQueue::new(),
            buffer_capacity: AtomicUsize::new(buffer_capacity),
        })
    }

    /// Number of recycled buffers currently waiting in the pool.
    pub fn available(&self) -> usize {
        self.queue.len()
    }

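    /// Returns a zeroed buffer spanning the pool's full capacity, reusing a
    /// recycled buffer when one is available and still matches the current
    /// capacity, and allocating a fresh one otherwise.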
    pub fn get_buffer(self: &Arc<Self>) -> Arc<Buffer> {
        let buffer_capacity = self.buffer_capacity.load(Ordering::Relaxed);
        if let Some(mut buffer) = self.queue.pop() {
            // Reuse the recycled buffer only if its capacity is still current and
            // nothing else holds a reference to it.
            if buffer_capacity == buffer.capacity() {
                if let Some(buffer_mut) = Arc::get_mut(&mut buffer) {
                    buffer_mut.resize(buffer_capacity);
                    buffer_mut.data_mut().fill(0);
                    return buffer;
                }
            }
        }

        Arc::new(Buffer::new(vec![0u8; buffer_capacity].into_boxed_slice(), self))
    }

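    /// Like [`Self::get_buffer`], but the returned buffer's visible length is
    /// clamped to `len` (at most the pool's buffer capacity).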
    pub fn get_buffer_with_length(self: &Arc<Self>, len: usize) -> Arc<Buffer> {
        let buffer_capacity = self.buffer_capacity.load(Ordering::Relaxed);
        let len = len.min(buffer_capacity);

        if let Some(mut buffer) = self.queue.pop() {
            if buffer_capacity == buffer.capacity() {
                if let Some(buffer_mut) = Arc::get_mut(&mut buffer) {
                    buffer_mut.resize(len);
                    buffer_mut.data_mut().fill(0);
                    return buffer;
                }
            }
        }

        Arc::new(Buffer::new_with_available(
            vec![0u8; buffer_capacity].into_boxed_slice(),
            self,
            len,
        ))
    }

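    /// Returns a buffer to the pool. Buffers whose capacity no longer matches
    /// the pool's current setting are dropped instead of being queued.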
    pub fn recycle_buffer(&self, buffer: Arc<Buffer>) {
        if buffer.capacity() == self.buffer_capacity.load(Ordering::Relaxed) {
            self.queue.push(buffer);
        }
    }

    pub fn get_buffer_capacity(&self) -> usize {
        self.buffer_capacity.load(Ordering::Relaxed)
    }

    /// Changes the capacity used for newly allocated buffers and drains any
    /// already-recycled buffers, since their capacity no longer matches.
    pub fn set_buffer_capacity(&self, buffer_capacity: usize) {
        self.buffer_capacity.store(buffer_capacity, Ordering::Relaxed);
        while self.queue.pop().is_some() {}
    }
}
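
// A minimal usage sketch of the recycle-on-drop behavior; the test name and the
// 1024/16 sizes below are illustrative choices, not part of the original API.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn buffers_are_recycled_on_drop() {
        let pool = BufferPool::new(1024);
        assert_eq!(pool.available(), 0);

        // A fresh pool has nothing to reuse, so this allocates a zeroed buffer.
        let buffer = pool.get_buffer();
        assert_eq!(buffer.capacity(), 1024);
        assert_eq!(buffer.len(), 1024);

        // Dropping the last strong reference hands the storage back to the pool.
        drop(buffer);
        assert_eq!(pool.available(), 1);

        // The next request reuses the recycled allocation and clamps the length.
        let reused = pool.get_buffer_with_length(16);
        assert_eq!(reused.capacity(), 1024);
        assert_eq!(reused.len(), 16);
        assert_eq!(pool.available(), 0);
    }
}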