use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use crate::error::{MemError, Result};
/// Handle describing the allocator arena assigned to one tenant's collection.
///
/// Handles are plain data: cloning one copies the optional arena index and the
/// tag string, and does not create or destroy any arena.
#[derive(Clone, Debug)]
pub struct CollectionArenaHandle {
    /// Index of the dedicated arena, or `None` when no arena could be created
    /// for this collection.
    arena_index: Option<u32>,
    /// Diagnostic label for the collection (built as `t{tenant_id}/{collection}`
    /// by the code that allocates handles).
    tag: String,
}
impl CollectionArenaHandle {
    /// Returns the index of the arena backing this collection, or `None` when
    /// the handle carries no dedicated arena.
    pub fn arena_index(&self) -> Option<u32> {
        self.arena_index
    }

    /// Returns the diagnostic tag stored in this handle.
    pub fn tag(&self) -> &str {
        self.tag.as_str()
    }

    /// Returns the resident byte count reported for this handle's arena.
    ///
    /// Yields `None` when the handle has no arena index or when reading the
    /// statistic fails; the underlying error is discarded.
    pub fn resident_bytes(&self) -> Option<u64> {
        self.arena_index
            .and_then(|idx| read_arena_resident(idx).ok())
            // `usize` -> `u64` never truncates on targets with pointers of 64 bits or fewer.
            .map(|resident| resident as u64)
    }
}
/// Registry mapping `(tenant id, collection name)` pairs to their arena handles.
///
/// All state lives behind a single [`Mutex`], so the registry can be used
/// through a shared reference.
#[derive(Default, Debug)]
pub struct CollectionArenaRegistry {
    /// Mutex-guarded table of registered handles.
    inner: Mutex<RegistryInner>,
}
/// Mutable state of the registry; only ever accessed while the registry's
/// mutex is held.
#[derive(Default, Debug)]
struct RegistryInner {
    /// Registered handles, keyed by `(tenant id, collection name)`.
    handles: HashMap<(u64, String), CollectionArenaHandle>,
}
impl CollectionArenaRegistry {
    /// Creates an empty registry, already wrapped in an [`Arc`] for sharing.
    pub fn new() -> Arc<Self> {
        Arc::new(Self::default())
    }

    /// Returns the handle registered for `(tenant_id, collection)`, allocating
    /// and registering a new one the first time the pair is seen.
    ///
    /// The freshly allocated handle is stored even when it carries no arena
    /// index, so arena creation is attempted at most once per key.
    ///
    /// # Errors
    ///
    /// Returns [`MemError::Jemalloc`] if the registry mutex is poisoned.
    pub fn get_or_create(&self, tenant_id: u64, collection: &str) -> Result<CollectionArenaHandle> {
        // Build the owned key before taking the lock to keep the critical section short.
        let key = (tenant_id, collection.to_string());
        let mut guard = self
            .inner
            .lock()
            .map_err(|_| MemError::Jemalloc("collection arena registry lock poisoned".into()))?;
        // A single entry lookup serves both the hit and the miss path; the
        // allocation closure only runs when the key is absent.
        let handle = guard
            .handles
            .entry(key)
            .or_insert_with(|| allocate_handle(tenant_id, collection));
        Ok(handle.clone())
    }

