mountpoint_s3_fs/memory/
pool.rs

1use std::time::Duration;
2
3use mountpoint_s3_client::config::{MemoryPool, MetaRequestType};
4
5use crate::sync::{Arc, RwLock};
6
7use super::buffers::{PoolBuffer, PoolBufferMut};
8use super::pages::{Page, PagedBufferPtr};
9use super::stats::{BufferKind, PoolStats, SizePoolStats};
10
11/// A pool of reusable fixed-size buffers allocated in large pages.
12///
13/// This type implements [MemoryPool] and can be configured as a custom memory pool
14/// for a [S3CrtClient](mountpoint_s3_client::S3CrtClient).
15/// It can also return mutable buffers ([PoolBufferMut]) to use in other scenarios,
16/// such as reading cache blocks from disk or buffering incremental uploads.
17///
18/// The pool is configured by providing a set of sizes for the buffers, e.g. read
19/// and write part sizes, cache block size. For each of these sizes, the pool will
20/// maintain a [SizePool] instance, which will allocate "pages" of
21/// [BUFFERS_PER_PAGE](super::pages::Page::BUFFERS_PER_PAGE) (16) buffers on demand.
22///
23/// When a buffer of a given size is requested, the pool will return a buffer from one
24/// of 2 logical areas: **primary** and **secondary**:
25///
26/// - **primary**: the [SizePool] with the smallest buffer size large enough to contain
27///   the requested size is selected. If any of its existing pages has a free buffer,
28///   it is marked as reserved and returned, otherwise a new page is allocated.
29/// - **secondary**: if the requested size is larger than the maximum buffer size in
30///   [SizePool]s, a buffer of the exact size is allocated and returned.
31///
32/// When a primary buffer is dropped, it will automatically be released back to the pool,
33/// so it can be reused. Secondary buffers will also notify the pool on drop, but only
34/// for tracking.
35///
36/// The [`trim`](Self::trim) method can be invoked to free all empty pages, i.e. with no
37/// currently reserved buffers, across [SizePool]s.
38///
39/// When reserving a buffer, a [BufferKind] parameter is required to keep track of the
40/// usage. Requests through the [MemoryPool] interface will map the originating
41/// [MetaRequestType] to [BufferKind].
#[derive(Debug, Clone)]
pub struct PagedPool {
    // Shared pool state; cloning produces another handle to the same pool.
    inner: Arc<PagedPoolInner>,
}
46
47impl PagedPool {
48    /// Maximum size for a primary buffer.
49    ///
50    /// Buffers larger than this size will be allocated from secondary memory.
51    pub const MAX_BUFFER_SIZE: usize = 64 * 1024 * 1024;
52
53    /// Default size for a primary buffer.
54    ///
55    /// Used when no other valid buffer size is provided when creating the pool.
56    pub const DEFAULT_BUFFER_SIZE: usize = 8 * 1024 * 1024;
57
58    /// Create a new pool, configuring primary memory with the given set of
59    /// buffer sizes, if valid.
60    ///
61    /// Ignores invalid (0 or greater than [MAX_BUFFER_SIZE](Self::MAX_BUFFER_SIZE))
62    /// or duplicate values for buffer sizes. If no valid value is provided,
63    /// uses [DEFAULT_BUFFER_SIZE](Self::DEFAULT_BUFFER_SIZE).
64    pub fn new_with_candidate_sizes(buffer_sizes: impl Into<Vec<usize>>) -> Self {
65        let mut ordered_sizes: Vec<_> = buffer_sizes.into();
66        ordered_sizes.retain(|&size| size > 0 && size <= Self::MAX_BUFFER_SIZE);
67        ordered_sizes.dedup();
68        ordered_sizes.sort();
69        if ordered_sizes.is_empty() {
70            ordered_sizes.push(Self::DEFAULT_BUFFER_SIZE);
71        }
72
73        let stats = Arc::new(PoolStats::default());
74        let ordered_size_pools = ordered_sizes
75            .into_iter()
76            .map(|buffer_size| SizePool {
77                pages: Default::default(),
78                stats: Arc::new(SizePoolStats::new(buffer_size, stats.clone())),
79            })
80            .collect();
81
82        let inner = PagedPoolInner {
83            ordered_size_pools,
84            stats,
85        };
86        Self { inner: Arc::new(inner) }
87    }
88
89    /// Trim empty pages in the pool.
90    pub fn trim(&self) -> bool {
91        self.inner.trim()
92    }
93
94    /// Schedule recurring calls to [trim](Self::trim).
95    pub fn schedule_trim(&self, recurring_time: Duration) {
96        let weak = Arc::downgrade(&self.inner);
97        std::thread::spawn(move || {
98            loop {
99                std::thread::sleep(recurring_time);
100                let Some(pool) = weak.upgrade() else {
101                    return;
102                };
103                pool.trim();
104            }
105        });
106    }
107
108    /// Return the reserved memory in bytes for the given kind of buffer.
109    pub fn reserved_bytes(&self, kind: BufferKind) -> usize {
110        self.inner.stats.reserved_bytes(kind)
111    }
112
113    /// Get a new empty mutable buffer from the pool with the requested capacity.
114    pub fn get_buffer_mut(&self, capacity: usize, kind: BufferKind) -> PoolBufferMut {
115        let buffer = self.get_buffer(capacity, kind);
116        PoolBufferMut::new(buffer)
117    }
118
119    fn get_buffer(&self, size: usize, kind: BufferKind) -> PoolBuffer {
120        match self.inner.get_pool_for_size(size) {
121            Some(pool) => {
122                let buffer_ptr = pool.reserve(kind);
123                metrics::histogram!("pool.reserved_bytes", "type" => "primary", "kind" => kind.as_str())
124                    .record(size as f64);
125                metrics::histogram!("pool.slack_bytes", "size" => format!("{}", pool.buffer_size()), "kind" => kind.as_str())
126                    .record((pool.buffer_size() - size) as f64);
127                PoolBuffer::new_primary(buffer_ptr, size)
128            }
129            None => {
130                metrics::histogram!("pool.reserved_bytes", "type" => "secondary", "kind" => kind.as_str())
131                    .record(size as f64);
132                PoolBuffer::new_secondary(size, kind, self.inner.stats.clone())
133            }
134        }
135    }
136
137    #[cfg(test)]
138    fn page_count(&self) -> usize {
139        self.inner
140            .ordered_size_pools
141            .iter()
142            .map(|pool| pool.pages.read().unwrap().len())
143            .sum()
144    }
145
146    #[cfg(test)]
147    fn reserved_buffer_count(&self, kind: BufferKind) -> usize {
148        self.inner
149            .ordered_size_pools
150            .iter()
151            .map(|pool| pool.stats.reserved_buffers(kind))
152            .sum()
153    }
154}
155
156impl MemoryPool for PagedPool {
157    type Buffer = PoolBuffer;
158
159    fn get_buffer(&self, size: usize, meta_request_type: MetaRequestType) -> Self::Buffer {
160        self.get_buffer(size, meta_request_type.into())
161    }
162
163    fn trim(&self) -> bool {
164        self.trim()
165    }
166}
167
#[derive(Debug)]
struct PagedPoolInner {
    // Size pools ordered by ascending buffer size, so `partition_point` can
    // find the smallest buffer size that fits a request.
    ordered_size_pools: Vec<SizePool>,
    // Pool-wide statistics, shared with each `SizePoolStats` and with secondary buffers.
    stats: Arc<PoolStats>,
}
173
174impl PagedPoolInner {
175    fn get_pool_for_size(&self, size: usize) -> Option<&SizePool> {
176        if size == 0 {
177            return None;
178        }
179
180        let index = self.ordered_size_pools.partition_point(|p| p.stats.buffer_size < size);
181        if index == self.ordered_size_pools.len() {
182            return None;
183        }
184
185        Some(&self.ordered_size_pools[index])
186    }
187
188    fn trim(&self) -> bool {
189        let mut removed = false;
190        for pool in &self.ordered_size_pools {
191            if pool.stats.empty_pages() == 0 {
192                continue;
193            }
194            let mut write = pool.pages.write().unwrap();
195            let len = write.len();
196            write.retain(|p| !p.invalidate_if_empty());
197
198            let pages_freed = len - write.len();
199            if pages_freed > 0 {
200                tracing::trace!(
201                    size = pool.stats.buffer_size,
202                    pages_freed,
203                    "free empty memory pool pages"
204                );
205                removed = true;
206                metrics::gauge!("pool.allocated_pages", "size" => format!("{}", pool.stats.buffer_size))
207                    .decrement(pages_freed as f64);
208            }
209            metrics::histogram!("pool.trim_pages", "size" => format!("{}", pool.stats.buffer_size))
210                .record(pages_freed as f64);
211        }
212
213        removed
214    }
215}
216
#[derive(Debug)]
struct SizePool {
    // Pages of fixed-size buffers; a new page is appended when every existing
    // page is fully reserved.
    pages: RwLock<Vec<Page>>,
    // Per-size statistics (buffer size, page/reservation counts), shared with the pages.
    stats: Arc<SizePoolStats>,
}
222
223impl SizePool {
224    fn reserve(&self, kind: BufferKind) -> PagedBufferPtr {
225        {
226            // Fast path: reserve a buffer from the existing pages (under a read lock).
227            let read_pages = self.pages.read().unwrap();
228            if let Some(buffer_ptr) = self.try_get_buffer_ptr(read_pages.iter(), kind) {
229                return buffer_ptr;
230            }
231        }
232
233        // Slow path: we could not find an available buffer on the first round, so we need
234        // a write lock to be able to add a page. But first, we check the existing pages again
235        // in case a buffer became available while we waited for the lock or another concurrent
236        // reserve already added a new page.
237        let mut write_pages = self.pages.write().unwrap();
238        if let Some(buffer_ptr) = self.try_get_buffer_ptr(write_pages.iter(), kind) {
239            return buffer_ptr;
240        }
241
242        tracing::trace!(size = self.stats.buffer_size, "allocate new memory pool page");
243        let page = Page::new(self.stats.clone());
244        let buffer_ptr = page.try_reserve(kind).unwrap();
245        write_pages.push(page);
246        buffer_ptr
247    }
248
249    fn try_get_buffer_ptr<'a>(
250        &self,
251        mut pages: impl Iterator<Item = &'a Page>,
252        kind: BufferKind,
253    ) -> Option<PagedBufferPtr> {
254        pages.find_map(|page| page.try_reserve(kind))
255    }
256
257    fn buffer_size(&self) -> usize {
258        self.stats.buffer_size
259    }
260}
261
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::ops::Deref;
    use std::thread::{self, sleep};
    use std::time::Duration;

    use super::*;

    use bytes::Bytes;
    use rand::Rng;
    use test_case::{test_case, test_matrix};

    // Reserve a buffer of exactly `original.len()` bytes, copy `original` into it,
    // and freeze it into `Bytes` (the reservation is tracked until the bytes drop).
    fn copy_from_slice(pool: &PagedPool, original: &[u8]) -> Bytes {
        let mut buffer = pool.get_buffer(original.len(), BufferKind::Other);
        buffer.as_mut().clone_from_slice(original);
        buffer.into_bytes()
    }

    // Round-tripping a slice through the pool must preserve its contents.
    #[test_case(&[1, 2, 3], &[5, 10])]
    #[test_case(&vec![42u8; 1000], &[128, 1024])]
    fn test_from_slice(original: &[u8], buffer_sizes: &[usize]) {
        let pool = PagedPool::new_with_candidate_sizes(buffer_sizes);
        let bytes = copy_from_slice(&pool, original);
        assert_eq!(original, bytes.as_ref());
    }

    // Exercises page allocation and trimming: 16 buffers fill one page
    // (BUFFERS_PER_PAGE), the 17th forces a second page, and trim frees
    // pages only once all reservations are dropped.
    #[test_case(&[5, 10, 1024])]
    fn test_pages(buffer_sizes: &[usize]) {
        let pool = PagedPool::new_with_candidate_sizes(buffer_sizes);

        for &size in buffer_sizes {
            let original = vec![1u8; size];

            assert_eq!(pool.page_count(), 0);
            assert_eq!(pool.reserved_buffer_count(BufferKind::Other), 0);

            let mut buffers = Vec::new();
            for _ in 0..16 {
                buffers.push(copy_from_slice(&pool, &original));
            }
            assert_eq!(pool.page_count(), 1);
            assert_eq!(pool.reserved_buffer_count(BufferKind::Other), 16);

            buffers.push(copy_from_slice(&pool, &original));
            assert_eq!(pool.page_count(), 2);
            assert_eq!(pool.reserved_buffer_count(BufferKind::Other), 17);

            // No page is empty while all buffers are held, so trim frees nothing.
            assert!(!pool.trim());

            drop(buffers);

            // Dropping releases reservations but does not free pages by itself.
            assert_eq!(pool.page_count(), 2);
            assert_eq!(pool.reserved_buffer_count(BufferKind::Other), 0);

            assert!(pool.trim());
            assert_eq!(pool.page_count(), 0);
            assert_eq!(pool.reserved_buffer_count(BufferKind::Other), 0);
        }
    }

    // Concurrency stress: many threads reserve, copy, and drop buffers of random
    // sizes, optionally with a background trim thread running.
    #[test_matrix(&[1, 2, 3, 4, 5, 6, 7], &[5, 10], [None, Some(Duration::from_millis(1))])]
    #[test_matrix(&vec![42u8; 1000], &[128, 1024], [None, Some(Duration::from_millis(10))])]
    #[test_matrix(&vec![42u8; 10000], &[128, 1024, 2024, 8192], [None, Some(Duration::from_millis(10))])]
    fn stress_test(original: &[u8], buffer_sizes: &[usize], schedule: Option<Duration>) {
        let pool = PagedPool::new_with_candidate_sizes(buffer_sizes);
        if let Some(duration) = schedule {
            pool.schedule_trim(duration);
        }

        let num_threads = 10000;
        thread::scope(|scope| {
            for i in 0..num_threads {
                let pool = pool.clone();
                scope.spawn(move || {
                    let len = rand::rng().random_range(1..original.len());
                    let original = &original[..len];
                    // NOTE(review): `original` is already truncated to `len` above,
                    // so the `[..len]` here is redundant (harmless).
                    let bytes = copy_from_slice(&pool, &original[..len]);
                    assert_eq!(original, bytes.deref());

                    // Stagger the threads so reservations overlap in varied patterns.
                    sleep(Duration::from_millis(i as u64 % 10));

                    let bytes = copy_from_slice(&pool, &bytes);
                    assert_eq!(original, bytes.deref());
                });
            }
        });

        // After all threads finish, a trim must release every remaining page.
        let page_count = pool.page_count();
        if page_count > 0 {
            pool.trim();
            assert_eq!(pool.page_count(), 0);
        }
    }

    // Per-kind accounting: reserved_bytes must report count * buffer_size per kind.
    #[test]
    fn test_reserved_bytes() {
        let buffer_size = 1024;
        let reservations = HashMap::from([(BufferKind::GetObject, 10), (BufferKind::Other, 20)]);
        let pool = PagedPool::new_with_candidate_sizes([buffer_size]);
        let mut buffers = Vec::new();
        for (&kind, &count) in &reservations {
            for _ in 0..count {
                buffers.push(pool.get_buffer(buffer_size, kind).into_bytes());
            }
        }

        for (&kind, &count) in &reservations {
            let reserved = pool.reserved_bytes(kind);
            assert_eq!(reserved, count * buffer_size);
        }
    }
}