buffer_pool/
buffer.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::ops::Deref;
28use std::ops::DerefMut;
29
30use crate::buffer_pool::consume_buffer_total_bytes;
31use crate::Reuse;
32
/// A convenience wrapper around `Vec` that allows to "consume" data from the
/// front *without* shifting.
///
/// This is not unlike `VecDeque` but more ergonomic
/// for the operations we require. Conceptually `VecDeque` is two slices, and
/// this is one slice. Also there is no `set_len` for `VecDeque`, so it has to
/// be converted to `Vec` and then back again.
///
/// Dereferencing the buffer yields only the bytes from `head` onwards, i.e.
/// the part that has not been consumed yet.
#[derive(Default, Debug)]
pub struct ConsumeBuffer {
    /// Backing storage. Its capacity is what gets reported to the
    /// `consume_buffer_total_bytes` metric.
    inner: Vec<u8>,
    /// Index into `inner` of the first byte that has not been consumed.
    head: usize,
}
45
46impl Deref for ConsumeBuffer {
47    type Target = [u8];
48
49    fn deref(&self) -> &Self::Target {
50        &self.inner[self.head..]
51    }
52}
53
54impl DerefMut for ConsumeBuffer {
55    fn deref_mut(&mut self) -> &mut Self::Target {
56        &mut self.inner[self.head..]
57    }
58}
59
60impl Reuse for ConsumeBuffer {
61    fn reuse(&mut self, val: usize) -> bool {
62        let old_capacity = self.inner.capacity();
63        self.inner.clear();
64        self.inner.shrink_to(val);
65        self.update_metrics_after_resize(old_capacity);
66        self.head = 0;
67        self.inner.capacity() > 0
68    }
69
70    fn capacity(&self) -> usize {
71        self.inner.capacity()
72    }
73}
74
75impl Drop for ConsumeBuffer {
76    fn drop(&mut self) {
77        consume_buffer_total_bytes().dec_by(self.inner.capacity() as u64);
78    }
79}
80
81impl ConsumeBuffer {
82    pub fn from_vec(inner: Vec<u8>) -> Self {
83        consume_buffer_total_bytes().inc_by(inner.capacity() as u64);
84        ConsumeBuffer { inner, head: 0 }
85    }
86
87    pub fn into_vec(mut self) -> Vec<u8> {
88        // As ConsumeBuffer has a Drop impl we have use `take` to get at the inner
89        // vec out instead of being able to move it out directly. This also means
90        // we need to update the metrics by hand as the buffer is no longer owned
91        // by us.
92        let mut inner = std::mem::take(&mut self.inner);
93        consume_buffer_total_bytes().dec_by(inner.capacity() as u64);
94        inner.drain(0..self.head);
95        inner
96    }
97
98    pub fn pop_front(&mut self, count: usize) {
99        assert!(self.head + count <= self.inner.len());
100        self.head += count;
101    }
102
103    pub fn expand(&mut self, count: usize) {
104        let old_capacity = self.inner.capacity();
105        self.inner.reserve_exact(count);
106        self.update_metrics_after_resize(old_capacity);
107        // SAFETY: u8 is always initialized and we reserved the capacity.
108        unsafe { self.inner.set_len(count) };
109    }
110
111    pub fn truncate(&mut self, count: usize) {
112        self.inner.truncate(self.head + count);
113    }
114
115    pub fn add_prefix(&mut self, prefix: &[u8]) -> bool {
116        if self.head < prefix.len() {
117            return false;
118        }
119
120        self.head -= prefix.len();
121        self.inner[self.head..self.head + prefix.len()].copy_from_slice(prefix);
122
123        true
124    }
125
126    fn update_metrics_after_resize(&mut self, old_capacity: usize) {
127        let new_capacity = self.inner.capacity();
128        if new_capacity < old_capacity {
129            consume_buffer_total_bytes()
130                .dec_by((old_capacity - new_capacity) as u64);
131        } else if new_capacity > old_capacity {
132            consume_buffer_total_bytes()
133                .inc_by((new_capacity - old_capacity) as u64);
134        }
135    }
136}
137
138impl<'a> Extend<&'a u8> for ConsumeBuffer {
139    fn extend<T: IntoIterator<Item = &'a u8>>(&mut self, iter: T) {
140        let old_capacity = self.inner.capacity();
141        self.inner.extend(iter);
142        self.update_metrics_after_resize(old_capacity);
143    }
144}
145
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks a few buffers through creation, growth, conversion and drop,
    /// checking after every step that the global capacity metric equals the
    /// summed capacity of the buffers that are still alive.
    #[test]
    fn test_metrics() {
        // Inner scope so the first buffer is dropped before the next check.
        {
            let mut scratch = ConsumeBuffer::default();
            // A default buffer owns no allocation, so nothing is tracked.
            assert_eq!(scratch.inner.capacity(), 0);
            assert_eq!(consume_buffer_total_bytes().get(), 0);

            // Growing through `Extend` has to be reflected in the metric.
            scratch.extend(&[0, 1, 2, 3, 4]);
            let tracked = consume_buffer_total_bytes().get();
            assert_eq!(tracked, scratch.inner.capacity() as u64);
        }
        // The scoped buffer is gone, and so is its contribution.
        assert_eq!(consume_buffer_total_bytes().get(), 0);

        let mut first = ConsumeBuffer::from_vec(vec![0, 1, 2, 3, 4]);
        let second = ConsumeBuffer::from_vec(vec![5, 6, 7]);
        let tracked = consume_buffer_total_bytes().get();
        assert_eq!(
            tracked,
            (first.inner.capacity() + second.inner.capacity()) as u64
        );

        // Reallocating one buffer must only move the metric by the delta.
        first.expand(100000);
        let tracked = consume_buffer_total_bytes().get();
        assert_eq!(
            tracked,
            (first.inner.capacity() + second.inner.capacity()) as u64
        );

        // Handing the allocation out as a plain Vec stops tracking it.
        let _ = second.into_vec();
        let tracked = consume_buffer_total_bytes().get();
        assert_eq!(tracked, first.inner.capacity() as u64);

        drop(first);
        assert_eq!(consume_buffer_total_bytes().get(), 0);
    }
}