mountpoint_s3_fs/memory/buffers.rs

1use std::io::Read;
2use std::ops::{Deref, DerefMut};
3
4use bytes::Bytes;
5
6use crate::sync::Arc;
7
8use super::pages::PagedBufferPtr;
9use super::stats::{BufferKind, PoolStats};
10
/// A buffer backed by the pool.
///
/// The memory for this buffer can be either part of a page (for "primary" buffers),
/// or be a free allocation ("secondary" buffers), depending on the requested size
/// and configuration of the [PagedPool](super::PagedPool).
///
/// The capacity is fixed at creation time; see [PoolBuffer::capacity].
#[derive(Debug)]
pub struct PoolBuffer(PoolBufferInner);
18
/// Backing storage variants for [PoolBuffer].
#[derive(Debug)]
enum PoolBufferInner {
    /// Buffer from the paged pool.
    ///
    /// `size` is the usable capacity; `PoolBuffer::new_primary` asserts that it
    /// never exceeds `buffer_ptr.size()`.
    Primary { buffer_ptr: PagedBufferPtr, size: usize },
    /// Buffer allocated independently.
    Secondary(FreeBuffer),
}
26
27impl PoolBuffer {
28    pub(super) fn new_primary(buffer_ptr: PagedBufferPtr, size: usize) -> Self {
29        assert!(size <= buffer_ptr.size());
30        Self(PoolBufferInner::Primary { buffer_ptr, size })
31    }
32
33    pub(super) fn new_secondary(size: usize, kind: BufferKind, stats: Arc<PoolStats>) -> Self {
34        Self(PoolBufferInner::Secondary(FreeBuffer::new(size, kind, stats)))
35    }
36
37    pub fn capacity(&self) -> usize {
38        match &self.0 {
39            PoolBufferInner::Primary { size, .. } => *size,
40            PoolBufferInner::Secondary(boxed) => boxed.data.len(),
41        }
42    }
43
44    pub fn into_bytes(self) -> Bytes {
45        Bytes::from_owner(self)
46    }
47}
48
impl Deref for PoolBuffer {
    type Target = [u8];

