Skip to main content

nodedb_mem/
collection_arena.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Per-collection jemalloc arena registry.
4//!
5//! Each vector-primary collection receives a dedicated jemalloc arena so
6//! HNSW + segment allocations are isolated from document-engine workloads.
7//! On targets where jemalloc arena creation fails (e.g., Miri or custom
8//! allocators), `arena_index` is `None` and the global allocator is used
9//! transparently — callers need not special-case this.
10
11use std::collections::HashMap;
12use std::sync::{Arc, Mutex};
13
14use crate::error::{MemError, Result};
15
/// Opaque, cheaply cloneable handle to a per-collection jemalloc arena.
///
/// The handle is plain data (an optional arena index plus a diagnostic tag).
/// It has no `Drop` behaviour: dropping a handle does **not** remove the entry
/// kept by [`CollectionArenaRegistry`], which stores its own clone, and nothing
/// in this module destroys the underlying jemalloc arena once it was created.
///
/// Two handles compare equal when they refer to the same arena index and
/// carry the same tag.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CollectionArenaHandle {
    /// The jemalloc arena index, or `None` when arena creation failed and the
    /// global allocator is used instead.
    arena_index: Option<u32>,
    /// Human-readable tag for diagnostics (`t{tenant_id}/{collection}`).
    tag: String,
}
28
29impl CollectionArenaHandle {
30    /// Returns the jemalloc arena index, if one was allocated.
31    pub fn arena_index(&self) -> Option<u32> {
32        self.arena_index
33    }
34
35    /// Returns the diagnostic tag.
36    pub fn tag(&self) -> &str {
37        &self.tag
38    }
39
40    /// Query the resident memory (bytes) in this arena.
41    ///
42    /// Returns `None` when no dedicated arena exists or the query fails.
43    pub fn resident_bytes(&self) -> Option<u64> {
44        let idx = self.arena_index?;
45        read_arena_resident(idx).ok().map(|v| v as u64)
46    }
47}
48
49/// Registry of per-collection jemalloc arenas.
50///
51/// Wrap in `Arc` to share between the Data Plane (writes) and the Control
52/// Plane (stats reads). Internal locking is handled by the registry itself.
53#[derive(Debug, Default)]
54pub struct CollectionArenaRegistry {
55    inner: Mutex<RegistryInner>,
56}
57
58#[derive(Debug, Default)]
59struct RegistryInner {
60    handles: HashMap<(u64, String), CollectionArenaHandle>,
61}
62
63impl CollectionArenaRegistry {
64    /// Create a new empty registry wrapped in `Arc`.
65    pub fn new() -> Arc<Self> {
66        Arc::new(Self::default())
67    }
68
69    /// Return (or create) a dedicated arena for `(tenant_id, collection)`.
70    ///
71    /// Idempotent: calling twice with the same key returns the same handle.
72    pub fn get_or_create(&self, tenant_id: u64, collection: &str) -> Result<CollectionArenaHandle> {
73        let key = (tenant_id, collection.to_string());
74        let mut guard = self
75            .inner
76            .lock()
77            .map_err(|_| MemError::Jemalloc("collection arena registry lock poisoned".into()))?;
78        if let Some(h) = guard.handles.get(&key) {
79            return Ok(h.clone());
80        }
81        let handle = allocate_handle(tenant_id, collection);
82        guard.handles.insert(key, handle.clone());
83        Ok(handle)
84    }
85
86    /// Look up an existing handle without creating one. Returns `None` when
87    /// the collection has no dedicated arena yet.
88    pub fn get(&self, tenant_id: u64, collection: &str) -> Option<CollectionArenaHandle> {
89        let guard = self.inner.lock().ok()?;
90        guard
91            .handles
92            .get(&(tenant_id, collection.to_string()))
93            .cloned()
94    }
95}
96
97/// Build a handle, creating a new jemalloc arena when possible.
98///
99/// Failures (e.g., Miri or custom allocator) are silently downgraded to a
100/// no-dedicated-arena handle rather than propagating an error, because arena
101/// isolation is an optimisation, not a correctness requirement.
102fn allocate_handle(tenant_id: u64, collection: &str) -> CollectionArenaHandle {
103    let tag = format!("t{tenant_id}/{collection}");
104    let arena_index = create_arena()
105        .map_err(|e| {
106            tracing::debug!(tag, error = %e, "per-collection arena creation failed; using global allocator");
107        })
108        .ok();
109    CollectionArenaHandle { arena_index, tag }
110}
111
112/// Create a new jemalloc arena and return its index.
113fn create_arena() -> Result<u32> {
114    // SAFETY: `arenas.create` is a standard jemalloc mallctl that creates a
115    // new arena and returns its unsigned index. No pointers are involved.
116    let arena_idx: u32 = unsafe { tikv_jemalloc_ctl::raw::read(b"arenas.create\0") }
117        .map_err(|e| MemError::Jemalloc(format!("failed to create collection arena: {e:?}")))?;
118    Ok(arena_idx)
119}
120
121/// Query the resident memory (bytes) of a jemalloc arena.
122fn read_arena_resident(arena_index: u32) -> Result<usize> {
123    // Bump the epoch so stats are up-to-date.
124    if let Ok(mib) = tikv_jemalloc_ctl::epoch::mib() {
125        let _ = mib.advance();
126    }
127    // `stats.arenas.<n>.resident` requires `--enable-stats` at jemalloc
128    // compile time. Return an error (callers treat it as `None`) when not
129    // available.
130    let key = format!("stats.arenas.{arena_index}.resident\0");
131    unsafe { tikv_jemalloc_ctl::raw::read::<usize>(key.as_bytes()) }
132        .map_err(|e| MemError::Jemalloc(format!("failed to read arena resident: {e:?}")))
133}
134
#[cfg(test)]
mod tests {
    use super::*;

