use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::runtime::Handle as TokioHandle;
use tokio::sync::{Mutex, RwLock};
use solo_core::{Embedder, Error, Result, TenantId};
use crate::key_material::KeyMaterial;
use crate::tenants::{TenantHandle, TenantOpenParams, TenantRecord, TenantsIndex};
use crate::vector_index::HnswParams;
/// Dependencies captured once when the registry is built.
///
/// `TenantRegistry::get_or_open` clones these into a `TenantOpenParams` each
/// time it opens a tenant.
struct RegistryDeps {
/// Root directory; the tenants index file and the tenants subdirectory are
/// resolved relative to it.
data_dir: PathBuf,
/// Key material handed to the tenants index, to tenant opens, and to the
/// episode-count probe.
key: KeyMaterial,
/// Embedder forwarded to every tenant open.
embedder: Arc<dyn Embedder>,
/// HNSW parameters forwarded to every tenant open.
hnsw_params: HnswParams,
/// Optional steward forwarded to every tenant open.
steward: Option<Arc<solo_steward::Steward>>,
/// Optional tokio runtime handle forwarded to every tenant open.
runtime_handle: Option<TokioHandle>,
/// Optional steward factory forwarded to every tenant open.
steward_factory: Option<Arc<dyn crate::steward_factory::StewardFactory>>,
/// Optional triples-batch signal forwarded to every tenant open.
triples_batch_signal: Option<Arc<crate::triples_batch::TriplesBatchSignal>>,
}
/// Registry of tenants: owns the tenants index and a cache of open
/// `TenantHandle`s keyed by `TenantId`.
pub struct TenantRegistry {
/// Tenants index, serialized behind an async mutex.
index: Mutex<TenantsIndex>,
/// Currently open tenant handles. Readers share the lock; inserting,
/// removing, and draining take the write lock.
handles: RwLock<HashMap<TenantId, Arc<TenantHandle>>>,
/// One mutex per tenant, held by `get_or_open` while it opens that tenant
/// so concurrent callers do not open the same tenant twice. Entries are
/// created on demand; no code in this file removes them.
open_locks: Mutex<HashMap<TenantId, Arc<Mutex<()>>>>,
/// Construction-time dependencies reused for every tenant open.
deps: RegistryDeps,
}
/// Inputs for `TenantRegistry::open`.
///
/// Every field is moved into the registry's internal dependency bundle.
pub struct TenantRegistryParams {
/// Root directory containing the tenants index (and, for pre-migration
/// installs, the legacy `solo.db`).
pub data_dir: PathBuf,
/// Key material used to open the tenants index and tenant databases.
pub key: KeyMaterial,
/// Embedder shared by all tenants opened through the registry.
pub embedder: Arc<dyn Embedder>,
/// HNSW parameters applied to every tenant opened through the registry.
pub hnsw_params: HnswParams,
/// Optional steward passed to each tenant open.
pub steward: Option<Arc<solo_steward::Steward>>,
/// Optional tokio runtime handle passed to each tenant open.
pub runtime_handle: Option<TokioHandle>,
/// Optional steward factory passed to each tenant open.
#[allow(clippy::type_complexity)]
pub steward_factory:
Option<Arc<dyn crate::steward_factory::StewardFactory>>,
/// Optional triples-batch signal passed to each tenant open.
pub triples_batch_signal:
Option<Arc<crate::triples_batch::TriplesBatchSignal>>,
}
impl TenantRegistry {
/// Builds a registry rooted at `params.data_dir`.
///
/// If the tenants index file is missing while a legacy `solo.db` is present,
/// the v0.7.1 → v0.8.0 migration runs before the index is opened. No tenant
/// is opened here; the handle cache starts out empty.
///
/// # Errors
///
/// Propagates any error from `migrate_v071_to_v080` or `TenantsIndex::open`.
pub fn open(params: TenantRegistryParams) -> Result<Self> {
    let index_file = params
        .data_dir
        .join(crate::tenants::TENANTS_INDEX_FILENAME);
    let legacy_db = params.data_dir.join("solo.db");

    // The legacy file is only probed when the index file is absent.
    let legacy_layout = !index_file.is_file() && legacy_db.is_file();
    if legacy_layout {
        tracing::info!(
            data_dir = %params.data_dir.display(),
            "v0.7.1 single-DB layout detected; running v0.7.1 → v0.8.0 mass-data-move"
        );
        crate::tenants::migrate_v071_to_v080(&params.data_dir, &params.key)?;
    }

    let tenants_index = TenantsIndex::open(&params.data_dir, &params.key)?;

    let deps = RegistryDeps {
        data_dir: params.data_dir,
        key: params.key,
        embedder: params.embedder,
        hnsw_params: params.hnsw_params,
        steward: params.steward,
        runtime_handle: params.runtime_handle,
        steward_factory: params.steward_factory,
        triples_batch_signal: params.triples_batch_signal,
    };

    Ok(Self {
        index: Mutex::new(tenants_index),
        handles: RwLock::new(HashMap::new()),
        open_locks: Mutex::new(HashMap::new()),
        deps,
    })
}
/// Root data directory this registry was opened with.
pub fn data_dir(&self) -> &Path {
    self.deps.data_dir.as_path()
}
/// Returns the open handle for `tenant_id`, opening the tenant on first use.
///
/// Steps, in order:
/// 1. Look in `handles` under the read lock and return a hit immediately.
/// 2. Otherwise take this tenant's mutex from `open_locks` (creating it if
///    needed) and look in `handles` again, since another caller may have
///    finished opening while this one waited.
/// 3. Look the tenant up in the index and refuse any record whose status is
///    not `Active`.
/// 4. Run `TenantHandle::open` via `spawn_blocking`, cache the result, and
///    return it.
///
/// Each successful return is preceded by a best-effort `last_accessed`
/// stamp; a failure of that stamp is logged and does not fail the call.
///
/// # Errors
///
/// * not-found error when the tenant has no record in the index;
/// * conflict error when the record's status is not `Active`;
/// * storage error when the blocking task cannot be joined;
/// * any error returned by the index lookup or by `TenantHandle::open`.
pub async fn get_or_open(
&self,
tenant_id: &TenantId,
) -> Result<Arc<TenantHandle>> {
// Fast path: the tenant is already open.
{
let map = self.handles.read().await;
if let Some(h) = map.get(tenant_id) {
let handle = h.clone();
// Release the read lock before awaiting on the index mutex.
drop(map);
self.touch_last_accessed_best_effort(tenant_id).await;
return Ok(handle);
}
}
// Fetch (or create) the per-tenant open mutex. The outer `open_locks`
// mutex is held only for this map access, not for the open itself.
let open_lock: Arc<Mutex<()>> = {
let mut locks = self.open_locks.lock().await;
locks
.entry(tenant_id.clone())
.or_insert_with(|| Arc::new(Mutex::new(())))
.clone()
};
// Only one caller per tenant proceeds past this point at a time.
let _open_guard = open_lock.lock().await;
// Second look: a caller that held the open mutex before us may already
// have inserted the handle.
{
let map = self.handles.read().await;
if let Some(h) = map.get(tenant_id) {
let handle = h.clone();
drop(map);
drop(_open_guard);
self.touch_last_accessed_best_effort(tenant_id).await;
return Ok(handle);
}
}
// Read what the open needs from the index; the index mutex is released at
// the end of this block, before the blocking open starts.
let (db_filename, quota_bytes) = {
let index = self.index.lock().await;
let rec = index.lookup(tenant_id)?.ok_or_else(|| {
Error::not_found(format!(
"tenant `{tenant_id}` not found in tenants_index"
))
})?;
if rec.status != crate::tenants::TenantStatus::Active {
return Err(Error::conflict(format!(
"tenant `{tenant_id}` has status `{}`; refusing to open",
rec.status.as_sql_str()
)));
}
(rec.db_filename, rec.quota_bytes)
};
// Owned copies are required because the closure below is moved onto the
// blocking thread pool.
let tenant_id_clone = tenant_id.clone();
let open_params = TenantOpenParams {
data_dir: self.deps.data_dir.clone(),
key: self.deps.key.clone(),
db_filename,
embedder: self.deps.embedder.clone(),
hnsw_params: self.deps.hnsw_params.clone(),
steward: self.deps.steward.clone(),
runtime_handle: self.deps.runtime_handle.clone(),
quota_bytes,
steward_factory: self.deps.steward_factory.clone(),
triples_batch_signal: self.deps.triples_batch_signal.clone(),
};
// `TenantHandle::open` is a synchronous call, so it runs off the async
// worker threads. The first `?` handles the join error, the second the
// open error.
let handle = tokio::task::spawn_blocking(move || {
TenantHandle::open(tenant_id_clone, open_params)
})
.await
.map_err(|e| Error::storage(format!("join spawn_blocking for TenantHandle::open: {e}")))??;
let arc = Arc::new(handle);
// Publish the handle while the per-tenant open mutex is still held.
{
let mut map = self.handles.write().await;
map.insert(tenant_id.clone(), arc.clone());
}
drop(_open_guard);
self.touch_last_accessed_best_effort(tenant_id).await;
Ok(arc)
}
/// Stamps the tenant's `last_accessed` value in the index with the current
/// UTC time in milliseconds.
///
/// Best effort: a failure is logged as a warning and otherwise ignored.
async fn touch_last_accessed_best_effort(&self, tenant_id: &TenantId) {
    // The timestamp is taken before waiting on the index mutex.
    let stamp_ms: i64 = chrono::Utc::now().timestamp_millis();
    let mut guard = self.index.lock().await;
    match guard.touch_last_accessed(tenant_id, stamp_ms) {
        Ok(_) => {}
        Err(e) => {
            tracing::warn!(
                tenant = %tenant_id,
                error = %e,
                "touch_last_accessed: registry stamp failed; tenant open \
                succeeded but the last_accessed column will not reflect \
                this open until the next successful call"
            );
        }
    }
}
/// Removes `tenant_id` from the open-handle cache.
///
/// Returns the cached handle if there was one, `None` otherwise. The handle
/// itself is not shut down here; the caller receives it.
pub async fn forget_handle(
    &self,
    tenant_id: &TenantId,
) -> Option<Arc<TenantHandle>> {
    let removed = self.handles.write().await.remove(tenant_id);
    removed
}
/// Returns the records produced by `TenantsIndex::list`, read under the
/// index mutex.
///
/// # Errors
///
/// Propagates any error from `TenantsIndex::list`.
pub async fn list_active(&self) -> Result<Vec<TenantRecord>> {
    let guard = self.index.lock().await;
    let records = guard.list();
    records
}
/// Collects on-disk size and active-episode count for each record.
///
/// The work runs on the blocking thread pool. Output element `i` describes
/// `records[i]`:
///
/// * `size_bytes` is the length reported by the filesystem for the tenant
///   database path, or `None` when its metadata cannot be read;
/// * `episode_count` is filled only for the first `cap` records, and only
///   when the path is a regular file; otherwise it is `None`.
///
/// If the blocking task cannot be joined, a warning is logged and an
/// **empty** vector is returned, so callers must not assume the result has
/// the same length as `records`.
pub async fn hydrate_tenant_cost_numbers(
    &self,
    records: &[TenantRecord],
    cap: usize,
) -> Vec<TenantCostNumbers> {
    // Loop-invariant: every tenant database lives under the same subdirectory.
    let tenants_dir = self.deps.data_dir.join(crate::tenants::TENANTS_SUBDIR);
    let key = self.deps.key.clone();
    // Owned snapshot so the closure can move to the blocking pool.
    let db_filenames: Vec<String> = records.iter().map(|r| r.db_filename.clone()).collect();

    let worker = tokio::task::spawn_blocking(move || {
        db_filenames
            .into_iter()
            .enumerate()
            .map(|(position, db_filename)| {
                let db_path = tenants_dir.join(&db_filename);
                // One stat answers both questions, so size and file-ness always
                // describe the same filesystem snapshot.
                let meta = std::fs::metadata(&db_path).ok();
                let size_bytes = meta.as_ref().map(|m| m.len());
                let is_regular_file = meta.as_ref().map_or(false, |m| m.is_file());
                let episode_count = if position < cap && is_regular_file {
                    hydrate_episode_count(&db_path, &key)
                } else {
                    None
                };
                TenantCostNumbers {
                    size_bytes,
                    episode_count,
                }
            })
            .collect::<Vec<_>>()
    });

    worker.await.unwrap_or_else(|join_err| {
        tracing::warn!(
            error = %join_err,
            "hydrate_tenant_cost_numbers: spawn_blocking join failed; returning empty"
        );
        Vec::new()
    })
}
/// Runs `f` with exclusive access to the tenants index and returns its result.
///
/// The index mutex is held for the whole duration of `f`.
pub async fn with_index<F, R>(&self, f: F) -> R
where
    F: FnOnce(&mut TenantsIndex) -> R,
{
    let mut guard = self.index.lock().await;
    let index: &mut TenantsIndex = &mut guard;
    f(index)
}
/// Empties the handle cache and shuts down every handle the registry owned
/// exclusively.
///
/// A handle that still has other `Arc` clones alive cannot be taken by value;
/// it is skipped with a warning. Shutdown errors are logged, not returned.
pub async fn shutdown_all(&self) {
    // The write lock is released at the end of this statement, before any
    // shutdown is awaited.
    let drained: Vec<(TenantId, Arc<TenantHandle>)> =
        self.handles.write().await.drain().collect();

    for (tenant_id, shared) in drained {
        let owned = match Arc::try_unwrap(shared) {
            Ok(owned) => owned,
            Err(_still_shared) => {
                tracing::warn!(
                    tenant = %tenant_id,
                    "tenant handle still has outstanding Arc clones at \
                    shutdown_all; skipping. The next process restart \
                    will reload from snapshot + replay pending_index."
                );
                continue;
            }
        };
        if let Err(e) = owned.shutdown(true).await {
            tracing::warn!(
                tenant = %tenant_id,
                error = %e,
                "tenant shutdown returned error"
            );
        }
    }
}
/// Reports whether `tenant_id` currently has a handle in the cache.
pub async fn is_open(&self, tenant_id: &TenantId) -> bool {
    let cached = self.handles.read().await.contains_key(tenant_id);
    cached
}
#[cfg(any(test, feature = "test-support"))]
/// Test-only constructor: a registry whose cache already contains `handle`
/// and whose tenants index is a freshly migrated in-memory SQLite database.
///
/// All optional dependencies are `None` and the HNSW parameters are the
/// defaults.
///
/// # Panics
///
/// Panics if the in-memory database cannot be opened or migrated.
pub fn for_tests_with_single_tenant(
    data_dir: std::path::PathBuf,
    key: KeyMaterial,
    embedder: Arc<dyn solo_core::Embedder>,
    handle: Arc<TenantHandle>,
) -> Self {
    // Seed the cache with the one pre-built handle.
    let mut preopened = HashMap::new();
    let only_tenant = handle.tenant_id().clone();
    preopened.insert(only_tenant, handle);

    // Stand-in tenants index backed by an in-memory connection.
    let mut stub_conn = rusqlite::Connection::open_in_memory()
        .expect("open in-memory tenants_index stub for tests");
    crate::migration::run_tenants_index_migrations(&mut stub_conn)
        .expect("apply tenants_index migrations to in-memory test stub");
    let stub_index = TenantsIndex::from_connection_for_tests(stub_conn);

    let deps = RegistryDeps {
        data_dir,
        key,
        embedder,
        hnsw_params: crate::vector_index::HnswParams::default(),
        steward: None,
        runtime_handle: None,
        steward_factory: None,
        triples_batch_signal: None,
    };

    Self {
        index: Mutex::new(stub_index),
        handles: RwLock::new(preopened),
        open_locks: Mutex::new(HashMap::new()),
        deps,
    }
}
}
/// Per-tenant cost figures produced by
/// `TenantRegistry::hydrate_tenant_cost_numbers`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TenantCostNumbers {
/// Length in bytes of the tenant database path as reported by filesystem
/// metadata; `None` when the metadata could not be read.
pub size_bytes: Option<u64>,
/// Number of `episodes` rows with `status = 'active'`; `None` when the
/// count was not attempted or could not be obtained.
pub episode_count: Option<i64>,
}
/// Counts the active episodes stored in the tenant database at `db_path`.
///
/// The database is first opened through `crate::init::open_sqlcipher` with
/// `key`. If that fails, the file is opened as plain SQLite instead.
///
/// The fallback connection is opened **read-only**. This is a statistics
/// probe and must never create a database file (the default open flags
/// include CREATE) or hold the tenant database writable.
///
/// Returns `None`, after logging a warning, when neither open succeeds or
/// when the count query fails.
fn hydrate_episode_count(
    db_path: &std::path::Path,
    key: &KeyMaterial,
) -> Option<i64> {
    let count = match crate::init::open_sqlcipher(db_path, key) {
        Ok(conn) => count_active_episodes(&conn),
        Err(_) => {
            // Same as rusqlite's default flags, except READ_ONLY replaces
            // READ_WRITE | CREATE.
            let read_only = rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
                | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
                | rusqlite::OpenFlags::SQLITE_OPEN_URI;
            match rusqlite::Connection::open_with_flags(db_path, read_only) {
                Ok(conn) => count_active_episodes(&conn),
                Err(e) => {
                    tracing::warn!(
                        db = %db_path.display(),
                        error = %e,
                        "hydrate_episode_count: open failed (both SQLCipher and plain)"
                    );
                    return None;
                }
            }
        }
    };
    match count {
        Ok(n) => Some(n),
        Err(e) => {
            tracing::warn!(
                db = %db_path.display(),
                error = %e,
                "hydrate_episode_count: COUNT(*) failed"
            );
            None
        }
    }
}
fn count_active_episodes(conn: &rusqlite::Connection) -> Result<i64> {
conn.query_row(
"SELECT COUNT(*) FROM episodes WHERE status = 'active'",
[],
|r| r.get::<_, i64>(0),
)
.map_err(|e| Error::storage(format!("COUNT(*) FROM episodes: {e}")))
}