sozu_lib/
pool.rs

1/// experimental module to measure buffer pool usage
2///
3/// this allows us to track how many buffers are used through the
4/// buffers.count metric.
5///
6/// Right now, we wrap the `pool` crate, but we might write a different
7/// buffer pool in the future, so this module will still be useful to
8/// test the differences
9use std::{
10    cmp,
11    io::{self, Read, Write},
12    ops, ptr,
13    sync::atomic::{AtomicUsize, Ordering},
14};
15
16static BUFFER_COUNT: AtomicUsize = AtomicUsize::new(0);
17
18pub struct Pool {
19    pub inner: poule::Pool<BufferMetadata>,
20    pub buffer_size: usize,
21}
22
23impl Pool {
24    pub fn with_capacity(minimum: usize, maximum: usize, buffer_size: usize) -> Pool {
25        let mut inner = poule::Pool::with_extra(maximum, buffer_size);
26        inner.grow_to(minimum);
27        Pool { inner, buffer_size }
28    }
29
30    pub fn checkout(&mut self) -> Option<Checkout> {
31        if self.inner.used() == self.inner.capacity()
32            && self.inner.capacity() < self.inner.maximum_capacity()
33        {
34            self.inner.grow_to(std::cmp::min(
35                self.inner.capacity() * 2,
36                self.inner.maximum_capacity(),
37            ));
38            debug!(
39                "growing pool capacity from {} to {}",
40                self.inner.capacity(),
41                std::cmp::min(self.inner.capacity() * 2, self.inner.maximum_capacity())
42            );
43        }
44        let capacity = self.buffer_size;
45        self.inner
46            .checkout(|| {
47                trace!("initializing a buffer with capacity {}", capacity);
48                BufferMetadata::new()
49            })
50            .map(|c| {
51                let old_buffer_count = BUFFER_COUNT.fetch_add(1, Ordering::SeqCst);
52                gauge!("buffer.number", old_buffer_count + 1);
53                Checkout { inner: c }
54            })
55    }
56}
57
58impl ops::Deref for Pool {
59    type Target = poule::Pool<BufferMetadata>;
60
61    fn deref(&self) -> &Self::Target {
62        &self.inner
63    }
64}
65
66impl ops::DerefMut for Pool {
67    fn deref_mut(&mut self) -> &mut poule::Pool<BufferMetadata> {
68        &mut self.inner
69    }
70}
71
72#[derive(Debug, PartialEq, Eq, Clone)]
73pub struct BufferMetadata {
74    position: usize,
75    end: usize,
76}
77
78impl Default for BufferMetadata {
79    fn default() -> Self {
80        Self::new()
81    }
82}
83
84impl BufferMetadata {
85    pub fn new() -> BufferMetadata {
86        BufferMetadata {
87            position: 0,
88            end: 0,
89        }
90    }
91}
92
93pub struct Checkout {
94    pub inner: poule::Checkout<BufferMetadata>,
95}
96
97/*
98impl ops::Deref for Checkout {
99    type Target = poule::Checkout<BufferMetadata>;
100
101    fn deref(&self) -> &Self::Target {
102        &self.inner
103    }
104}
105
106impl ops::DerefMut for Checkout {
107    fn deref_mut(&mut self) -> &mut poule::Checkout<BufferMetadata> {
108        &mut self.inner
109    }
110}
111*/
112
113impl Drop for Checkout {
114    fn drop(&mut self) {
115        let old_buffer_count = BUFFER_COUNT.fetch_sub(1, Ordering::SeqCst);
116        gauge!("buffer.number", old_buffer_count - 1);
117    }
118}
119
120impl Checkout {
121    pub fn available_data(&self) -> usize {
122        self.inner.end - self.inner.position
123    }
124
125    pub fn available_space(&self) -> usize {
126        self.capacity() - self.inner.end
127    }
128
129    pub fn capacity(&self) -> usize {
130        self.inner.extra().len()
131    }
132
133    pub fn empty(&self) -> bool {
134        self.inner.position == self.inner.end
135    }
136
137    pub fn consume(&mut self, count: usize) -> usize {
138        let cnt = cmp::min(count, self.available_data());
139        self.inner.position += cnt;
140        if self.inner.position > self.capacity() / 2 {
141            //trace!("consume shift: pos {}, end {}", self.position, self.end);
142            self.shift();
143        }
144        cnt
145    }
146
147    pub fn fill(&mut self, count: usize) -> usize {
148        let cnt = cmp::min(count, self.available_space());
149        self.inner.end += cnt;
150        if self.available_space() < self.available_data() + cnt {
151            //trace!("fill shift: pos {}, end {}", self.position, self.end);
152            self.shift();
153        }
154
155        cnt
156    }
157
158    pub fn reset(&mut self) {
159        self.inner.position = 0;
160        self.inner.end = 0;
161    }
162
163    pub fn sync(&mut self, end: usize, position: usize) {
164        self.inner.position = position;
165        self.inner.end = end;
166    }
167
168    pub fn data(&self) -> &[u8] {
169        &self.inner.extra()[self.inner.position..self.inner.end]
170    }
171
172    pub fn space(&mut self) -> &mut [u8] {
173        let range = self.inner.end..self.capacity();
174        &mut self.inner.extra_mut()[range]
175    }
176
177    pub fn shift(&mut self) {
178        let pos = self.inner.position;
179        let end = self.inner.end;
180        if pos > 0 {
181            unsafe {
182                let length = end - pos;
183                ptr::copy(
184                    self.inner.extra()[pos..end].as_ptr(),
185                    self.inner.extra_mut()[..length].as_mut_ptr(),
186                    length,
187                );
188                self.inner.position = 0;
189                self.inner.end = length;
190            }
191        }
192    }
193
194    pub fn delete_slice(&mut self, start: usize, length: usize) -> Option<usize> {
195        if start + length >= self.available_data() {
196            return None;
197        }
198
199        unsafe {
200            let begin = self.inner.position + start;
201            let next_end = self.inner.end - length;
202            ptr::copy(
203                self.inner.extra()[begin + length..self.inner.end].as_ptr(),
204                self.inner.extra_mut()[begin..next_end].as_mut_ptr(),
205                self.inner.end - (begin + length),
206            );
207            self.inner.end = next_end;
208        }
209        Some(self.available_data())
210    }
211
212    pub fn replace_slice(&mut self, data: &[u8], start: usize, length: usize) -> Option<usize> {
213        let data_len = data.len();
214        if start + length > self.available_data()
215            || self.inner.position + start + data_len > self.capacity()
216        {
217            return None;
218        }
219
220        unsafe {
221            let begin = self.inner.position + start;
222            let slice_end = begin + data_len;
223            // we reduced the data size
224            if data_len < length {
225                ptr::copy(
226                    data.as_ptr(),
227                    self.inner.extra_mut()[begin..slice_end].as_mut_ptr(),
228                    data_len,
229                );
230
231                ptr::copy(
232                    self.inner.extra()[start + length..self.inner.end].as_ptr(),
233                    self.inner.extra_mut()[slice_end..].as_mut_ptr(),
234                    self.inner.end - (start + length),
235                );
236                self.inner.end -= length - data_len;
237
238            // we put more data in the buffer
239            } else {
240                ptr::copy(
241                    self.inner.extra()[start + length..self.inner.end].as_ptr(),
242                    self.inner.extra_mut()[start + data_len..].as_mut_ptr(),
243                    self.inner.end - (start + length),
244                );
245                ptr::copy(
246                    data.as_ptr(),
247                    self.inner.extra_mut()[begin..slice_end].as_mut_ptr(),
248                    data_len,
249                );
250                self.inner.end += data_len - length;
251            }
252        }
253        Some(self.available_data())
254    }
255
256    pub fn insert_slice(&mut self, data: &[u8], start: usize) -> Option<usize> {
257        let data_len = data.len();
258        if start > self.available_data()
259            || self.inner.position + self.inner.end + data_len > self.capacity()
260        {
261            return None;
262        }
263
264        unsafe {
265            let begin = self.inner.position + start;
266            let slice_end = begin + data_len;
267            ptr::copy(
268                self.inner.extra()[start..self.inner.end].as_ptr(),
269                self.inner.extra_mut()[start + data_len..].as_mut_ptr(),
270                self.inner.end - start,
271            );
272            ptr::copy(
273                data.as_ptr(),
274                self.inner.extra_mut()[begin..slice_end].as_mut_ptr(),
275                data_len,
276            );
277            self.inner.end += data_len;
278        }
279        Some(self.available_data())
280    }
281}
282
283impl Write for Checkout {
284    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
285        match self.space().write(buf) {
286            Ok(size) => {
287                self.fill(size);
288                Ok(size)
289            }
290            err => err,
291        }
292    }
293
294    fn flush(&mut self) -> io::Result<()> {
295        Ok(())
296    }
297}
298
299impl Read for Checkout {
300    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
301        let len = cmp::min(self.available_data(), buf.len());
302        unsafe {
303            ptr::copy(
304                self.inner.extra()[self.inner.position..self.inner.position + len].as_ptr(),
305                buf.as_mut_ptr(),
306                len,
307            );
308            self.inner.position += len;
309        }
310        Ok(len)
311    }
312}