Skip to main content

sozu_lib/
pool.rs

1/// experimental module to measure buffer pool usage
2///
3/// this allows us to track how many buffers are used through the
4/// buffers.count metric.
5///
6/// Right now, we wrap the `pool` crate, but we might write a different
7/// buffer pool in the future, so this module will still be useful to
8/// test the differences
9use std::{
10    cmp,
11    io::{self, Read, Write},
12    ops, ptr,
13    sync::atomic::{AtomicUsize, Ordering},
14};
15
16use crate::metrics::names;
17
18static BUFFER_COUNT: AtomicUsize = AtomicUsize::new(0);
19
20pub struct Pool {
21    pub inner: poule::Pool<BufferMetadata>,
22    pub buffer_size: usize,
23}
24
25impl Pool {
26    pub fn with_capacity(minimum: usize, maximum: usize, buffer_size: usize) -> Pool {
27        let mut inner = poule::Pool::with_extra(maximum, buffer_size);
28        inner.grow_to(minimum);
29        Pool { inner, buffer_size }
30    }
31
32    pub fn checkout(&mut self) -> Option<Checkout> {
33        if self.inner.used() == self.inner.capacity()
34            && self.inner.capacity() < self.inner.maximum_capacity()
35        {
36            self.inner.grow_to(std::cmp::min(
37                self.inner.capacity() * 2,
38                self.inner.maximum_capacity(),
39            ));
40            debug!(
41                "growing pool capacity from {} to {}",
42                self.inner.capacity(),
43                std::cmp::min(self.inner.capacity() * 2, self.inner.maximum_capacity())
44            );
45        }
46        let capacity = self.buffer_size;
47        self.inner
48            .checkout(|| {
49                trace!("initializing a buffer with capacity {}", capacity);
50                BufferMetadata::new()
51            })
52            .map(|c| {
53                let old_buffer_count = BUFFER_COUNT.fetch_add(1, Ordering::SeqCst);
54                gauge!(names::buffer::IN_USE, old_buffer_count + 1);
55                Checkout { inner: c }
56            })
57    }
58}
59
60impl ops::Deref for Pool {
61    type Target = poule::Pool<BufferMetadata>;
62
63    fn deref(&self) -> &Self::Target {
64        &self.inner
65    }
66}
67
68impl ops::DerefMut for Pool {
69    fn deref_mut(&mut self) -> &mut poule::Pool<BufferMetadata> {
70        &mut self.inner
71    }
72}
73
74#[derive(Debug, PartialEq, Eq, Clone)]
75pub struct BufferMetadata {
76    position: usize,
77    end: usize,
78}
79
80impl Default for BufferMetadata {
81    fn default() -> Self {
82        Self::new()
83    }
84}
85
86impl BufferMetadata {
87    pub fn new() -> BufferMetadata {
88        BufferMetadata {
89            position: 0,
90            end: 0,
91        }
92    }
93}
94
95pub struct Checkout {
96    pub inner: poule::Checkout<BufferMetadata>,
97}
98
99/*
100impl ops::Deref for Checkout {
101    type Target = poule::Checkout<BufferMetadata>;
102
103    fn deref(&self) -> &Self::Target {
104        &self.inner
105    }
106}
107
108impl ops::DerefMut for Checkout {
109    fn deref_mut(&mut self) -> &mut poule::Checkout<BufferMetadata> {
110        &mut self.inner
111    }
112}
113*/
114
115impl Drop for Checkout {
116    fn drop(&mut self) {
117        let old_buffer_count = BUFFER_COUNT.fetch_sub(1, Ordering::SeqCst);
118        gauge!(names::buffer::IN_USE, old_buffer_count - 1);
119    }
120}
121
122impl Checkout {
123    pub fn available_data(&self) -> usize {
124        self.inner.end - self.inner.position
125    }
126
127    pub fn available_space(&self) -> usize {
128        self.capacity() - self.inner.end
129    }
130
131    pub fn capacity(&self) -> usize {
132        self.inner.extra().len()
133    }
134
135    pub fn empty(&self) -> bool {
136        self.inner.position == self.inner.end
137    }
138
139    pub fn consume(&mut self, count: usize) -> usize {
140        let cnt = cmp::min(count, self.available_data());
141        self.inner.position += cnt;
142        if self.inner.position > self.capacity() / 2 {
143            //trace!("consume shift: pos {}, end {}", self.position, self.end);
144            self.shift();
145        }
146        cnt
147    }
148
149    pub fn fill(&mut self, count: usize) -> usize {
150        let cnt = cmp::min(count, self.available_space());
151        self.inner.end += cnt;
152        if self.available_space() < self.available_data() + cnt {
153            //trace!("fill shift: pos {}, end {}", self.position, self.end);
154            self.shift();
155        }
156
157        cnt
158    }
159
160    pub fn reset(&mut self) {
161        self.inner.position = 0;
162        self.inner.end = 0;
163    }
164
165    pub fn sync(&mut self, end: usize, position: usize) {
166        self.inner.position = position;
167        self.inner.end = end;
168    }
169
170    pub fn data(&self) -> &[u8] {
171        &self.inner.extra()[self.inner.position..self.inner.end]
172    }
173
174    pub fn space(&mut self) -> &mut [u8] {
175        let range = self.inner.end..self.capacity();
176        &mut self.inner.extra_mut()[range]
177    }
178
179    pub fn shift(&mut self) {
180        let pos = self.inner.position;
181        let end = self.inner.end;
182        if pos > 0 {
183            // SAFETY: src and dst point into the same checkout buffer
184            // (`self.inner.extra`); the slice indexing above bounds-checks
185            // both ranges (`pos..end` and `..length`) against the live
186            // buffer length. `ptr::copy` is overlap-safe.
187            unsafe {
188                let length = end - pos;
189                ptr::copy(
190                    self.inner.extra()[pos..end].as_ptr(),
191                    self.inner.extra_mut()[..length].as_mut_ptr(),
192                    length,
193                );
194                self.inner.position = 0;
195                self.inner.end = length;
196            }
197        }
198    }
199
200    pub fn delete_slice(&mut self, start: usize, length: usize) -> Option<usize> {
201        if start + length >= self.available_data() {
202            return None;
203        }
204
205        // SAFETY: src and dst point into the same checkout buffer
206        // (`self.inner.extra`). The early-return above guarantees
207        // `start + length < available_data`, and slice indexing
208        // bounds-checks both `begin+length..end` and `begin..next_end`
209        // against the live buffer length. `ptr::copy` is overlap-safe.
210        unsafe {
211            let begin = self.inner.position + start;
212            let next_end = self.inner.end - length;
213            ptr::copy(
214                self.inner.extra()[begin + length..self.inner.end].as_ptr(),
215                self.inner.extra_mut()[begin..next_end].as_mut_ptr(),
216                self.inner.end - (begin + length),
217            );
218            self.inner.end = next_end;
219        }
220        Some(self.available_data())
221    }
222
223    pub fn replace_slice(&mut self, data: &[u8], start: usize, length: usize) -> Option<usize> {
224        let data_len = data.len();
225        if start + length > self.available_data()
226            || self.inner.position + start + data_len > self.capacity()
227        {
228            return None;
229        }
230
231        // SAFETY: every `ptr::copy` below moves bytes inside the same
232        // checkout buffer (`self.inner.extra`) or copies from the caller's
233        // `data` slice into it. The two early-return checks above bound
234        // the affected ranges against `available_data()` and `capacity()`,
235        // and each slice indexing site is bounds-checked. `ptr::copy` is
236        // overlap-safe.
237        unsafe {
238            let begin = self.inner.position + start;
239            let slice_end = begin + data_len;
240            // we reduced the data size
241            if data_len < length {
242                ptr::copy(
243                    data.as_ptr(),
244                    self.inner.extra_mut()[begin..slice_end].as_mut_ptr(),
245                    data_len,
246                );
247
248                ptr::copy(
249                    self.inner.extra()[start + length..self.inner.end].as_ptr(),
250                    self.inner.extra_mut()[slice_end..].as_mut_ptr(),
251                    self.inner.end - (start + length),
252                );
253                self.inner.end -= length - data_len;
254
255            // we put more data in the buffer
256            } else {
257                ptr::copy(
258                    self.inner.extra()[start + length..self.inner.end].as_ptr(),
259                    self.inner.extra_mut()[start + data_len..].as_mut_ptr(),
260                    self.inner.end - (start + length),
261                );
262                ptr::copy(
263                    data.as_ptr(),
264                    self.inner.extra_mut()[begin..slice_end].as_mut_ptr(),
265                    data_len,
266                );
267                self.inner.end += data_len - length;
268            }
269        }
270        Some(self.available_data())
271    }
272
273    pub fn insert_slice(&mut self, data: &[u8], start: usize) -> Option<usize> {
274        let data_len = data.len();
275        if start > self.available_data()
276            || self.inner.position + self.inner.end + data_len > self.capacity()
277        {
278            return None;
279        }
280
281        // SAFETY: both `ptr::copy` calls touch `self.inner.extra` (same
282        // allocation) or copy from `data` into it. The early-return checks
283        // bound `start <= available_data` and the resulting tail
284        // `position + end + data_len <= capacity`, and each slice indexing
285        // site is bounds-checked. `ptr::copy` is overlap-safe.
286        unsafe {
287            let begin = self.inner.position + start;
288            let slice_end = begin + data_len;
289            ptr::copy(
290                self.inner.extra()[start..self.inner.end].as_ptr(),
291                self.inner.extra_mut()[start + data_len..].as_mut_ptr(),
292                self.inner.end - start,
293            );
294            ptr::copy(
295                data.as_ptr(),
296                self.inner.extra_mut()[begin..slice_end].as_mut_ptr(),
297                data_len,
298            );
299            self.inner.end += data_len;
300        }
301        Some(self.available_data())
302    }
303}
304
305impl Write for Checkout {
306    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
307        match self.space().write(buf) {
308            Ok(size) => {
309                self.fill(size);
310                Ok(size)
311            }
312            err => err,
313        }
314    }
315
316    fn flush(&mut self) -> io::Result<()> {
317        Ok(())
318    }
319}
320
321impl Read for Checkout {
322    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
323        let len = cmp::min(self.available_data(), buf.len());
324        // SAFETY: `len = min(available_data, buf.len())`, so the source
325        // range `position..position+len` lies inside `self.inner.extra` and
326        // the destination `buf[..len]` fits the caller's `&mut [u8]`. The
327        // two slices are in different allocations; `ptr::copy` is
328        // overlap-safe regardless.
329        unsafe {
330            ptr::copy(
331                self.inner.extra()[self.inner.position..self.inner.position + len].as_ptr(),
332                buf.as_mut_ptr(),
333                len,
334            );
335            self.inner.position += len;
336        }
337        Ok(len)
338    }
339}
340
341#[cfg(test)]
342mod tests {
343    use super::*;
344    use std::io::{Read, Write};
345
346    /// Helper: create a Pool and return it directly.
347    fn create_test_pool(buffer_size: usize, max_count: usize) -> Pool {
348        Pool::with_capacity(max_count, max_count, buffer_size)
349    }
350
351    /// Helper: checkout a buffer and write initial content into it.
352    fn checkout_with_data(pool: &mut Pool, data: &[u8]) -> Checkout {
353        let mut buf = pool.checkout().expect("checkout should succeed");
354        let n = buf.write(data).expect("write should succeed");
355        assert_eq!(n, data.len(), "all bytes should be written");
356        buf
357    }
358
359    // -----------------------------------------------------------------------
360    // Pool checkout / checkin lifecycle
361    // -----------------------------------------------------------------------
362
363    #[test]
364    fn test_pool_checkout_returns_buffer() {
365        let mut pool = create_test_pool(1024, 2);
366        let buf = pool.checkout();
367        assert!(
368            buf.is_some(),
369            "first checkout from a fresh pool must succeed"
370        );
371        let buf = buf.unwrap();
372        assert_eq!(buf.capacity(), 1024);
373        assert_eq!(buf.available_data(), 0);
374        assert_eq!(buf.available_space(), 1024);
375    }
376
377    #[test]
378    fn test_pool_checkin_on_drop() {
379        let mut pool = create_test_pool(128, 1);
380        {
381            let _buf = pool.checkout().expect("checkout should succeed");
382            assert_eq!(pool.inner.used(), 1);
383        }
384        assert_eq!(pool.inner.used(), 0);
385        let buf2 = pool.checkout();
386        assert!(buf2.is_some(), "checkout after checkin should succeed");
387    }
388
389    #[test]
390    fn test_pool_auto_grow() {
391        let mut pool = Pool::with_capacity(1, 4, 256);
392        let _b1 = pool.checkout().expect("first checkout");
393        let _b2 = pool.checkout().expect("second checkout triggers growth");
394        let _b3 = pool.checkout().expect("third checkout");
395    }
396
397    // -----------------------------------------------------------------------
398    // Write / Read trait impls
399    // -----------------------------------------------------------------------
400
401    #[test]
402    fn test_checkout_write_and_read_data() {
403        let mut pool = create_test_pool(1024, 2);
404        let mut buf = pool.checkout().unwrap();
405
406        let payload = b"hello world";
407        let written = buf.write(payload).unwrap();
408        assert_eq!(written, payload.len());
409        assert_eq!(buf.available_data(), payload.len());
410        assert_eq!(buf.data(), payload);
411    }
412
413    #[test]
414    fn test_checkout_read_trait() {
415        let mut pool = create_test_pool(1024, 2);
416        let mut buf = checkout_with_data(&mut pool, b"hello");
417
418        let mut out = [0u8; 5];
419        let n = buf.read(&mut out).unwrap();
420        assert_eq!(n, 5);
421        assert_eq!(&out, b"hello");
422    }
423
424    #[test]
425    fn test_consume_and_fill() {
426        let mut pool = create_test_pool(1024, 2);
427        let mut buf = checkout_with_data(&mut pool, b"abcdefghij");
428
429        let consumed = buf.consume(3);
430        assert_eq!(consumed, 3);
431        assert_eq!(buf.data(), b"defghij");
432        assert_eq!(buf.available_data(), 7);
433
434        let filled = buf.fill(0);
435        assert_eq!(filled, 0);
436    }
437
438    #[test]
439    fn test_empty() {
440        let mut pool = create_test_pool(64, 2);
441        let buf = pool.checkout().unwrap();
442        assert!(buf.empty(), "freshly checked-out buffer should be empty");
443    }
444
445    #[test]
446    fn test_reset() {
447        let mut pool = create_test_pool(1024, 2);
448        let mut buf = checkout_with_data(&mut pool, b"data");
449        assert!(!buf.empty());
450
451        buf.reset();
452        assert!(buf.empty());
453        assert_eq!(buf.available_data(), 0);
454    }
455
456    #[test]
457    fn test_sync() {
458        let mut pool = create_test_pool(1024, 2);
459        let mut buf = checkout_with_data(&mut pool, b"hello world");
460        buf.sync(5, 2);
461        assert_eq!(buf.available_data(), 3);
462    }
463
464    // -----------------------------------------------------------------------
465    // shift() -- unsafe ptr::copy
466    // -----------------------------------------------------------------------
467
468    #[test]
469    fn test_shift_moves_data_to_start() {
470        let mut pool = create_test_pool(1024, 2);
471        let mut buf = checkout_with_data(&mut pool, b"hello world");
472
473        buf.inner.position = 5;
474        assert_eq!(buf.data(), b" world");
475
476        buf.shift();
477        assert_eq!(buf.inner.position, 0);
478        assert_eq!(buf.inner.end, 6);
479        assert_eq!(buf.data(), b" world");
480    }
481
482    #[test]
483    fn test_shift_noop_when_position_zero() {
484        let mut pool = create_test_pool(1024, 2);
485        let mut buf = checkout_with_data(&mut pool, b"hello");
486
487        assert_eq!(buf.inner.position, 0);
488        buf.shift();
489        assert_eq!(buf.data(), b"hello");
490        assert_eq!(buf.inner.position, 0);
491        assert_eq!(buf.inner.end, 5);
492    }
493
494    #[test]
495    fn test_consume_triggers_auto_shift() {
496        let mut pool = create_test_pool(256, 2);
497        let mut buf = pool.checkout().unwrap();
498        let capacity = buf.capacity();
499
500        let fill_count = capacity / 2 + 2;
501        let data: Vec<u8> = (0..fill_count as u8).collect();
502        let written = buf.write(&data).unwrap();
503        assert_eq!(written, fill_count);
504
505        let consume_count = capacity / 2 + 1;
506        buf.consume(consume_count);
507
508        assert_eq!(buf.inner.position, 0);
509        let remaining = fill_count - consume_count;
510        assert_eq!(buf.available_data(), remaining);
511    }
512
513    // -----------------------------------------------------------------------
514    // delete_slice() -- unsafe ptr::copy
515    // -----------------------------------------------------------------------
516
517    #[test]
518    fn test_delete_slice_middle() {
519        let mut pool = create_test_pool(1024, 2);
520        let mut buf = checkout_with_data(&mut pool, b"hello world!");
521
522        let result = buf.delete_slice(3, 5);
523        assert!(result.is_some());
524        assert_eq!(buf.data(), b"helrld!");
525    }
526
527    #[test]
528    fn test_delete_slice_from_start() {
529        let mut pool = create_test_pool(1024, 2);
530        let mut buf = checkout_with_data(&mut pool, b"hello world!");
531
532        let result = buf.delete_slice(0, 3);
533        assert!(result.is_some());
534        assert_eq!(buf.data(), b"lo world!");
535    }
536
537    #[test]
538    fn test_delete_slice_near_end() {
539        let mut pool = create_test_pool(1024, 2);
540        let mut buf = checkout_with_data(&mut pool, b"hello world!");
541
542        let result = buf.delete_slice(7, 4);
543        assert!(result.is_some());
544        assert_eq!(buf.data(), b"hello w!");
545    }
546
547    #[test]
548    fn test_delete_slice_out_of_bounds_returns_none() {
549        let mut pool = create_test_pool(1024, 2);
550        let mut buf = checkout_with_data(&mut pool, b"hello");
551
552        let result = buf.delete_slice(0, 5);
553        assert!(result.is_none());
554
555        let result = buf.delete_slice(3, 5);
556        assert!(result.is_none());
557    }
558
559    #[test]
560    fn test_delete_slice_single_byte() {
561        let mut pool = create_test_pool(1024, 2);
562        let mut buf = checkout_with_data(&mut pool, b"abcd");
563
564        let result = buf.delete_slice(1, 1);
565        assert!(result.is_some());
566        assert_eq!(buf.data(), b"acd");
567    }
568
569    // -----------------------------------------------------------------------
570    // replace_slice() -- unsafe ptr::copy
571    // -----------------------------------------------------------------------
572
573    #[test]
574    fn test_replace_slice_same_size() {
575        let mut pool = create_test_pool(1024, 2);
576        let mut buf = checkout_with_data(&mut pool, b"hello world");
577
578        let result = buf.replace_slice(b"earth", 6, 5);
579        assert!(result.is_some());
580        assert_eq!(buf.data(), b"hello earth");
581    }
582
583    #[test]
584    fn test_replace_slice_shrink() {
585        let mut pool = create_test_pool(1024, 2);
586        let mut buf = checkout_with_data(&mut pool, b"hello world");
587
588        let result = buf.replace_slice(b"hi", 6, 5);
589        assert!(result.is_some());
590        assert_eq!(buf.data(), b"hello hi");
591    }
592
593    #[test]
594    fn test_replace_slice_grow() {
595        let mut pool = create_test_pool(1024, 2);
596        let mut buf = checkout_with_data(&mut pool, b"hello world");
597
598        let result = buf.replace_slice(b"universe", 6, 5);
599        assert!(result.is_some());
600        assert_eq!(buf.data(), b"hello universe");
601    }
602
603    #[test]
604    fn test_replace_slice_at_start() {
605        let mut pool = create_test_pool(1024, 2);
606        let mut buf = checkout_with_data(&mut pool, b"hello world");
607
608        let result = buf.replace_slice(b"hey", 0, 5);
609        assert!(result.is_some());
610        assert_eq!(buf.data(), b"hey world");
611    }
612
613    #[test]
614    fn test_replace_slice_out_of_bounds_returns_none() {
615        let mut pool = create_test_pool(1024, 2);
616        let mut buf = checkout_with_data(&mut pool, b"hello");
617
618        let result = buf.replace_slice(b"x", 4, 5);
619        assert!(result.is_none());
620    }
621
622    #[test]
623    fn test_replace_slice_exceeds_data_bounds_returns_none() {
624        let mut pool = create_test_pool(256, 2);
625        let mut buf = checkout_with_data(&mut pool, b"hello");
626
627        let result = buf.replace_slice(b"xyz", 3, 5);
628        assert!(result.is_none());
629    }
630
631    #[test]
632    fn test_replace_slice_replacement_exceeds_capacity_returns_none() {
633        let mut pool = create_test_pool(256, 2);
634        let mut buf = pool.checkout().unwrap();
635        let capacity = buf.capacity();
636
637        let data = vec![b'x'; capacity];
638        let written = buf.write(&data).unwrap();
639        assert_eq!(written, capacity);
640
641        buf.inner.position = capacity - 2;
642        assert_eq!(buf.available_data(), 2);
643
644        // position + start + data_len = (capacity-2) + 0 + 5 = capacity+3 > capacity
645        let result = buf.replace_slice(b"abcde", 0, 1);
646        assert!(result.is_none());
647    }
648
649    // -----------------------------------------------------------------------
650    // insert_slice() -- unsafe ptr::copy
651    // -----------------------------------------------------------------------
652
653    #[test]
654    fn test_insert_slice_at_start() {
655        let mut pool = create_test_pool(1024, 2);
656        let mut buf = checkout_with_data(&mut pool, b"world");
657
658        let result = buf.insert_slice(b"hello ", 0);
659        assert!(result.is_some());
660        assert_eq!(buf.data(), b"hello world");
661    }
662
663    #[test]
664    fn test_insert_slice_in_middle() {
665        let mut pool = create_test_pool(1024, 2);
666        let mut buf = checkout_with_data(&mut pool, b"helo");
667
668        let result = buf.insert_slice(b"l", 2);
669        assert!(result.is_some());
670        assert_eq!(buf.data(), b"hello");
671    }
672
673    #[test]
674    fn test_insert_slice_at_end() {
675        let mut pool = create_test_pool(1024, 2);
676        let mut buf = checkout_with_data(&mut pool, b"hello");
677
678        let result = buf.insert_slice(b" world", 5);
679        assert!(result.is_some());
680        assert_eq!(buf.data(), b"hello world");
681    }
682
683    #[test]
684    fn test_insert_slice_exceeds_capacity_returns_none() {
685        let mut pool = create_test_pool(256, 2);
686        let mut buf = pool.checkout().unwrap();
687        let capacity = buf.capacity();
688
689        let data = vec![b'x'; capacity];
690        let written = buf.write(&data).unwrap();
691        assert_eq!(written, capacity);
692        assert_eq!(buf.available_space(), 0);
693
694        let result = buf.insert_slice(b"y", 0);
695        assert!(result.is_none());
696    }
697
698    #[test]
699    fn test_insert_slice_beyond_data_returns_none() {
700        let mut pool = create_test_pool(1024, 2);
701        let mut buf = checkout_with_data(&mut pool, b"hello");
702
703        let result = buf.insert_slice(b"x", 6);
704        assert!(result.is_none());
705    }
706
707    // -----------------------------------------------------------------------
708    // Combined operations -- exercise multiple unsafe paths together
709    // -----------------------------------------------------------------------
710
711    #[test]
712    fn test_write_consume_shift_write_again() {
713        let mut pool = create_test_pool(32, 2);
714        let mut buf = checkout_with_data(&mut pool, b"first");
715
716        buf.consume(5);
717        assert_eq!(buf.available_data(), 0);
718
719        let n = buf.write(b"second").unwrap();
720        assert_eq!(n, 6);
721        assert_eq!(buf.data(), b"second");
722    }
723
724    #[test]
725    fn test_delete_then_insert() {
726        let mut pool = create_test_pool(1024, 2);
727        let mut buf = checkout_with_data(&mut pool, b"hello cruel world");
728
729        buf.delete_slice(6, 6);
730        assert_eq!(buf.data(), b"hello world");
731
732        buf.insert_slice(b"beautiful ", 6);
733        assert_eq!(buf.data(), b"hello beautiful world");
734    }
735
736    #[test]
737    fn test_multiple_replace_operations() {
738        let mut pool = create_test_pool(1024, 2);
739        let mut buf = checkout_with_data(&mut pool, b"aXbXc");
740
741        buf.replace_slice(b"12", 1, 1);
742        assert_eq!(buf.data(), b"a12bXc");
743
744        buf.replace_slice(b"34", 4, 1);
745        assert_eq!(buf.data(), b"a12b34c");
746    }
747}