1use std::collections::HashMap;
22use std::sync::atomic::{AtomicU64, Ordering};
23use std::sync::{Arc, Mutex};
24
/// Size class used to bucket buffer requests.
///
/// The first four classes are pooled and have a fixed slab capacity;
/// `Oversize` stands for every request larger than the `Huge` capacity.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PoolClass {
    /// Requests of at most 4 KiB.
    Small,
    /// Requests of at most 64 KiB.
    Medium,
    /// Requests of at most 1 MiB.
    Large,
    /// Requests of at most 10 MiB.
    Huge,
    /// Anything larger than the `Huge` capacity.
    Oversize,
}

impl PoolClass {
    /// Returns the largest request size, in bytes, that this class serves.
    ///
    /// `Oversize` has no upper bound and reports `usize::MAX`.
    pub fn capacity(self) -> usize {
        const KIB: usize = 1024;
        const MIB: usize = 1024 * KIB;

        match self {
            Self::Small => 4 * KIB,
            Self::Medium => 64 * KIB,
            Self::Large => MIB,
            Self::Huge => 10 * MIB,
            Self::Oversize => usize::MAX,
        }
    }

    /// Returns the lowercase name of this class.
    pub fn slug(self) -> &'static str {
        match self {
            Self::Small => "small",
            Self::Medium => "medium",
            Self::Large => "large",
            Self::Huge => "huge",
            Self::Oversize => "oversize",
        }
    }

    /// Picks the smallest class whose capacity is at least `requested` bytes.
    ///
    /// Falls back to `Oversize` when no pooled class is large enough.
    pub fn for_size(requested: usize) -> Self {
        // `non_oversize` is ordered by ascending capacity, so the first match
        // is the tightest fit.
        Self::non_oversize()
            .iter()
            .copied()
            .find(|class| requested <= class.capacity())
            .unwrap_or(Self::Oversize)
    }

    /// Returns every class except `Oversize`, ordered by ascending capacity.
    pub fn non_oversize() -> [PoolClass; 4] {
        [Self::Small, Self::Medium, Self::Large, Self::Huge]
    }
}
81
82#[derive(Debug, Default)]
85struct PoolMetrics {
86 pool_hits: [AtomicU64; 4],
87 pool_misses: [AtomicU64; 4],
88 oversize_allocations: AtomicU64,
89 live_bytes: AtomicU64,
90}
91
92impl PoolMetrics {
93 fn record_request(&self, class: PoolClass, hit: bool) {
94 if class == PoolClass::Oversize {
95 self.oversize_allocations.fetch_add(1, Ordering::Relaxed);
96 return;
97 }
98 let idx = match class {
99 PoolClass::Small => 0,
100 PoolClass::Medium => 1,
101 PoolClass::Large => 2,
102 PoolClass::Huge => 3,
103 PoolClass::Oversize => unreachable!(),
104 };
105 if hit {
106 self.pool_hits[idx].fetch_add(1, Ordering::Relaxed);
107 } else {
108 self.pool_misses[idx].fetch_add(1, Ordering::Relaxed);
109 }
110 }
111}
112
/// Copy of the pool's counters and per-tenant accounting, as assembled by
/// `BufferPool::snapshot`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BufferPoolSnapshot {
    /// Requests served from a free list, keyed by pooled size class.
    pub pool_hits: HashMap<PoolClass, u64>,
    /// Requests that required a fresh allocation, keyed by pooled size class.
    pub pool_misses: HashMap<PoolClass, u64>,
    /// Number of oversize requests recorded so far.
    pub oversize_allocations_total: u64,
    /// Global live-byte total across all tenants.
    pub live_bytes: u64,
    /// Live bytes currently attributed to each tenant, keyed by tenant id.
    pub tenant_live_bytes: HashMap<String, u64>,
    /// Per tenant, how many recorded allocations left the tenant above its
    /// soft limit.
    pub tenant_soft_limit_exceeded_total: HashMap<String, u64>,
}
124
/// Accounting state tracked for a single tenant.
#[derive(Debug)]
struct TenantAccount {
    /// Threshold above which allocations are counted as exceeding the limit.
    soft_limit_bytes: u64,
    /// Bytes currently attributed to the tenant.
    live_bytes: u64,
    /// Number of allocations recorded while the tenant was over its limit.
    soft_limit_exceeded_total: u64,
}