    // Expose the whole buffer capacity as a byte slice.
    fn deref(&self) -> &Self::Target {
        match &self.0 {
            PoolBufferInner::Primary { buffer_ptr, size } => {
                // SAFETY: returned slice will be valid until this buffer is dropped.
                // `new_primary` asserts `size <= buffer_ptr.size()`, so the range is
                // in bounds of the reservation, and the borrow of `self` keeps
                // `buffer_ptr` (and, presumably, the backing page memory) alive for
                // the slice's lifetime.
                unsafe { std::slice::from_raw_parts(buffer_ptr.as_raw_ptr(), *size) }
            }
            PoolBufferInner::Secondary(boxed) => &boxed.data,
        }
    }
}
62
impl DerefMut for PoolBuffer {
    // Expose the whole buffer capacity as a mutable byte slice.
    fn deref_mut(&mut self) -> &mut Self::Target {
        match &mut self.0 {
            PoolBufferInner::Primary { buffer_ptr, size } => {
                // SAFETY: returned slice will be valid until this buffer is dropped.
                // Bounds hold as in `deref` (`new_primary` asserts the size), and the
                // exclusive borrow of `self` rules out other references through this
                // buffer while the mutable slice exists. NOTE(review): uniqueness of
                // the underlying memory relies on `PagedBufferPtr` handing out
                // non-overlapping reservations — confirm in `pages.rs`.
                unsafe { std::slice::from_raw_parts_mut(buffer_ptr.as_raw_ptr(), *size) }
            }
            PoolBufferInner::Secondary(boxed) => &mut boxed.data,
        }
    }
}
74
75impl AsMut<[u8]> for PoolBuffer {
76    fn as_mut(&mut self) -> &mut [u8] {
77        self
78    }
79}
80
81impl AsRef<[u8]> for PoolBuffer {
82    fn as_ref(&self) -> &[u8] {
83        self
84    }
85}
86
/// A mutable buffer backed by the pool.
///
/// Tracks how many bytes have been written so far; dereferencing yields only
/// the written prefix, not the full capacity.
#[derive(Debug)]
pub struct PoolBufferMut {
    /// The fixed-capacity buffer being filled.
    buffer: PoolBuffer,
    /// Number of bytes written so far. Invariant: `len <= buffer.capacity()`.
    len: usize,
}
93
94impl PoolBufferMut {
95    pub fn new(buffer: PoolBuffer) -> Self {
96        Self { buffer, len: 0 }
97    }
98
99    /// Append data from a slice to the end of this buffer. If reaching the buffer capacity,
100    /// split off the overflowing subslice.
101    ///
102    /// Returns the overflowing subslice.
103    pub fn append_from_slice<'a>(&mut self, slice: &mut &'a [u8]) -> &'a [u8] {
104        let available = self.buffer.capacity() - self.len;
105        let overflow = slice.split_off(available.min(slice.len())..).unwrap();
106        let new_len = self.len + slice.len();
107        self.buffer.as_mut()[self.len..new_len].copy_from_slice(slice);
108        self.len = new_len;
109        overflow
110    }
111
112    /// Fill the remaining of this buffer capacity with data from the given reader.
113    ///
114    /// Will call [Read::read_exact] on `reader` and return an error if `reader` reaches
115    /// end-of-file before filling the buffer.
116    pub fn fill_from_reader(&mut self, mut reader: impl Read) -> Result<(), std::io::Error> {
117        reader.read_exact(&mut self.buffer.as_mut()[self.len..])?;
118        self.len = self.buffer.capacity();
119        Ok(())
120    }
121
122    pub fn is_full(&self) -> bool {
123        self.len == self.buffer.capacity()
124    }
125
126    pub fn is_empty(&self) -> bool {
127        self.len == 0
128    }
129
130    pub fn len(&self) -> usize {
131        self.len
132    }
133
134    pub fn capacity(&self) -> usize {
135        self.buffer.capacity()
136    }
137
138    pub fn into_bytes(self) -> Bytes {
139        Bytes::from_owner(self)
140    }
141}
142
143impl Deref for PoolBufferMut {
144    type Target = [u8];
145
146    fn deref(&self) -> &Self::Target {
147        &self.buffer[..self.len]
148    }
149}
150
151impl DerefMut for PoolBufferMut {
152    fn deref_mut(&mut self) -> &mut Self::Target {
153        &mut self.buffer[..self.len]
154    }
155}
156
157impl AsRef<[u8]> for PoolBufferMut {
158    fn as_ref(&self) -> &[u8] {
159        self
160    }
161}
162
163impl AsMut<[u8]> for PoolBufferMut {
164    fn as_mut(&mut self) -> &mut [u8] {
165        self
166    }
167}
168
/// An independently allocated ("secondary") buffer that reports its size to the
/// pool statistics for the lifetime of the allocation.
#[derive(Debug)]
struct FreeBuffer {
    /// The zero-initialized backing allocation.
    data: Box<[u8]>,
    /// Accounting category under which this buffer is reported to `stats`.
    kind: BufferKind,
    /// Pool statistics the buffer size is reserved against (released on drop).
    stats: Arc<PoolStats>,
}
175
176impl FreeBuffer {
177    fn new(size: usize, kind: BufferKind, stats: Arc<PoolStats>) -> Self {
178        let data = vec![0u8; size].into_boxed_slice();
179        stats.reserve_bytes(data.len(), kind);
180        Self { data, kind, stats }
181    }
182}
183
impl Drop for FreeBuffer {
    fn drop(&mut self) {
        // Release the bytes reserved in `FreeBuffer::new` so the pool statistics
        // stay balanced once this allocation is freed.
        self.stats.release_bytes(self.data.len(), self.kind);
    }
}
189
#[cfg(test)]
mod tests {
    use super::super::pages::Page;

    use super::*;

    use test_case::{test_case, test_matrix};

    /// Build a "primary" buffer backed by a freshly created test page.
    fn primary(buffer_size: usize) -> PoolBuffer {
        let page = Page::new_for_tests(buffer_size);
        let buffer_ptr = page
            .try_reserve(BufferKind::Other)
            .expect("should be able to reserve a buffer from a new page");

        PoolBuffer::new_primary(buffer_ptr, buffer_size)
    }

    /// Build a "secondary" (independently allocated) buffer with fresh stats.
    fn secondary(buffer_size: usize) -> PoolBuffer {
        PoolBuffer::new_secondary(buffer_size, BufferKind::Other, Arc::new(PoolStats::default()))
    }

    #[test_case(primary)]
    #[test_case(secondary)]
    fn test_pool_buffer(create_fn: fn(usize) -> PoolBuffer) {
        const BUFFER_SIZE: usize = 1024;
        let mut pool_buffer = create_fn(BUFFER_SIZE);

        assert_eq!(pool_buffer.capacity(), BUFFER_SIZE);

        let data = &[42u8; BUFFER_SIZE];
        pool_buffer.copy_from_slice(data);

        assert_eq!(pool_buffer.as_ref(), data);

        let bytes = pool_buffer.into_bytes();
        assert_eq!(bytes.as_ref(), data);
    }

