mountpoint_s3_fs/memory/
buffers.rs1use std::io::Read;
2use std::ops::{Deref, DerefMut};
3
4use bytes::Bytes;
5
6use crate::sync::Arc;
7
8use super::pages::PagedBufferPtr;
9use super::stats::{BufferKind, PoolStats};
10
11#[derive(Debug)]
17pub struct PoolBuffer(PoolBufferInner);
18
19#[derive(Debug)]
20enum PoolBufferInner {
21 Primary { buffer_ptr: PagedBufferPtr, size: usize },
23 Secondary(FreeBuffer),
25}
26
27impl PoolBuffer {
28 pub(super) fn new_primary(buffer_ptr: PagedBufferPtr, size: usize) -> Self {
29 assert!(size <= buffer_ptr.size());
30 Self(PoolBufferInner::Primary { buffer_ptr, size })
31 }
32
33 pub(super) fn new_secondary(size: usize, kind: BufferKind, stats: Arc<PoolStats>) -> Self {
34 Self(PoolBufferInner::Secondary(FreeBuffer::new(size, kind, stats)))
35 }
36
37 pub fn capacity(&self) -> usize {
38 match &self.0 {
39 PoolBufferInner::Primary { size, .. } => *size,
40 PoolBufferInner::Secondary(boxed) => boxed.data.len(),
41 }
42 }
43
44 pub fn into_bytes(self) -> Bytes {
45 Bytes::from_owner(self)
46 }
47}
48
49impl Deref for PoolBuffer {
50 type Target = [u8];
51
52 fn deref(&self) -> &Self::Target {
53 match &self.0 {
54 PoolBufferInner::Primary { buffer_ptr, size } => {
55 unsafe { std::slice::from_raw_parts(buffer_ptr.as_raw_ptr(), *size) }
57 }
58 PoolBufferInner::Secondary(boxed) => &boxed.data,
59 }
60 }
61}
62
63impl DerefMut for PoolBuffer {
64 fn deref_mut(&mut self) -> &mut Self::Target {
65 match &mut self.0 {
66 PoolBufferInner::Primary { buffer_ptr, size } => {
67 unsafe { std::slice::from_raw_parts_mut(buffer_ptr.as_raw_ptr(), *size) }
69 }
70 PoolBufferInner::Secondary(boxed) => &mut boxed.data,
71 }
72 }
73}
74
75impl AsMut<[u8]> for PoolBuffer {
76 fn as_mut(&mut self) -> &mut [u8] {
77 self
78 }
79}
80
81impl AsRef<[u8]> for PoolBuffer {
82 fn as_ref(&self) -> &[u8] {
83 self
84 }
85}
86
87#[derive(Debug)]
89pub struct PoolBufferMut {
90 buffer: PoolBuffer,
91 len: usize,
92}
93
94impl PoolBufferMut {
95 pub fn new(buffer: PoolBuffer) -> Self {
96 Self { buffer, len: 0 }
97 }
98
99 pub fn append_from_slice<'a>(&mut self, slice: &mut &'a [u8]) -> &'a [u8] {
104 let available = self.buffer.capacity() - self.len;
105 let overflow = slice.split_off(available.min(slice.len())..).unwrap();
106 let new_len = self.len + slice.len();
107 self.buffer.as_mut()[self.len..new_len].copy_from_slice(slice);
108 self.len = new_len;
109 overflow
110 }
111
112 pub fn fill_from_reader(&mut self, mut reader: impl Read) -> Result<(), std::io::Error> {
117 reader.read_exact(&mut self.buffer.as_mut()[self.len..])?;
118 self.len = self.buffer.capacity();
119 Ok(())
120 }
121
122 pub fn is_full(&self) -> bool {
123 self.len == self.buffer.capacity()
124 }
125
126 pub fn is_empty(&self) -> bool {
127 self.len == 0
128 }
129
130 pub fn len(&self) -> usize {
131 self.len
132 }
133
134 pub fn capacity(&self) -> usize {
135 self.buffer.capacity()
136 }
137
138 pub fn into_bytes(self) -> Bytes {
139 Bytes::from_owner(self)
140 }
141}
142
143impl Deref for PoolBufferMut {
144 type Target = [u8];
145
146 fn deref(&self) -> &Self::Target {
147 &self.buffer[..self.len]
148 }
149}
150
151impl DerefMut for PoolBufferMut {
152 fn deref_mut(&mut self) -> &mut Self::Target {
153 &mut self.buffer[..self.len]
154 }
155}
156
157impl AsRef<[u8]> for PoolBufferMut {
158 fn as_ref(&self) -> &[u8] {
159 self
160 }
161}
162
163impl AsMut<[u8]> for PoolBufferMut {
164 fn as_mut(&mut self) -> &mut [u8] {
165 self
166 }
167}
168
169#[derive(Debug)]
170struct FreeBuffer {
171 data: Box<[u8]>,
172 kind: BufferKind,
173 stats: Arc<PoolStats>,
174}
175
176impl FreeBuffer {
177 fn new(size: usize, kind: BufferKind, stats: Arc<PoolStats>) -> Self {
178 let data = vec![0u8; size].into_boxed_slice();
179 stats.reserve_bytes(data.len(), kind);
180 Self { data, kind, stats }
181 }
182}
183
184impl Drop for FreeBuffer {
185 fn drop(&mut self) {
186 self.stats.release_bytes(self.data.len(), self.kind);
187 }
188}
189
190#[cfg(test)]
191mod tests {
192 use super::super::pages::Page;
193
194 use super::*;
195
196 use test_case::{test_case, test_matrix};
197
198 fn primary(buffer_size: usize) -> PoolBuffer {
199 let page = Page::new_for_tests(buffer_size);
200 let buffer_ptr = page
201 .try_reserve(BufferKind::Other)
202 .expect("should be able to reserve a buffer from a new page");
203
204 PoolBuffer::new_primary(buffer_ptr, buffer_size)
205 }
206
207 fn secondary(buffer_size: usize) -> PoolBuffer {
208 PoolBuffer::new_secondary(buffer_size, BufferKind::Other, Arc::new(PoolStats::default()))
209 }
210
211 #[test_case(primary)]
212 #[test_case(secondary)]
213 fn test_pool_buffer(create_fn: fn(usize) -> PoolBuffer) {
214 const BUFFER_SIZE: usize = 1024;
215 let mut pool_buffer = create_fn(BUFFER_SIZE);
216
217 assert_eq!(pool_buffer.capacity(), BUFFER_SIZE);
218
219 let data = &[42u8; BUFFER_SIZE];
220 pool_buffer.copy_from_slice(data);
221
222 assert_eq!(pool_buffer.as_ref(), data);
223
224 let bytes = pool_buffer.into_bytes();
225 assert_eq!(bytes.as_ref(), data);
226 }
227
228 const BUFFER_SIZE: usize = 1024;
229 const SMALLER_THEN_BUFFER_SIZE: usize = 512;
230 const LARGER_THAN_BUFFER_SIZE: usize = 1280;
231
232 #[test_matrix([primary, secondary], [SMALLER_THEN_BUFFER_SIZE, BUFFER_SIZE, LARGER_THAN_BUFFER_SIZE])]
233 fn test_pool_buffer_mut_append(create_fn: fn(usize) -> PoolBuffer, write_size: usize) {
234 let mut pool_buffer = PoolBufferMut::new(create_fn(BUFFER_SIZE));
235
236 assert_eq!(pool_buffer.capacity(), BUFFER_SIZE);
237 assert_eq!(pool_buffer.len(), 0);
238 assert!(pool_buffer.is_empty());
239 assert!(!pool_buffer.is_full());
240
241 let buffer_as_slice: &[u8] = &pool_buffer;
242 assert!(buffer_as_slice.is_empty());
243
244 let data = vec![42u8; write_size];
245 let mut slice = &data[..];
246 let remaining = pool_buffer.append_from_slice(&mut slice);
247
248 let overflow_size = write_size.saturating_sub(BUFFER_SIZE);
249 let appended_size = write_size - overflow_size;
250 assert_eq!(remaining.len(), overflow_size);
251 assert_eq!(slice.len(), appended_size);
252
253 assert_eq!(pool_buffer.len(), appended_size);
254 assert_eq!(&pool_buffer[..], &data[..appended_size]);
255
256 let bytes = pool_buffer.into_bytes();
257 assert_eq!(&bytes[..], &data[..appended_size]);
258 }
259
260 #[test_matrix([primary, secondary], [SMALLER_THEN_BUFFER_SIZE, BUFFER_SIZE, LARGER_THAN_BUFFER_SIZE])]
261 fn test_pool_buffer_mut_fill(create_fn: fn(usize) -> PoolBuffer, read_size: usize) {
262 let mut pool_buffer = PoolBufferMut::new(create_fn(BUFFER_SIZE));
263
264 assert_eq!(pool_buffer.capacity(), BUFFER_SIZE);
265 assert_eq!(pool_buffer.len(), 0);
266 assert!(pool_buffer.is_empty());
267 assert!(!pool_buffer.is_full());
268
269 let data = vec![42u8; read_size];
270 let result = pool_buffer.fill_from_reader(&data[..]);
271 if read_size >= BUFFER_SIZE {
272 result.expect("fill from large enough slice should succeed");
273 assert_eq!(&pool_buffer[..], &data[..BUFFER_SIZE]);
274 } else {
275 result.expect_err("fill from small slice should fail");
276 assert!(pool_buffer.is_empty());
277 }
278 }
279
280 #[test_matrix([primary, secondary], [SMALLER_THEN_BUFFER_SIZE, BUFFER_SIZE, LARGER_THAN_BUFFER_SIZE])]
281 fn test_pool_buffer_mut_fill_non_empty(create_fn: fn(usize) -> PoolBuffer, read_size: usize) {
282 let mut pool_buffer = PoolBufferMut::new(create_fn(BUFFER_SIZE));
283
284 assert_eq!(pool_buffer.capacity(), BUFFER_SIZE);
285 assert_eq!(pool_buffer.len(), 0);
286 assert!(pool_buffer.is_empty());
287 assert!(!pool_buffer.is_full());
288
289 const INITIAL_SIZE: usize = 10;
290 let initial = [7u8; INITIAL_SIZE];
291 let overflow = pool_buffer.append_from_slice(&mut &initial[..]);
292 assert!(overflow.is_empty());
293 assert_eq!(pool_buffer.len(), INITIAL_SIZE);
294
295 let read_size = read_size.checked_sub(INITIAL_SIZE).unwrap();
296 let data = vec![42u8; read_size];
297 let result = pool_buffer.fill_from_reader(&data[..]);
298 if read_size + INITIAL_SIZE >= BUFFER_SIZE {
299 result.expect("fill from large enough slice should succeed");
300 assert_eq!(&pool_buffer[..INITIAL_SIZE], &initial[..]);
301 assert_eq!(&pool_buffer[INITIAL_SIZE..], &data[..BUFFER_SIZE - INITIAL_SIZE]);
302 } else {
303 result.expect_err("fill from small slice should fail");
304 assert_eq!(&pool_buffer[..], &initial[..]);
305 }
306 }
307}