Skip to main content

axon/buffer/
pool.rs

1//! [`BufferPool`] — slab allocator for [`ZeroCopyBuffer`] storage.
2//!
3//! Allocation classes: 4 KiB, 64 KiB, 1 MiB, 10 MiB. Buffers larger
4//! than the largest class are direct-allocated (counted in the
5//! `oversize_allocations_total` metric).
6//!
7//! Per-tenant accounting: every tenant carries a soft byte limit.
8//! Exceeding the limit does not block — it emits
9//! `buffer_pool_soft_limit_exceeded_total{tenant_id=…}` so operators
10//! can see which tenants are sustaining high multimodal throughput.
11//! The pool is global-per-process, not per-tenant, so a spike on
12//! tenant A doesn't force a second pool allocation for tenant B.
13//!
//! The pool holds `Vec<u8>` slabs on per-class free lists. The target
//! design reclaims a slab automatically when the `ZeroCopyBuffer` that was
//! allocated from it drops, via a `PoolHandle` stored on its clone path.
//! Revision 11.b ships a simpler model: slabs are requested on demand with
//! [`BufferPool::acquire`] and returned manually with
//! [`BufferPool::release`]; a future revision wires the `Drop` impl to the
//! pool once we're confident in the ownership model.
20
21use std::collections::HashMap;
22use std::sync::atomic::{AtomicU64, Ordering};
23use std::sync::{Arc, Mutex};
24
25// ── Size classes ─────────────────────────────────────────────────────
26
/// Allocation size class of a [`BufferPool`] slab.
///
/// Every class except `Oversize` owns a free list of slabs sized to
/// [`PoolClass::capacity`]; `Oversize` requests are allocated directly and
/// never pooled.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PoolClass {
    /// At most 4 KiB.
    Small,
    /// More than 4 KiB, at most 64 KiB.
    Medium,
    /// More than 64 KiB, at most 1 MiB.
    Large,
    /// More than 1 MiB, at most 10 MiB.
    Huge,
    /// More than 10 MiB — direct allocation, no pooling.
    Oversize,
}

impl PoolClass {
    /// Slab capacity in bytes for this class. `Oversize` has no fixed slab
    /// size and reports `usize::MAX`.
    pub fn capacity(self) -> usize {
        const KIB: usize = 1024;
        const MIB: usize = 1024 * KIB;
        match self {
            Self::Small => 4 * KIB,
            Self::Medium => 64 * KIB,
            Self::Large => MIB,
            Self::Huge => 10 * MIB,
            Self::Oversize => usize::MAX,
        }
    }

    /// Lowercase name of this class.
    pub fn slug(self) -> &'static str {
        match self {
            Self::Small => "small",
            Self::Medium => "medium",
            Self::Large => "large",
            Self::Huge => "huge",
            Self::Oversize => "oversize",
        }
    }

    /// Smallest class whose capacity fits `requested` bytes. Anything above
    /// the `Huge` capacity maps to `Oversize`.
    pub fn for_size(requested: usize) -> Self {
        // `non_oversize()` is in ascending capacity order, so the first fit
        // is the smallest fit.
        Self::non_oversize()
            .iter()
            .copied()
            .find(|class| requested <= class.capacity())
            .unwrap_or(Self::Oversize)
    }

    /// The pooled classes in ascending capacity order (all but `Oversize`).
    pub fn non_oversize() -> [PoolClass; 4] {
        [Self::Small, Self::Medium, Self::Large, Self::Huge]
    }
}
81
82// ── Metrics ──────────────────────────────────────────────────────────
83
84#[derive(Debug, Default)]
85struct PoolMetrics {
86    pool_hits: [AtomicU64; 4],
87    pool_misses: [AtomicU64; 4],
88    oversize_allocations: AtomicU64,
89    live_bytes: AtomicU64,
90}
91
92impl PoolMetrics {
93    fn record_request(&self, class: PoolClass, hit: bool) {
94        if class == PoolClass::Oversize {
95            self.oversize_allocations.fetch_add(1, Ordering::Relaxed);
96            return;
97        }
98        let idx = match class {
99            PoolClass::Small => 0,
100            PoolClass::Medium => 1,
101            PoolClass::Large => 2,
102            PoolClass::Huge => 3,
103            PoolClass::Oversize => unreachable!(),
104        };
105        if hit {
106            self.pool_hits[idx].fetch_add(1, Ordering::Relaxed);
107        } else {
108            self.pool_misses[idx].fetch_add(1, Ordering::Relaxed);
109        }
110    }
111}
112
113/// Point-in-time snapshot for metric export. Each field maps 1:1 to
114/// a Prometheus counter surfaced by the adopter's observability layer.
115#[derive(Debug, Clone, PartialEq, Eq)]
116pub struct BufferPoolSnapshot {
117    pub pool_hits: HashMap<PoolClass, u64>,
118    pub pool_misses: HashMap<PoolClass, u64>,
119    pub oversize_allocations_total: u64,
120    pub live_bytes: u64,
121    pub tenant_live_bytes: HashMap<String, u64>,
122    pub tenant_soft_limit_exceeded_total: HashMap<String, u64>,
123}
124
125// ── Tenant accounting ────────────────────────────────────────────────
126
/// Accounting state for one tenant.
#[derive(Debug)]
struct TenantAccount {
    /// Live bytes above which allocations count as soft-limit breaches.
    soft_limit_bytes: u64,
    /// Bytes currently recorded as live for this tenant.
    live_bytes: u64,
    /// Number of allocations recorded while over `soft_limit_bytes`.
    soft_limit_exceeded_total: u64,
}

