1use std::{
10 cmp,
11 io::{self, Read, Write},
12 ops, ptr,
13 sync::atomic::{AtomicUsize, Ordering},
14};
15
/// Number of buffers currently checked out of the pool, reported through the
/// `buffer.number` gauge (incremented on checkout, decremented on drop).
static BUFFER_COUNT: AtomicUsize = AtomicUsize::new(0);
17
/// A pool of reusable byte buffers, wrapping `poule::Pool` and remembering the
/// per-buffer storage size used when initializing new buffers.
pub struct Pool {
    pub inner: poule::Pool<BufferMetadata>,
    // size in bytes of each buffer's backing storage (the poule "extra" area)
    pub buffer_size: usize,
}
22
23impl Pool {
24 pub fn with_capacity(minimum: usize, maximum: usize, buffer_size: usize) -> Pool {
25 let mut inner = poule::Pool::with_extra(maximum, buffer_size);
26 inner.grow_to(minimum);
27 Pool { inner, buffer_size }
28 }
29
30 pub fn checkout(&mut self) -> Option<Checkout> {
31 if self.inner.used() == self.inner.capacity()
32 && self.inner.capacity() < self.inner.maximum_capacity()
33 {
34 self.inner.grow_to(std::cmp::min(
35 self.inner.capacity() * 2,
36 self.inner.maximum_capacity(),
37 ));
38 debug!(
39 "growing pool capacity from {} to {}",
40 self.inner.capacity(),
41 std::cmp::min(self.inner.capacity() * 2, self.inner.maximum_capacity())
42 );
43 }
44 let capacity = self.buffer_size;
45 self.inner
46 .checkout(|| {
47 trace!("initializing a buffer with capacity {}", capacity);
48 BufferMetadata::new()
49 })
50 .map(|c| {
51 let old_buffer_count = BUFFER_COUNT.fetch_add(1, Ordering::SeqCst);
52 gauge!("buffer.number", old_buffer_count + 1);
53 Checkout { inner: c }
54 })
55 }
56}
57
58impl ops::Deref for Pool {
59 type Target = poule::Pool<BufferMetadata>;
60
61 fn deref(&self) -> &Self::Target {
62 &self.inner
63 }
64}
65
66impl ops::DerefMut for Pool {
67 fn deref_mut(&mut self) -> &mut poule::Pool<BufferMetadata> {
68 &mut self.inner
69 }
70}
71
/// Read/write cursor state for a pooled buffer; the bytes themselves live in
/// the checkout's `extra` storage, this only tracks the readable window.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BufferMetadata {
    // offset of the first readable byte
    position: usize,
    // offset one past the last readable byte (also the next write offset)
    end: usize,
}
77
78impl Default for BufferMetadata {
79 fn default() -> Self {
80 Self::new()
81 }
82}
83
84impl BufferMetadata {
85 pub fn new() -> BufferMetadata {
86 BufferMetadata {
87 position: 0,
88 end: 0,
89 }
90 }
91}
92
/// A buffer checked out of the pool; the underlying `poule` checkout returns
/// the buffer to the pool when dropped.
pub struct Checkout {
    pub inner: poule::Checkout<BufferMetadata>,
}
96
97impl Drop for Checkout {
114 fn drop(&mut self) {
115 let old_buffer_count = BUFFER_COUNT.fetch_sub(1, Ordering::SeqCst);
116 gauge!("buffer.number", old_buffer_count - 1);
117 }
118}
119
120impl Checkout {
121 pub fn available_data(&self) -> usize {
122 self.inner.end - self.inner.position
123 }
124
125 pub fn available_space(&self) -> usize {
126 self.capacity() - self.inner.end
127 }
128
129 pub fn capacity(&self) -> usize {
130 self.inner.extra().len()
131 }
132
133 pub fn empty(&self) -> bool {
134 self.inner.position == self.inner.end
135 }
136
137 pub fn consume(&mut self, count: usize) -> usize {
138 let cnt = cmp::min(count, self.available_data());
139 self.inner.position += cnt;
140 if self.inner.position > self.capacity() / 2 {
141 self.shift();
143 }
144 cnt
145 }
146
147 pub fn fill(&mut self, count: usize) -> usize {
148 let cnt = cmp::min(count, self.available_space());
149 self.inner.end += cnt;
150 if self.available_space() < self.available_data() + cnt {
151 self.shift();
153 }
154
155 cnt
156 }
157
158 pub fn reset(&mut self) {
159 self.inner.position = 0;
160 self.inner.end = 0;
161 }
162
163 pub fn sync(&mut self, end: usize, position: usize) {
164 self.inner.position = position;
165 self.inner.end = end;
166 }
167
168 pub fn data(&self) -> &[u8] {
169 &self.inner.extra()[self.inner.position..self.inner.end]
170 }
171
172 pub fn space(&mut self) -> &mut [u8] {
173 let range = self.inner.end..self.capacity();
174 &mut self.inner.extra_mut()[range]
175 }
176
177 pub fn shift(&mut self) {
178 let pos = self.inner.position;
179 let end = self.inner.end;
180 if pos > 0 {
181 unsafe {
182 let length = end - pos;
183 ptr::copy(
184 self.inner.extra()[pos..end].as_ptr(),
185 self.inner.extra_mut()[..length].as_mut_ptr(),
186 length,
187 );
188 self.inner.position = 0;
189 self.inner.end = length;
190 }
191 }
192 }
193
194 pub fn delete_slice(&mut self, start: usize, length: usize) -> Option<usize> {
195 if start + length >= self.available_data() {
196 return None;
197 }
198
199 unsafe {
200 let begin = self.inner.position + start;
201 let next_end = self.inner.end - length;
202 ptr::copy(
203 self.inner.extra()[begin + length..self.inner.end].as_ptr(),
204 self.inner.extra_mut()[begin..next_end].as_mut_ptr(),
205 self.inner.end - (begin + length),
206 );
207 self.inner.end = next_end;
208 }
209 Some(self.available_data())
210 }
211
212 pub fn replace_slice(&mut self, data: &[u8], start: usize, length: usize) -> Option<usize> {
213 let data_len = data.len();
214 if start + length > self.available_data()
215 || self.inner.position + start + data_len > self.capacity()
216 {
217 return None;
218 }
219
220 unsafe {
221 let begin = self.inner.position + start;
222 let slice_end = begin + data_len;
223 if data_len < length {
225 ptr::copy(
226 data.as_ptr(),
227 self.inner.extra_mut()[begin..slice_end].as_mut_ptr(),
228 data_len,
229 );
230
231 ptr::copy(
232 self.inner.extra()[start + length..self.inner.end].as_ptr(),
233 self.inner.extra_mut()[slice_end..].as_mut_ptr(),
234 self.inner.end - (start + length),
235 );
236 self.inner.end -= length - data_len;
237
238 } else {
240 ptr::copy(
241 self.inner.extra()[start + length..self.inner.end].as_ptr(),
242 self.inner.extra_mut()[start + data_len..].as_mut_ptr(),
243 self.inner.end - (start + length),
244 );
245 ptr::copy(
246 data.as_ptr(),
247 self.inner.extra_mut()[begin..slice_end].as_mut_ptr(),
248 data_len,
249 );
250 self.inner.end += data_len - length;
251 }
252 }
253 Some(self.available_data())
254 }
255
256 pub fn insert_slice(&mut self, data: &[u8], start: usize) -> Option<usize> {
257 let data_len = data.len();
258 if start > self.available_data()
259 || self.inner.position + self.inner.end + data_len > self.capacity()
260 {
261 return None;
262 }
263
264 unsafe {
265 let begin = self.inner.position + start;
266 let slice_end = begin + data_len;
267 ptr::copy(
268 self.inner.extra()[start..self.inner.end].as_ptr(),
269 self.inner.extra_mut()[start + data_len..].as_mut_ptr(),
270 self.inner.end - start,
271 );
272 ptr::copy(
273 data.as_ptr(),
274 self.inner.extra_mut()[begin..slice_end].as_mut_ptr(),
275 data_len,
276 );
277 self.inner.end += data_len;
278 }
279 Some(self.available_data())
280 }
281}
282
283impl Write for Checkout {
284 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
285 match self.space().write(buf) {
286 Ok(size) => {
287 self.fill(size);
288 Ok(size)
289 }
290 err => err,
291 }
292 }
293
294 fn flush(&mut self) -> io::Result<()> {
295 Ok(())
296 }
297}
298
299impl Read for Checkout {
300 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
301 let len = cmp::min(self.available_data(), buf.len());
302 unsafe {
303 ptr::copy(
304 self.inner.extra()[self.inner.position..self.inner.position + len].as_ptr(),
305 buf.as_mut_ptr(),
306 len,
307 );
308 self.inner.position += len;
309 }
310 Ok(len)
311 }
312}