1use std::time::Duration;
2
3use mountpoint_s3_client::config::{MemoryPool, MetaRequestType};
4
5use crate::sync::{Arc, RwLock};
6
7use super::buffers::{PoolBuffer, PoolBufferMut};
8use super::pages::{Page, PagedBufferPtr};
9use super::stats::{BufferKind, PoolStats, SizePoolStats};
10
/// A clonable handle to a pool of fixed-size, page-backed buffers.
///
/// Buffers are grouped into size classes; each class hands out slots from
/// pre-allocated pages. Cloning is cheap: all handles share the same state.
#[derive(Debug, Clone)]
pub struct PagedPool {
    // Shared pool state; `Clone` just bumps the refcount on this `Arc`.
    inner: Arc<PagedPoolInner>,
}
46
47impl PagedPool {
48 pub const MAX_BUFFER_SIZE: usize = 64 * 1024 * 1024;
52
53 pub const DEFAULT_BUFFER_SIZE: usize = 8 * 1024 * 1024;
57
58 pub fn new_with_candidate_sizes(buffer_sizes: impl Into<Vec<usize>>) -> Self {
65 let mut ordered_sizes: Vec<_> = buffer_sizes.into();
66 ordered_sizes.retain(|&size| size > 0 && size <= Self::MAX_BUFFER_SIZE);
67 ordered_sizes.dedup();
68 ordered_sizes.sort();
69 if ordered_sizes.is_empty() {
70 ordered_sizes.push(Self::DEFAULT_BUFFER_SIZE);
71 }
72
73 let stats = Arc::new(PoolStats::default());
74 let ordered_size_pools = ordered_sizes
75 .into_iter()
76 .map(|buffer_size| SizePool {
77 pages: Default::default(),
78 stats: Arc::new(SizePoolStats::new(buffer_size, stats.clone())),
79 })
80 .collect();
81
82 let inner = PagedPoolInner {
83 ordered_size_pools,
84 stats,
85 };
86 Self { inner: Arc::new(inner) }
87 }
88
89 pub fn trim(&self) -> bool {
91 self.inner.trim()
92 }
93
94 pub fn schedule_trim(&self, recurring_time: Duration) {
96 let weak = Arc::downgrade(&self.inner);
97 std::thread::spawn(move || {
98 loop {
99 std::thread::sleep(recurring_time);
100 let Some(pool) = weak.upgrade() else {
101 return;
102 };
103 pool.trim();
104 }
105 });
106 }
107
108 pub fn reserved_bytes(&self, kind: BufferKind) -> usize {
110 self.inner.stats.reserved_bytes(kind)
111 }
112
113 pub fn get_buffer_mut(&self, capacity: usize, kind: BufferKind) -> PoolBufferMut {
115 let buffer = self.get_buffer(capacity, kind);
116 PoolBufferMut::new(buffer)
117 }
118
119 fn get_buffer(&self, size: usize, kind: BufferKind) -> PoolBuffer {
120 match self.inner.get_pool_for_size(size) {
121 Some(pool) => {
122 let buffer_ptr = pool.reserve(kind);
123 metrics::histogram!("pool.reserved_bytes", "type" => "primary", "kind" => kind.as_str())
124 .record(size as f64);
125 metrics::histogram!("pool.slack_bytes", "size" => format!("{}", pool.buffer_size()), "kind" => kind.as_str())
126 .record((pool.buffer_size() - size) as f64);
127 PoolBuffer::new_primary(buffer_ptr, size)
128 }
129 None => {
130 metrics::histogram!("pool.reserved_bytes", "type" => "secondary", "kind" => kind.as_str())
131 .record(size as f64);
132 PoolBuffer::new_secondary(size, kind, self.inner.stats.clone())
133 }
134 }
135 }
136
137 #[cfg(test)]
138 fn page_count(&self) -> usize {
139 self.inner
140 .ordered_size_pools
141 .iter()
142 .map(|pool| pool.pages.read().unwrap().len())
143 .sum()
144 }
145
146 #[cfg(test)]
147 fn reserved_buffer_count(&self, kind: BufferKind) -> usize {
148 self.inner
149 .ordered_size_pools
150 .iter()
151 .map(|pool| pool.stats.reserved_buffers(kind))
152 .sum()
153 }
154}
155
156impl MemoryPool for PagedPool {
157 type Buffer = PoolBuffer;
158
159 fn get_buffer(&self, size: usize, meta_request_type: MetaRequestType) -> Self::Buffer {
160 self.get_buffer(size, meta_request_type.into())
161 }
162
163 fn trim(&self) -> bool {
164 self.trim()
165 }
166}
167
/// Shared state behind every [`PagedPool`] handle.
#[derive(Debug)]
struct PagedPoolInner {
    // Size pools ordered by ascending buffer size; binary-searched via
    // `partition_point` in `get_pool_for_size`.
    ordered_size_pools: Vec<SizePool>,
    // Aggregate statistics across all size pools.
    stats: Arc<PoolStats>,
}
173
174impl PagedPoolInner {
175 fn get_pool_for_size(&self, size: usize) -> Option<&SizePool> {
176 if size == 0 {
177 return None;
178 }
179
180 let index = self.ordered_size_pools.partition_point(|p| p.stats.buffer_size < size);
181 if index == self.ordered_size_pools.len() {
182 return None;
183 }
184
185 Some(&self.ordered_size_pools[index])
186 }
187
188 fn trim(&self) -> bool {
189 let mut removed = false;
190 for pool in &self.ordered_size_pools {
191 if pool.stats.empty_pages() == 0 {
192 continue;
193 }
194 let mut write = pool.pages.write().unwrap();
195 let len = write.len();
196 write.retain(|p| !p.invalidate_if_empty());
197
198 let pages_freed = len - write.len();
199 if pages_freed > 0 {
200 tracing::trace!(
201 size = pool.stats.buffer_size,
202 pages_freed,
203 "free empty memory pool pages"
204 );
205 removed = true;
206 metrics::gauge!("pool.allocated_pages", "size" => format!("{}", pool.stats.buffer_size))
207 .decrement(pages_freed as f64);
208 }
209 metrics::histogram!("pool.trim_pages", "size" => format!("{}", pool.stats.buffer_size))
210 .record(pages_freed as f64);
211 }
212
213 removed
214 }
215}
216
/// A pool of pages all serving buffers of one fixed size.
#[derive(Debug)]
struct SizePool {
    // Allocated pages; read-locked on the fast reservation path,
    // write-locked only to add or trim pages.
    pages: RwLock<Vec<Page>>,
    // Per-size-class statistics (also records the class's buffer size).
    stats: Arc<SizePoolStats>,
}
222
223impl SizePool {
224 fn reserve(&self, kind: BufferKind) -> PagedBufferPtr {
225 {
226 let read_pages = self.pages.read().unwrap();
228 if let Some(buffer_ptr) = self.try_get_buffer_ptr(read_pages.iter(), kind) {
229 return buffer_ptr;
230 }
231 }
232
233 let mut write_pages = self.pages.write().unwrap();
238 if let Some(buffer_ptr) = self.try_get_buffer_ptr(write_pages.iter(), kind) {
239 return buffer_ptr;
240 }
241
242 tracing::trace!(size = self.stats.buffer_size, "allocate new memory pool page");
243 let page = Page::new(self.stats.clone());
244 let buffer_ptr = page.try_reserve(kind).unwrap();
245 write_pages.push(page);
246 buffer_ptr
247 }
248
249 fn try_get_buffer_ptr<'a>(
250 &self,
251 mut pages: impl Iterator<Item = &'a Page>,
252 kind: BufferKind,
253 ) -> Option<PagedBufferPtr> {
254 pages.find_map(|page| page.try_reserve(kind))
255 }
256
257 fn buffer_size(&self) -> usize {
258 self.stats.buffer_size
259 }
260}
261
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::ops::Deref;
    use std::thread::{self, sleep};
    use std::time::Duration;

    use super::*;

    use bytes::Bytes;
    use rand::Rng;
    use test_case::{test_case, test_matrix};

    // Copy `original` into a pool buffer of matching size and freeze it
    // into immutable `Bytes`.
    fn copy_from_slice(pool: &PagedPool, original: &[u8]) -> Bytes {
        let mut buffer = pool.get_buffer(original.len(), BufferKind::Other);
        buffer.as_mut().clone_from_slice(original);
        buffer.into_bytes()
    }

    // Round-tripping data through a pool buffer preserves the bytes.
    #[test_case(&[1, 2, 3], &[5, 10])]
    #[test_case(&vec![42u8; 1000], &[128, 1024])]
    fn test_from_slice(original: &[u8], buffer_sizes: &[usize]) {
        let pool = PagedPool::new_with_candidate_sizes(buffer_sizes);
        let bytes = copy_from_slice(&pool, original);
        assert_eq!(original, bytes.as_ref());
    }

    // Verify page allocation, reuse, and trimming behavior per size class.
    #[test_case(&[5, 10, 1024])]
    fn test_pages(buffer_sizes: &[usize]) {
        let pool = PagedPool::new_with_candidate_sizes(buffer_sizes);

        for &size in buffer_sizes {
            let original = vec![1u8; size];

            // A fresh iteration starts with no pages (prior ones trimmed below).
            assert_eq!(pool.page_count(), 0);
            assert_eq!(pool.reserved_buffer_count(BufferKind::Other), 0);

            let mut buffers = Vec::new();
            for _ in 0..16 {
                buffers.push(copy_from_slice(&pool, &original));
            }
            // 16 buffers fit in one page — assumes a page holds 16 buffers
            // of each class; TODO confirm against `Page`'s capacity.
            assert_eq!(pool.page_count(), 1);
            assert_eq!(pool.reserved_buffer_count(BufferKind::Other), 16);

            // The 17th reservation must spill onto a second page.
            buffers.push(copy_from_slice(&pool, &original));
            assert_eq!(pool.page_count(), 2);
            assert_eq!(pool.reserved_buffer_count(BufferKind::Other), 17);

            // Nothing is trimmable while buffers are still live.
            assert!(!pool.trim());

            drop(buffers);

            // Dropping buffers releases reservations but keeps pages cached.
            assert_eq!(pool.page_count(), 2);
            assert_eq!(pool.reserved_buffer_count(BufferKind::Other), 0);

            // Trim now frees both (empty) pages.
            assert!(pool.trim());
            assert_eq!(pool.page_count(), 0);
            assert_eq!(pool.reserved_buffer_count(BufferKind::Other), 0);
        }
    }

    // Hammer the pool from many threads, optionally with a background
    // trim thread racing against reservations.
    #[test_matrix(&[1, 2, 3, 4, 5, 6, 7], &[5, 10], [None, Some(Duration::from_millis(1))])]
    #[test_matrix(&vec![42u8; 1000], &[128, 1024], [None, Some(Duration::from_millis(10))])]
    #[test_matrix(&vec![42u8; 10000], &[128, 1024, 2024, 8192], [None, Some(Duration::from_millis(10))])]
    fn stress_test(original: &[u8], buffer_sizes: &[usize], schedule: Option<Duration>) {
        let pool = PagedPool::new_with_candidate_sizes(buffer_sizes);
        if let Some(duration) = schedule {
            pool.schedule_trim(duration);
        }

        let num_threads = 10000;
        thread::scope(|scope| {
            for i in 0..num_threads {
                let pool = pool.clone();
                scope.spawn(move || {
                    // Each thread copies a random-length prefix and checks it
                    // survives two round trips through the pool.
                    let len = rand::rng().random_range(1..original.len());
                    let original = &original[..len];
                    let bytes = copy_from_slice(&pool, &original[..len]);
                    assert_eq!(original, bytes.deref());

                    // Stagger threads so reservations overlap trim cycles.
                    sleep(Duration::from_millis(i as u64 % 10));

                    let bytes = copy_from_slice(&pool, &bytes);
                    assert_eq!(original, bytes.deref());
                });
            }
        });

        // After all threads finish, a final trim must free every page
        // (the scheduled trimmer may already have emptied the pool).
        let page_count = pool.page_count();
        if page_count > 0 {
            pool.trim();
            assert_eq!(pool.page_count(), 0);
        }
    }

    // Per-kind reserved-byte accounting matches the number of live buffers.
    #[test]
    fn test_reserved_bytes() {
        let buffer_size = 1024;
        let reservations = HashMap::from([(BufferKind::GetObject, 10), (BufferKind::Other, 20)]);
        let pool = PagedPool::new_with_candidate_sizes([buffer_size]);
        let mut buffers = Vec::new();
        for (&kind, &count) in &reservations {
            for _ in 0..count {
                buffers.push(pool.get_buffer(buffer_size, kind).into_bytes());
            }
        }

        for (&kind, &count) in &reservations {
            let reserved = pool.reserved_bytes(kind);
            assert_eq!(reserved, count * buffer_size);
        }
    }
}