impl TenantAccount {
    /// Fresh account with the given limit, nothing live and no breaches yet.
    fn new(soft_limit_bytes: u64) -> Self {
        Self {
            live_bytes: 0,
            soft_limit_exceeded_total: 0,
            soft_limit_bytes,
        }
    }
}
143
144// ── BufferPool ───────────────────────────────────────────────────────
145
146/// Slab-allocating pool. Shared across tenants; tenant accounting
147/// happens at the metrics layer only. Thread-safe via `Mutex` on the
148/// per-class free lists; metrics are lock-free atomics.
149pub struct BufferPool {
150    /// Free lists per class. Each `Vec<u8>` in the free list has
151    /// capacity equal to the class's configured capacity.
152    free_lists: [Mutex<Vec<Vec<u8>>>; 4],
153    metrics: PoolMetrics,
154    tenants: Mutex<HashMap<Arc<str>, TenantAccount>>,
155    default_tenant_soft_limit_bytes: u64,
156}
157
158impl BufferPool {
159    /// Construct with the given per-tenant default soft limit (bytes).
160    pub fn new(default_tenant_soft_limit_bytes: u64) -> Self {
161        BufferPool {
162            free_lists: [
163                Mutex::new(Vec::new()),
164                Mutex::new(Vec::new()),
165                Mutex::new(Vec::new()),
166                Mutex::new(Vec::new()),
167            ],
168            metrics: PoolMetrics::default(),
169            tenants: Mutex::new(HashMap::new()),
170            default_tenant_soft_limit_bytes,
171        }
172    }
173
174    /// Configure a per-tenant override. Unknown tenants get the
175    /// default limit on their first allocation.
176    pub fn set_tenant_soft_limit(
177        &self,
178        tenant_id: impl Into<Arc<str>>,
179        soft_limit_bytes: u64,
180    ) {
181        let arc: Arc<str> = tenant_id.into();
182        let mut guard = self.tenants.lock().expect("tenant map poisoned");
183        guard
184            .entry(arc)
185            .or_insert_with(|| TenantAccount::new(soft_limit_bytes))
186            .soft_limit_bytes = soft_limit_bytes;
187    }
188
189    /// Acquire a slab of at least `requested` bytes. Returns the
190    /// allocated `Vec<u8>` and the class it came from. The `Vec` is
191    /// empty (len=0) but has the capacity of its class; callers
192    /// `extend_from_slice` into it.
193    pub fn acquire(&self, requested: usize) -> (Vec<u8>, PoolClass) {
194        let class = PoolClass::for_size(requested);
195        if class == PoolClass::Oversize {
196            self.metrics.record_request(class, false);
197            return (Vec::with_capacity(requested), class);
198        }
199        let idx = class_index(class);
200        let mut free = self.free_lists[idx]
201            .lock()
202            .expect("free list poisoned");
203        if let Some(mut slab) = free.pop() {
204            self.metrics.record_request(class, true);
205            slab.clear();
206            (slab, class)
207        } else {
208            self.metrics.record_request(class, false);
209            (Vec::with_capacity(class.capacity()), class)
210        }
211    }
212
213    /// Return a previously acquired slab to the pool. Callers typically
214    /// do this via the Drop impl of the wrapping buffer once all
215    /// views have dropped. In 11.b we expose the API explicitly; a
216    /// Drop-wired version lands in a follow-up revision.
217    pub fn release(&self, mut slab: Vec<u8>, class: PoolClass) {
218        if class == PoolClass::Oversize {
219            // Direct allocations aren't pooled — let them drop.
220            drop(slab);
221            return;
222        }
223        let idx = class_index(class);
224        slab.clear();
225        let mut free = self.free_lists[idx]
226            .lock()
227            .expect("free list poisoned");
228        // Cap the free list per class to avoid unbounded growth on
229        // idle workloads. 64 slabs per class is plenty for most
230        // steady-state ingest rates; excess slabs drop to the heap.
231        if free.len() < 64 {
232            free.push(slab);
233        } else {
234            drop(slab);
235        }
236    }
237
238    /// Record `bytes` of live buffer allocation against a tenant.
239    /// Emits the soft-limit-exceeded counter when appropriate.
240    pub fn record_tenant_allocation(
241        &self,
242        tenant_id: impl Into<Arc<str>>,
243        bytes: u64,
244    ) {
245        let arc: Arc<str> = tenant_id.into();
246        let default_limit = self.default_tenant_soft_limit_bytes;
247        let mut guard = self.tenants.lock().expect("tenant map poisoned");
248        let entry = guard
249            .entry(arc)
250            .or_insert_with(|| TenantAccount::new(default_limit));
251        entry.live_bytes = entry.live_bytes.saturating_add(bytes);
252        if entry.live_bytes > entry.soft_limit_bytes {
253            entry.soft_limit_exceeded_total += 1;
254        }
255        drop(guard);
256        self.metrics
257            .live_bytes
258            .fetch_add(bytes, Ordering::Relaxed);
259    }
260
261    /// Symmetric to [`record_tenant_allocation`]; called when a
262    /// buffer drops.
263    pub fn record_tenant_release(
264        &self,
265        tenant_id: impl Into<Arc<str>>,
266        bytes: u64,
267    ) {
268        let arc: Arc<str> = tenant_id.into();
269        let mut guard = self.tenants.lock().expect("tenant map poisoned");
270        if let Some(entry) = guard.get_mut(&arc) {
271            entry.live_bytes = entry.live_bytes.saturating_sub(bytes);
272        }
273        drop(guard);
274        self.metrics
275            .live_bytes
276            .fetch_sub(bytes.min(self.metrics.live_bytes.load(Ordering::Relaxed)), Ordering::Relaxed);
277    }
278
279    /// Snapshot for metric export / tests.
280    pub fn snapshot(&self) -> BufferPoolSnapshot {
281        let mut pool_hits = HashMap::new();
282        let mut pool_misses = HashMap::new();
283        for class in PoolClass::non_oversize() {
284            let idx = class_index(class);
285            pool_hits.insert(
286                class,
287                self.metrics.pool_hits[idx].load(Ordering::Relaxed),
288            );
289            pool_misses.insert(
290                class,
291                self.metrics.pool_misses[idx].load(Ordering::Relaxed),
292            );
293        }
294        let guard = self.tenants.lock().expect("tenant map poisoned");
295        let tenant_live_bytes: HashMap<String, u64> = guard
296            .iter()
297            .map(|(k, v)| (k.to_string(), v.live_bytes))
298            .collect();
299        let tenant_soft_limit_exceeded_total: HashMap<String, u64> = guard
300            .iter()
301            .map(|(k, v)| (k.to_string(), v.soft_limit_exceeded_total))
302            .collect();
303        BufferPoolSnapshot {
304            pool_hits,
305            pool_misses,
306            oversize_allocations_total: self
307                .metrics
308                .oversize_allocations
309                .load(Ordering::Relaxed),
310            live_bytes: self.metrics.live_bytes.load(Ordering::Relaxed),
311            tenant_live_bytes,
312            tenant_soft_limit_exceeded_total,
313        }
314    }
315}
316
317impl Default for BufferPool {
318    fn default() -> Self {
319        // Default per-tenant soft limit: 256 MiB. Matches the
320        // "reasonable single user" envelope; adopters override per
321        // plan / per tenant.
322        BufferPool::new(256 * 1024 * 1024)
323    }
324}
325
326fn class_index(class: PoolClass) -> usize {
327    match class {
328        PoolClass::Small => 0,
329        PoolClass::Medium => 1,
330        PoolClass::Large => 2,
331        PoolClass::Huge => 3,
332        PoolClass::Oversize => panic!("oversize class has no free list"),
333    }
334}
335
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn size_classes_map_correctly() {
        assert_eq!(PoolClass::for_size(1), PoolClass::Small);
        assert_eq!(PoolClass::for_size(4 * 1024), PoolClass::Small);
        assert_eq!(
            PoolClass::for_size(4 * 1024 + 1),
            PoolClass::Medium
        );
        assert_eq!(PoolClass::for_size(64 * 1024), PoolClass::Medium);
        assert_eq!(
            PoolClass::for_size(64 * 1024 + 1),
            PoolClass::Large
        );
        assert_eq!(PoolClass::for_size(1024 * 1024), PoolClass::Large);
        assert_eq!(
            PoolClass::for_size(1024 * 1024 + 1),
            PoolClass::Huge
        );
        assert_eq!(
            PoolClass::for_size(10 * 1024 * 1024),
            PoolClass::Huge
        );
        assert_eq!(
            PoolClass::for_size(10 * 1024 * 1024 + 1),
            PoolClass::Oversize
        );
    }

    #[test]
    fn acquire_returns_slab_with_class_capacity() {
        let pool = BufferPool::default();
        let (slab, class) = pool.acquire(8 * 1024);
        assert_eq!(class, PoolClass::Medium);
        assert_eq!(slab.len(), 0);
        assert!(slab.capacity() >= PoolClass::Medium.capacity());
    }

    #[test]
    fn release_reuses_slab() {
        let pool = BufferPool::default();
        let (mut slab, class) = pool.acquire(1000);
        slab.extend_from_slice(&[42u8; 1000]);
        pool.release(slab, class);

        let (slab2, class2) = pool.acquire(1000);
        assert_eq!(class, class2);
        // Reused slab comes out cleared.
        assert!(slab2.is_empty());
        let snap = pool.snapshot();
        assert_eq!(snap.pool_hits[&PoolClass::Small], 1);
    }

    #[test]
    fn oversize_bypasses_pool() {
        let pool = BufferPool::default();
        let huge = 50 * 1024 * 1024;
        let (slab, class) = pool.acquire(huge);
        assert_eq!(class, PoolClass::Oversize);
        assert!(slab.capacity() >= huge);
        let snap = pool.snapshot();
        assert_eq!(snap.oversize_allocations_total, 1);
    }

    #[test]
    fn tenant_soft_limit_exceeded_increments() {
        let pool = BufferPool::new(1024); // 1 KiB soft limit
        pool.record_tenant_allocation("alpha", 2048); // over
        pool.record_tenant_allocation("alpha", 512); // still over
        let snap = pool.snapshot();
        assert_eq!(snap.tenant_soft_limit_exceeded_total["alpha"], 2);
        assert_eq!(snap.tenant_live_bytes["alpha"], 2560);
    }

    #[test]
    fn per_tenant_override_applies() {
        let pool = BufferPool::new(1024);
        pool.set_tenant_soft_limit("premium", 10 * 1024);
        pool.record_tenant_allocation("premium", 5 * 1024);
        let snap = pool.snapshot();
        // 5 KiB under 10 KiB override → no exceed.
        assert_eq!(snap.tenant_soft_limit_exceeded_total["premium"], 0);
    }

    #[test]
    fn tenant_release_decrements_and_saturates() {
        let pool = BufferPool::new(1024);
        pool.record_tenant_allocation("alpha", 2048);
        pool.record_tenant_release("alpha", 1000);
        let snap = pool.snapshot();
        assert_eq!(snap.tenant_live_bytes["alpha"], 1048);
        assert_eq!(snap.live_bytes, 1048);

        // Releasing more than is live clamps at zero instead of wrapping.
        pool.record_tenant_release("alpha", 5000);
        let snap = pool.snapshot();
        assert_eq!(snap.tenant_live_bytes["alpha"], 0);
        assert_eq!(snap.live_bytes, 0);
    }

    #[test]
    fn free_list_caps_at_64() {
        let pool = BufferPool::default();
        // Hold 100 Small slabs at once so they are all released together;
        // releasing one at a time would never fill the free list.
        let held: Vec<_> = (0..100).map(|_| pool.acquire(1000)).collect();
        for (slab, class) in held {
            pool.release(slab, class);
        }
        // Only 64 were retained: re-acquiring 100 gives 64 hits, 36 misses.
        let again: Vec<_> = (0..100).map(|_| pool.acquire(1000)).collect();
        assert!(again.iter().all(|(_, class)| *class == PoolClass::Small));
        let snap = pool.snapshot();
        assert_eq!(snap.pool_hits[&PoolClass::Small], 64);
        assert_eq!(snap.pool_misses[&PoolClass::Small], 100 + 36);
    }
}