impl TenantAccount {
    /// Creates an account with the given soft limit and all counters at zero.
    fn new(soft_limit_bytes: u64) -> Self {
        Self {
            live_bytes: 0,
            soft_limit_exceeded_total: 0,
            soft_limit_bytes,
        }
    }
}
143
144pub struct BufferPool {
150 free_lists: [Mutex<Vec<Vec<u8>>>; 4],
153 metrics: PoolMetrics,
154 tenants: Mutex<HashMap<Arc<str>, TenantAccount>>,
155 default_tenant_soft_limit_bytes: u64,
156}
157
158impl BufferPool {
159 pub fn new(default_tenant_soft_limit_bytes: u64) -> Self {
161 BufferPool {
162 free_lists: [
163 Mutex::new(Vec::new()),
164 Mutex::new(Vec::new()),
165 Mutex::new(Vec::new()),
166 Mutex::new(Vec::new()),
167 ],
168 metrics: PoolMetrics::default(),
169 tenants: Mutex::new(HashMap::new()),
170 default_tenant_soft_limit_bytes,
171 }
172 }
173
174 pub fn set_tenant_soft_limit(
177 &self,
178 tenant_id: impl Into<Arc<str>>,
179 soft_limit_bytes: u64,
180 ) {
181 let arc: Arc<str> = tenant_id.into();
182 let mut guard = self.tenants.lock().expect("tenant map poisoned");
183 guard
184 .entry(arc)
185 .or_insert_with(|| TenantAccount::new(soft_limit_bytes))
186 .soft_limit_bytes = soft_limit_bytes;
187 }
188
189 pub fn acquire(&self, requested: usize) -> (Vec<u8>, PoolClass) {
194 let class = PoolClass::for_size(requested);
195 if class == PoolClass::Oversize {
196 self.metrics.record_request(class, false);
197 return (Vec::with_capacity(requested), class);
198 }
199 let idx = class_index(class);
200 let mut free = self.free_lists[idx]
201 .lock()
202 .expect("free list poisoned");
203 if let Some(mut slab) = free.pop() {
204 self.metrics.record_request(class, true);
205 slab.clear();
206 (slab, class)
207 } else {
208 self.metrics.record_request(class, false);
209 (Vec::with_capacity(class.capacity()), class)
210 }
211 }
212
213 pub fn release(&self, mut slab: Vec<u8>, class: PoolClass) {
218 if class == PoolClass::Oversize {
219 drop(slab);
221 return;
222 }
223 let idx = class_index(class);
224 slab.clear();
225 let mut free = self.free_lists[idx]
226 .lock()
227 .expect("free list poisoned");
228 if free.len() < 64 {
232 free.push(slab);
233 } else {
234 drop(slab);
235 }
236 }
237
238 pub fn record_tenant_allocation(
241 &self,
242 tenant_id: impl Into<Arc<str>>,
243 bytes: u64,
244 ) {
245 let arc: Arc<str> = tenant_id.into();
246 let default_limit = self.default_tenant_soft_limit_bytes;
247 let mut guard = self.tenants.lock().expect("tenant map poisoned");
248 let entry = guard
249 .entry(arc)
250 .or_insert_with(|| TenantAccount::new(default_limit));
251 entry.live_bytes = entry.live_bytes.saturating_add(bytes);
252 if entry.live_bytes > entry.soft_limit_bytes {
253 entry.soft_limit_exceeded_total += 1;
254 }
255 drop(guard);
256 self.metrics
257 .live_bytes
258 .fetch_add(bytes, Ordering::Relaxed);
259 }
260
261 pub fn record_tenant_release(
264 &self,
265 tenant_id: impl Into<Arc<str>>,
266 bytes: u64,
267 ) {
268 let arc: Arc<str> = tenant_id.into();
269 let mut guard = self.tenants.lock().expect("tenant map poisoned");
270 if let Some(entry) = guard.get_mut(&arc) {
271 entry.live_bytes = entry.live_bytes.saturating_sub(bytes);
272 }
273 drop(guard);
274 self.metrics
275 .live_bytes
276 .fetch_sub(bytes.min(self.metrics.live_bytes.load(Ordering::Relaxed)), Ordering::Relaxed);
277 }
278
279 pub fn snapshot(&self) -> BufferPoolSnapshot {
281 let mut pool_hits = HashMap::new();
282 let mut pool_misses = HashMap::new();
283 for class in PoolClass::non_oversize() {
284 let idx = class_index(class);
285 pool_hits.insert(
286 class,
287 self.metrics.pool_hits[idx].load(Ordering::Relaxed),
288 );
289 pool_misses.insert(
290 class,
291 self.metrics.pool_misses[idx].load(Ordering::Relaxed),
292 );
293 }
294 let guard = self.tenants.lock().expect("tenant map poisoned");
295 let tenant_live_bytes: HashMap<String, u64> = guard
296 .iter()
297 .map(|(k, v)| (k.to_string(), v.live_bytes))
298 .collect();
299 let tenant_soft_limit_exceeded_total: HashMap<String, u64> = guard
300 .iter()
301 .map(|(k, v)| (k.to_string(), v.soft_limit_exceeded_total))
302 .collect();
303 BufferPoolSnapshot {
304 pool_hits,
305 pool_misses,
306 oversize_allocations_total: self
307 .metrics
308 .oversize_allocations
309 .load(Ordering::Relaxed),
310 live_bytes: self.metrics.live_bytes.load(Ordering::Relaxed),
311 tenant_live_bytes,
312 tenant_soft_limit_exceeded_total,
313 }
314 }
315}
316
317impl Default for BufferPool {
318 fn default() -> Self {
319 BufferPool::new(256 * 1024 * 1024)
323 }
324}
325
326fn class_index(class: PoolClass) -> usize {
327 match class {
328 PoolClass::Small => 0,
329 PoolClass::Medium => 1,
330 PoolClass::Large => 2,
331 PoolClass::Huge => 3,
332 PoolClass::Oversize => panic!("oversize class has no free list"),
333 }
334}
335
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks both sides of every class boundary.
    #[test]
    fn size_classes_map_correctly() {
        assert_eq!(PoolClass::for_size(1), PoolClass::Small);
        assert_eq!(PoolClass::for_size(4 * 1024), PoolClass::Small);
        assert_eq!(PoolClass::for_size(4 * 1024 + 1), PoolClass::Medium);
        assert_eq!(PoolClass::for_size(64 * 1024), PoolClass::Medium);
        assert_eq!(PoolClass::for_size(64 * 1024 + 1), PoolClass::Large);
        assert_eq!(PoolClass::for_size(1024 * 1024), PoolClass::Large);
        assert_eq!(PoolClass::for_size(1024 * 1024 + 1), PoolClass::Huge);
        assert_eq!(PoolClass::for_size(10 * 1024 * 1024), PoolClass::Huge);
        assert_eq!(
            PoolClass::for_size(10 * 1024 * 1024 + 1),
            PoolClass::Oversize
        );
    }

    #[test]
    fn acquire_returns_slab_with_class_capacity() {
        let pool = BufferPool::default();
        let (slab, class) = pool.acquire(8 * 1024);
        assert_eq!(class, PoolClass::Medium);
        assert_eq!(slab.len(), 0);
        assert!(slab.capacity() >= PoolClass::Medium.capacity());
    }

    #[test]
    fn release_reuses_slab() {
        let pool = BufferPool::default();
        let (mut slab, class) = pool.acquire(1000);
        slab.extend_from_slice(&[42u8; 1000]);
        pool.release(slab, class);

        let (slab2, class2) = pool.acquire(1000);
        assert_eq!(class, class2);
        assert!(slab2.is_empty());
        let snap = pool.snapshot();
        assert_eq!(snap.pool_hits[&PoolClass::Small], 1);
    }

    #[test]
    fn oversize_bypasses_pool() {
        let pool = BufferPool::default();
        let huge = 50 * 1024 * 1024;
        let (slab, class) = pool.acquire(huge);
        assert_eq!(class, PoolClass::Oversize);
        assert!(slab.capacity() >= huge);
        let snap = pool.snapshot();
        assert_eq!(snap.oversize_allocations_total, 1);
    }

    #[test]
    fn tenant_soft_limit_exceeded_increments() {
        let pool = BufferPool::new(1024);
        pool.record_tenant_allocation("alpha", 2048);
        pool.record_tenant_allocation("alpha", 512);
        let snap = pool.snapshot();
        assert_eq!(snap.tenant_soft_limit_exceeded_total["alpha"], 2);
        assert_eq!(snap.tenant_live_bytes["alpha"], 2560);
    }

    #[test]
    fn per_tenant_override_applies() {
        let pool = BufferPool::new(1024);
        pool.set_tenant_soft_limit("premium", 10 * 1024);
        pool.record_tenant_allocation("premium", 5 * 1024);
        let snap = pool.snapshot();
        assert_eq!(snap.tenant_soft_limit_exceeded_total["premium"], 0);
    }

    /// Releasing more than was allocated must clamp at zero rather than wrap.
    #[test]
    fn tenant_release_saturates_at_zero() {
        let pool = BufferPool::new(1024);
        pool.record_tenant_allocation("alpha", 100);
        pool.record_tenant_release("alpha", 300);
        let snap = pool.snapshot();
        assert_eq!(snap.tenant_live_bytes["alpha"], 0);
        assert_eq!(snap.live_bytes, 0);
    }

    #[test]
    fn free_list_caps_at_64() {
        let pool = BufferPool::default();

        // Hold 100 slabs at the same time; an acquire/release loop over a
        // single slab would never grow the free list past one entry.
        let held: Vec<_> = (0..100).map(|_| pool.acquire(1000)).collect();
        for (slab, class) in held {
            assert_eq!(class, PoolClass::Small);
            pool.release(slab, class);
        }

        // Only the 64 retained slabs can be served as hits; the other 36
        // requests have to allocate again.
        let reacquired: Vec<_> = (0..100).map(|_| pool.acquire(1000)).collect();
        assert_eq!(reacquired.len(), 100);

        let snap = pool.snapshot();
        assert_eq!(snap.pool_hits[&PoolClass::Small], 64);
        assert_eq!(snap.pool_misses[&PoolClass::Small], 136);
    }
}