1use std::{
2 mem,
3 sync::{
4 atomic::{AtomicUsize, Ordering},
5 Arc, Weak,
6 },
7};
8
9use crossbeam_queue::SegQueue;
10
11const DEFAULT_BUFFER_CAPACITY: usize = 1024;
12
13#[derive(Debug)]
14pub struct Buffer {
15 data: Box<[u8]>,
16 available: usize,
17 pool: Weak<BufferPool>,
18}
19
20impl Buffer {
21 fn new(data: Box<[u8]>, pool: &Arc<BufferPool>) -> Self {
22 let available = data.len();
23
24 Self {
25 data,
26 available,
27 pool: Arc::downgrade(pool),
28 }
29 }
30
31 fn new_with_available(data: Box<[u8]>, pool: &Arc<BufferPool>, available: usize) -> Self {
32 let available = available.min(data.len());
33
34 Self {
35 data,
36 available,
37 pool: Arc::downgrade(pool),
38 }
39 }
40
41 pub fn capacity(&self) -> usize {
42 self.data.len()
43 }
44
45 pub fn len(&self) -> usize {
46 self.available
47 }
48
49 pub fn is_empty(&self) -> bool {
50 self.available == 0
51 }
52
53 pub fn data(&self) -> &[u8] {
54 &self.data[..self.available]
55 }
56
57 pub fn data_mut(&mut self) -> &mut [u8] {
58 &mut self.data[..self.available]
59 }
60
61 pub fn resize(&mut self, len: usize) {
63 self.available = len.min(self.capacity());
64 }
65}
66
67impl Drop for Buffer {
68 fn drop(&mut self) {
69 if let Some(pool) = self.pool.upgrade() {
70 pool.recycle_buffer(mem::take(&mut self.data));
71 }
72 }
73}
74
75#[derive(Debug)]
76pub struct BufferPool {
77 queue: SegQueue<Arc<Buffer>>,
78 buffer_capacity: AtomicUsize,
79}
80
81impl BufferPool {
82 pub fn new(buffer_capacity: usize) -> Arc<Self> {
83 let buffer_capacity = if buffer_capacity == 0 {
84 DEFAULT_BUFFER_CAPACITY
85 } else {
86 buffer_capacity
87 };
88
89 Arc::new(Self {
90 queue: SegQueue::new(),
91 buffer_capacity: AtomicUsize::new(buffer_capacity),
92 })
93 }
94
95 pub fn available(&self) -> usize {
96 self.queue.len()
97 }
98
99 pub fn get_buffer(self: &Arc<Self>) -> Arc<Buffer> {
100 let buffer_capacity = self.buffer_capacity.load(Ordering::Acquire);
101 if let Some(mut buffer) = self.queue.pop() {
102 if buffer_capacity == buffer.capacity() {
103 if let Some(buffer_mut) = Arc::get_mut(&mut buffer) {
104 buffer_mut.resize(buffer_capacity);
105 return buffer;
106 }
107 }
108 }
109
110 Arc::new(Buffer::new(vec![0u8; buffer_capacity].into_boxed_slice(), self))
111 }
112
113 pub fn get_buffer_with_length(self: &Arc<Self>, len: usize) -> Arc<Buffer> {
114 let mut buffer_capacity = self.buffer_capacity.load(Ordering::Acquire);
115
116 if len > buffer_capacity {
117 self.set_buffer_capacity(len);
118 buffer_capacity = len;
119 }
120
121 if let Some(mut buffer) = self.queue.pop() {
122 if buffer_capacity == buffer.capacity() {
123 if let Some(buffer_mut) = Arc::get_mut(&mut buffer) {
124 buffer_mut.resize(len);
125 return buffer;
126 }
127 }
128 }
129
130 Arc::new(Buffer::new_with_available(vec![0u8; buffer_capacity].into_boxed_slice(), self, len))
131 }
132
133 pub fn recycle_buffer(self: &Arc<Self>, buffer_data: Box<[u8]>) {
134 if buffer_data.len() == self.buffer_capacity.load(Ordering::Acquire) {
135 self.queue.push(Arc::new(Buffer::new(buffer_data, self)));
136 }
137 }
138
139 pub fn get_buffer_capacity(&self) -> usize {
140 self.buffer_capacity.load(Ordering::Relaxed)
141 }
142
143 pub fn set_buffer_capacity(&self, buffer_capacity: usize) {
144 self.buffer_capacity.store(buffer_capacity, Ordering::Release);
145 while self.queue.pop().is_some() {}
146 }
147}