1use std::ops::Deref;
28use std::ops::DerefMut;
29
30use crate::buffer_pool::consume_buffer_total_bytes;
31use crate::Reuse;
32
/// A byte buffer whose readable region starts at an adjustable offset.
///
/// Dereferencing yields `inner[head..]`; the bytes before `head` stay in the
/// allocation until the buffer is reset or converted back into a `Vec<u8>`.
#[derive(Debug, Default)]
pub struct ConsumeBuffer {
    /// Backing storage, including any bytes located before `head`.
    inner: Vec<u8>,
    /// Index into `inner` of the first byte exposed through `Deref`.
    head: usize,
}
45
46impl Deref for ConsumeBuffer {
47 type Target = [u8];
48
49 fn deref(&self) -> &Self::Target {
50 &self.inner[self.head..]
51 }
52}
53
54impl DerefMut for ConsumeBuffer {
55 fn deref_mut(&mut self) -> &mut Self::Target {
56 &mut self.inner[self.head..]
57 }
58}
59
60impl Reuse for ConsumeBuffer {
61 fn reuse(&mut self, val: usize) -> bool {
62 let old_capacity = self.inner.capacity();
63 self.inner.clear();
64 self.inner.shrink_to(val);
65 self.update_metrics_after_resize(old_capacity);
66 self.head = 0;
67 self.inner.capacity() > 0
68 }
69
70 fn capacity(&self) -> usize {
71 self.inner.capacity()
72 }
73}
74
75impl Drop for ConsumeBuffer {
76 fn drop(&mut self) {
77 consume_buffer_total_bytes().dec_by(self.inner.capacity() as u64);
78 }
79}
80
81impl ConsumeBuffer {
82 pub fn from_vec(inner: Vec<u8>) -> Self {
83 consume_buffer_total_bytes().inc_by(inner.capacity() as u64);
84 ConsumeBuffer { inner, head: 0 }
85 }
86
87 pub fn into_vec(mut self) -> Vec<u8> {
88 let mut inner = std::mem::take(&mut self.inner);
93 consume_buffer_total_bytes().dec_by(inner.capacity() as u64);
94 inner.drain(0..self.head);
95 inner
96 }
97
98 pub fn pop_front(&mut self, count: usize) {
99 assert!(self.head + count <= self.inner.len());
100 self.head += count;
101 }
102
103 pub fn expand(&mut self, count: usize) {
104 let old_capacity = self.inner.capacity();
105 self.inner.reserve_exact(count);
106 self.update_metrics_after_resize(old_capacity);
107 unsafe { self.inner.set_len(count) };
109 }
110
111 pub fn truncate(&mut self, count: usize) {
112 self.inner.truncate(self.head + count);
113 }
114
115 pub fn add_prefix(&mut self, prefix: &[u8]) -> bool {
116 if self.head < prefix.len() {
117 return false;
118 }
119
120 self.head -= prefix.len();
121 self.inner[self.head..self.head + prefix.len()].copy_from_slice(prefix);
122
123 true
124 }
125
126 fn update_metrics_after_resize(&mut self, old_capacity: usize) {
127 let new_capacity = self.inner.capacity();
128 if new_capacity < old_capacity {
129 consume_buffer_total_bytes()
130 .dec_by((old_capacity - new_capacity) as u64);
131 } else if new_capacity > old_capacity {
132 consume_buffer_total_bytes()
133 .inc_by((new_capacity - old_capacity) as u64);
134 }
135 }
136}
137
138impl<'a> Extend<&'a u8> for ConsumeBuffer {
139 fn extend<T: IntoIterator<Item = &'a u8>>(&mut self, iter: T) {
140 let old_capacity = self.inner.capacity();
141 self.inner.extend(iter);
142 self.update_metrics_after_resize(old_capacity);
143 }
144}
145
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that `consume_buffer_total_bytes` follows the summed capacity of
    /// all live buffers through growth, conversion and drop.
    #[test]
    fn test_metrics() {
        // Current value of the capacity metric.
        let tracked_bytes = || consume_buffer_total_bytes().get();

        {
            let mut scratch = ConsumeBuffer::default();
            assert_eq!(scratch.inner.capacity(), 0);
            assert_eq!(tracked_bytes(), 0);

            scratch.extend(&[0, 1, 2, 3, 4]);
            assert_eq!(tracked_bytes(), scratch.inner.capacity() as u64);
        }
        // Leaving the scope dropped the buffer, so nothing is tracked any more.
        assert_eq!(tracked_bytes(), 0);

        let mut first = ConsumeBuffer::from_vec(vec![0, 1, 2, 3, 4]);
        let second = ConsumeBuffer::from_vec(vec![5, 6, 7]);
        assert_eq!(
            tracked_bytes(),
            (first.inner.capacity() + second.inner.capacity()) as u64
        );

        first.expand(100000);
        assert_eq!(
            tracked_bytes(),
            (first.inner.capacity() + second.inner.capacity()) as u64
        );

        // Converting back into a `Vec` hands the allocation over untracked.
        let _ = second.into_vec();
        assert_eq!(tracked_bytes(), first.inner.capacity() as u64);

        drop(first);
        assert_eq!(tracked_bytes(), 0);
    }
}