    /// Returns a clone of the handle registered for `(tenant_id, collection)`,
    /// without creating one.
    ///
    /// Yields `None` when the pair is not registered, and also when the
    /// registry mutex is poisoned.
    pub fn get(&self, tenant_id: u64, collection: &str) -> Option<CollectionArenaHandle> {
        let guard = self.inner.lock().ok()?;
        // The map is keyed by an owned `String`, so the lookup key must be owned too.
        let key = (tenant_id, collection.to_string());
        guard.handles.get(&key).cloned()
    }
}
fn allocate_handle(tenant_id: u64, collection: &str) -> CollectionArenaHandle {
let tag = format!("t{tenant_id}/{collection}");
let arena_index = create_arena()
.map_err(|e| {
tracing::debug!(tag, error = %e, "per-collection arena creation failed; using global allocator");
})
.ok();
CollectionArenaHandle { arena_index, tag }
}
/// Asks jemalloc to create a new arena and returns its index.
///
/// # Errors
///
/// Returns [`MemError::Jemalloc`] when the `arenas.create` control read fails.
fn create_arena() -> Result<u32> {
    // SAFETY: the control name is NUL-terminated, and jemalloc's mallctl
    // documentation lists `arenas.create` as yielding an `unsigned`, which is
    // read here as `u32`.
    unsafe { tikv_jemalloc_ctl::raw::read::<u32>(b"arenas.create\0") }
        .map_err(|e| MemError::Jemalloc(format!("failed to create collection arena: {e:?}")))
}
/// Reads the `stats.arenas.<arena_index>.resident` statistic from jemalloc.
///
/// The jemalloc epoch is advanced first on a best-effort basis (failures are
/// ignored); presumably this refreshes the cached statistics before the read,
/// as described in jemalloc's `epoch` documentation.
///
/// # Errors
///
/// Returns [`MemError::Jemalloc`] when the statistic cannot be read.
fn read_arena_resident(arena_index: u32) -> Result<usize> {
    // Best-effort: neither a failed MIB lookup nor a failed advance aborts the read.
    let _ = tikv_jemalloc_ctl::epoch::mib().and_then(|epoch| epoch.advance());

    let name = format!("stats.arenas.{arena_index}.resident\0");
    // SAFETY: `name` ends with an explicit NUL byte, and jemalloc's mallctl
    // documentation lists this statistic as a `size_t`, which is read here as `usize`.
    let resident = unsafe { tikv_jemalloc_ctl::raw::read::<usize>(name.as_bytes()) };
    resident.map_err(|e| MemError::Jemalloc(format!("failed to read arena resident: {e:?}")))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Requesting the same key twice must yield the same arena and the expected tag.
    #[test]
    fn registry_create_and_retrieve() {
        let registry = CollectionArenaRegistry::new();
        let first = registry.get_or_create(1, "embeddings").expect("create");
        let second = registry.get_or_create(1, "embeddings").expect("re-fetch");
        assert_eq!(first.arena_index(), second.arena_index(), "same key → same arena");
        assert_eq!(first.tag(), "t1/embeddings");
    }

    /// Two collections of one tenant must not share an arena.
    #[test]
    fn different_collections_get_different_arenas() {
        let registry = CollectionArenaRegistry::new();
        let arena_a = registry.get_or_create(1, "col_a").expect("create a").arena_index();
        let arena_b = registry.get_or_create(1, "col_b").expect("create b").arena_index();
        // The comparison is only meaningful when both arenas were actually created.
        if let (Some(_), Some(_)) = (arena_a, arena_b) {
            assert_ne!(
                arena_a, arena_b,
                "distinct collections must get distinct arenas"
            );
        }
    }

    /// The same collection name under two tenants must not share an arena.
    #[test]
    fn different_tenants_get_different_arenas() {
        let registry = CollectionArenaRegistry::new();
        let arena_t1 = registry.get_or_create(1, "embeddings").expect("t1").arena_index();
        let arena_t2 = registry.get_or_create(2, "embeddings").expect("t2").arena_index();
        // The comparison is only meaningful when both arenas were actually created.
        if let (Some(_), Some(_)) = (arena_t1, arena_t2) {
            assert_ne!(
                arena_t1, arena_t2,
                "different tenants must get distinct arenas"
            );
        }
    }

    /// `get` never creates: an unregistered key yields `None`.
    #[test]
    fn get_returns_none_for_unknown_collection() {
        let registry = CollectionArenaRegistry::new();
        assert!(registry.get(99, "unknown").is_none());
    }

    /// Reading resident bytes must not panic, whatever value it returns.
    #[test]
    fn resident_bytes_does_not_panic() {
        let registry = CollectionArenaRegistry::new();
        let handle = registry.get_or_create(1, "test_resident").expect("create");
        let _ = handle.resident_bytes();
    }
}