    /// Assert that two handles use different arenas.
    ///
    /// The check is skipped unless both handles actually own a dedicated
    /// arena, since arena creation may fail and fall back to `None`.
    fn assert_distinct_if_allocated(
        first: &CollectionArenaHandle,
        second: &CollectionArenaHandle,
        message: &str,
    ) {
        if let (Some(a), Some(b)) = (first.arena_index(), second.arena_index()) {
            assert_ne!(a, b, "{message}");
        }
    }

    #[test]
    fn registry_create_and_retrieve() {
        let registry = CollectionArenaRegistry::new();
        let created = registry.get_or_create(1, "embeddings").expect("create");
        let fetched = registry.get_or_create(1, "embeddings").expect("re-fetch");
        // Both lookups must report the same arena index.
        assert_eq!(
            created.arena_index(),
            fetched.arena_index(),
            "same key → same arena"
        );
        assert_eq!(created.tag(), "t1/embeddings");
    }

    #[test]
    fn different_collections_get_different_arenas() {
        let registry = CollectionArenaRegistry::new();
        let col_a = registry.get_or_create(1, "col_a").expect("create a");
        let col_b = registry.get_or_create(1, "col_b").expect("create b");
        assert_distinct_if_allocated(
            &col_a,
            &col_b,
            "distinct collections must get distinct arenas",
        );
    }

    #[test]
    fn different_tenants_get_different_arenas() {
        let registry = CollectionArenaRegistry::new();
        let tenant_one = registry.get_or_create(1, "embeddings").expect("t1");
        let tenant_two = registry.get_or_create(2, "embeddings").expect("t2");
        assert_distinct_if_allocated(
            &tenant_one,
            &tenant_two,
            "different tenants must get distinct arenas",
        );
    }

    #[test]
    fn get_returns_none_for_unknown_collection() {
        let registry = CollectionArenaRegistry::new();
        let missing = registry.get(99, "unknown");
        assert!(missing.is_none());
    }

    #[test]
    fn resident_bytes_does_not_panic() {
        let registry = CollectionArenaRegistry::new();
        let handle = registry.get_or_create(1, "test_resident").expect("create");
        // Only the absence of a panic matters; `None` is acceptable when
        // stats are unavailable.
        let _ = handle.resident_bytes();
    }
}