    const BUFFER_SIZE: usize = 1024;
    const SMALLER_THAN_BUFFER_SIZE: usize = 512;
    const LARGER_THAN_BUFFER_SIZE: usize = 1280;

    #[test_matrix([primary, secondary], [SMALLER_THAN_BUFFER_SIZE, BUFFER_SIZE, LARGER_THAN_BUFFER_SIZE])]
    fn test_pool_buffer_mut_append(create_fn: fn(usize) -> PoolBuffer, write_size: usize) {
        let mut pool_buffer = PoolBufferMut::new(create_fn(BUFFER_SIZE));

        assert_eq!(pool_buffer.capacity(), BUFFER_SIZE);
        assert_eq!(pool_buffer.len(), 0);
        assert!(pool_buffer.is_empty());
        assert!(!pool_buffer.is_full());

        let buffer_as_slice: &[u8] = &pool_buffer;
        assert!(buffer_as_slice.is_empty());

        let data = vec![42u8; write_size];
        let mut slice = &data[..];
        let remaining = pool_buffer.append_from_slice(&mut slice);

        let overflow_size = write_size.saturating_sub(BUFFER_SIZE);
        let appended_size = write_size - overflow_size;
        assert_eq!(remaining.len(), overflow_size);
        assert_eq!(slice.len(), appended_size);

        assert_eq!(pool_buffer.len(), appended_size);
        assert_eq!(&pool_buffer[..], &data[..appended_size]);

        let bytes = pool_buffer.into_bytes();
        assert_eq!(&bytes[..], &data[..appended_size]);
    }

    #[test_matrix([primary, secondary], [SMALLER_THAN_BUFFER_SIZE, BUFFER_SIZE, LARGER_THAN_BUFFER_SIZE])]
    fn test_pool_buffer_mut_fill(create_fn: fn(usize) -> PoolBuffer, read_size: usize) {
        let mut pool_buffer = PoolBufferMut::new(create_fn(BUFFER_SIZE));

        assert_eq!(pool_buffer.capacity(), BUFFER_SIZE);
        assert_eq!(pool_buffer.len(), 0);
        assert!(pool_buffer.is_empty());
        assert!(!pool_buffer.is_full());

        let data = vec![42u8; read_size];
        let result = pool_buffer.fill_from_reader(&data[..]);
        if read_size >= BUFFER_SIZE {
            result.expect("fill from large enough slice should succeed");
            assert_eq!(&pool_buffer[..], &data[..BUFFER_SIZE]);
        } else {
            result.expect_err("fill from small slice should fail");
            assert!(pool_buffer.is_empty());
        }
    }

    #[test_matrix([primary, secondary], [SMALLER_THAN_BUFFER_SIZE, BUFFER_SIZE, LARGER_THAN_BUFFER_SIZE])]
    fn test_pool_buffer_mut_fill_non_empty(create_fn: fn(usize) -> PoolBuffer, read_size: usize) {
        let mut pool_buffer = PoolBufferMut::new(create_fn(BUFFER_SIZE));

        assert_eq!(pool_buffer.capacity(), BUFFER_SIZE);
        assert_eq!(pool_buffer.len(), 0);
        assert!(pool_buffer.is_empty());
        assert!(!pool_buffer.is_full());

        const INITIAL_SIZE: usize = 10;
        let initial = [7u8; INITIAL_SIZE];
        let overflow = pool_buffer.append_from_slice(&mut &initial[..]);
        assert!(overflow.is_empty());
        assert_eq!(pool_buffer.len(), INITIAL_SIZE);

        let read_size = read_size.checked_sub(INITIAL_SIZE).unwrap();
        let data = vec![42u8; read_size];
        let result = pool_buffer.fill_from_reader(&data[..]);
        if read_size + INITIAL_SIZE >= BUFFER_SIZE {
            result.expect("fill from large enough slice should succeed");
            assert_eq!(&pool_buffer[..INITIAL_SIZE], &initial[..]);
            assert_eq!(&pool_buffer[INITIAL_SIZE..], &data[..BUFFER_SIZE - INITIAL_SIZE]);
        } else {
            result.expect_err("fill from small slice should fail");
            assert_eq!(&pool_buffer[..], &initial[..]);
        }
    }
}
307}