use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::runtime::Handle as TokioHandle;
use tokio::sync::{Mutex, RwLock};
use solo_core::{Embedder, Error, Result, TenantId};
use crate::key_material::KeyMaterial;
use crate::tenants::{TenantHandle, TenantOpenParams, TenantRecord, TenantsIndex};
use crate::vector_index::HnswParams;
/// Long-lived dependencies owned by a [`TenantRegistry`].
///
/// Every field is cloned into a `TenantOpenParams` each time
/// `TenantRegistry::get_or_open` opens a tenant that is not cached yet.
struct RegistryDeps {
/// Directory that contains the tenants index and, in the legacy layout, `solo.db`.
data_dir: PathBuf,
/// Key material handed to the tenants index, the layout migration, and every tenant open.
key: KeyMaterial,
/// Embedder shared (via `Arc`) by every tenant handle this registry opens.
embedder: Arc<dyn Embedder>,
/// HNSW parameters forwarded to every tenant open.
hnsw_params: HnswParams,
/// Optional steward forwarded unchanged to every tenant open.
// NOTE(review): what the steward does is not visible from this file.
steward: Option<Arc<solo_steward::Steward>>,
/// Optional Tokio runtime handle forwarded unchanged to every tenant open.
// NOTE(review): presumably lets the tenant spawn async work — confirm in `TenantHandle::open`.
runtime_handle: Option<TokioHandle>,
}
/// Registry of tenants: the tenants index plus a cache of already-opened
/// tenant handles keyed by [`TenantId`].
pub struct TenantRegistry {
/// Tenants index, guarded by an async (`tokio`) mutex.
index: Mutex<TenantsIndex>,
/// Cache of opened tenant handles, guarded by an async (`tokio`) read/write lock.
handles: RwLock<HashMap<TenantId, Arc<TenantHandle>>>,
/// Dependencies cloned into every tenant open.
deps: RegistryDeps,
}
/// Inputs to [`TenantRegistry::open`].
///
/// All fields are moved into the registry and later cloned into each tenant open.
pub struct TenantRegistryParams {
/// Directory that contains (or will contain) the tenants index.
pub data_dir: PathBuf,
/// Key material used for the tenants index, the layout migration, and tenant opens.
pub key: KeyMaterial,
/// Embedder shared by every tenant handle.
pub embedder: Arc<dyn Embedder>,
/// HNSW parameters forwarded to every tenant open.
pub hnsw_params: HnswParams,
/// Optional steward forwarded to every tenant open.
pub steward: Option<Arc<solo_steward::Steward>>,
/// Optional Tokio runtime handle forwarded to every tenant open.
pub runtime_handle: Option<TokioHandle>,
}
impl TenantRegistry {
/// Opens the registry rooted at `params.data_dir`.
///
/// When the tenants index file (`TENANTS_INDEX_FILENAME`) is missing but a
/// `solo.db` file exists in the data directory, the v0.7.1 → v0.8.0 migration
/// (`crate::tenants::migrate_v071_to_v080`) runs before the index is opened.
/// The returned registry starts with an empty handle cache.
///
/// # Errors
///
/// Propagates any error returned by the migration or by `TenantsIndex::open`.
pub fn open(params: TenantRegistryParams) -> Result<Self> {
    let deps = RegistryDeps {
        data_dir: params.data_dir,
        key: params.key,
        embedder: params.embedder,
        hnsw_params: params.hnsw_params,
        steward: params.steward,
        runtime_handle: params.runtime_handle,
    };

    let index_file = deps.data_dir.join(crate::tenants::TENANTS_INDEX_FILENAME);
    let legacy_db = deps.data_dir.join("solo.db");
    // Short-circuit on purpose: the legacy file is only probed when the index is absent.
    let needs_migration = !index_file.is_file() && legacy_db.is_file();
    if needs_migration {
        tracing::info!(
            data_dir = %deps.data_dir.display(),
            "v0.7.1 single-DB layout detected; running v0.7.1 → v0.8.0 mass-data-move"
        );
        crate::tenants::migrate_v071_to_v080(&deps.data_dir, &deps.key)?;
    }

    let index = TenantsIndex::open(&deps.data_dir, &deps.key)?;
    Ok(Self {
        index: Mutex::new(index),
        handles: RwLock::new(HashMap::new()),
        deps,
    })
}
/// Returns the data directory this registry was opened with.
pub fn data_dir(&self) -> &Path {
    self.deps.data_dir.as_path()
}
pub async fn get_or_open(
&self,
tenant_id: &TenantId,
) -> Result<Arc<TenantHandle>> {
{
let map = self.handles.read().await;
if let Some(h) = map.get(tenant_id) {
return Ok(h.clone());
}
}
let mut map = self.handles.write().await;
if let Some(h) = map.get(tenant_id) {
return Ok(h.clone());
}
let (db_filename, quota_bytes) = {
let index = self.index.lock().await;
let rec = index.lookup(tenant_id)?.ok_or_else(|| {
Error::not_found(format!(
"tenant `{tenant_id}` not found in tenants_index"
))
})?;
if rec.status != crate::tenants::TenantStatus::Active {
return Err(Error::conflict(format!(
"tenant `{tenant_id}` has status `{}`; refusing to open",
rec.status.as_sql_str()
)));
}
(rec.db_filename, rec.quota_bytes)
};
let tenant_id_clone = tenant_id.clone();
let open_params = TenantOpenParams {
data_dir: self.deps.data_dir.clone(),
key: self.deps.key.clone(),
db_filename,
embedder: self.deps.embedder.clone(),
hnsw_params: self.deps.hnsw_params.clone(),
steward: self.deps.steward.clone(),
runtime_handle: self.deps.runtime_handle.clone(),
quota_bytes,
};
let handle = tokio::task::spawn_blocking(move || {
TenantHandle::open(tenant_id_clone, open_params)
})
.await
.map_err(|e| Error::storage(format!("join spawn_blocking for TenantHandle::open: {e}")))??;
let arc = Arc::new(handle);
map.insert(tenant_id.clone(), arc.clone());
Ok(arc)
}
/// Removes `tenant_id` from the handle cache and returns the handle that was
/// cached, or `None` when the tenant was not open.
///
/// The handle is only dropped from the map; it is not shut down here.
pub async fn forget_handle(
    &self,
    tenant_id: &TenantId,
) -> Option<Arc<TenantHandle>> {
    self.handles.write().await.remove(tenant_id)
}
/// Returns the records produced by `TenantsIndex::list`, read under the index lock.
///
/// # Errors
///
/// Propagates any error from `TenantsIndex::list`.
// NOTE(review): no status filtering happens here; whether the result is limited
// to active tenants depends entirely on `TenantsIndex::list` — confirm.
pub async fn list_active(&self) -> Result<Vec<TenantRecord>> {
    self.index.lock().await.list()
}
/// Runs `f` with exclusive access to the tenants index and returns its result.
///
/// The index lock is held for the whole duration of `f`.
pub async fn with_index<F, R>(&self, f: F) -> R
where
    F: FnOnce(&mut TenantsIndex) -> R,
{
    let mut guard = self.index.lock().await;
    let index: &mut TenantsIndex = &mut guard;
    f(index)
}
/// Empties the handle cache and shuts down every tenant handle it held.
///
/// The cache is drained under the write lock, which is released before any
/// shutdown runs. A handle is shut down only when the registry held the last
/// `Arc` reference; handles that are still shared are skipped with a warning.
/// Shutdown errors are logged and do not stop the loop.
// NOTE(review): the meaning of the `true` argument to `TenantHandle::shutdown`
// is not visible from this file.
pub async fn shutdown_all(&self) {
    let drained: Vec<(TenantId, Arc<TenantHandle>)> = {
        let mut open_handles = self.handles.write().await;
        open_handles.drain().collect()
    };

    for (tenant_id, shared) in drained {
        let owned = match Arc::try_unwrap(shared) {
            Ok(owned) => owned,
            Err(_still_shared) => {
                tracing::warn!(
                    tenant = %tenant_id,
                    "tenant handle still has outstanding Arc clones at \
                     shutdown_all; skipping. The next process restart \
                     will reload from snapshot + replay pending_index."
                );
                continue;
            }
        };

        if let Err(e) = owned.shutdown(true).await {
            tracing::warn!(
                tenant = %tenant_id,
                error = %e,
                "tenant shutdown returned error"
            );
        }
    }
}
/// Reports whether a handle for `tenant_id` is currently in the cache.
pub async fn is_open(&self, tenant_id: &TenantId) -> bool {
    self.handles.read().await.contains_key(tenant_id)
}
/// Test-only constructor: builds a registry whose cache already contains `handle`.
///
/// The tenants index is backed by an in-memory SQLite connection with the
/// tenants-index migrations applied. HNSW parameters take their default value,
/// and no steward or runtime handle is configured.
///
/// # Panics
///
/// Panics if the in-memory connection cannot be opened or the migrations fail.
#[cfg(any(test, feature = "test-support"))]
pub fn for_tests_with_single_tenant(
    data_dir: std::path::PathBuf,
    key: KeyMaterial,
    embedder: Arc<dyn solo_core::Embedder>,
    handle: Arc<TenantHandle>,
) -> Self {
    // Pre-populate the cache with the one tenant the test supplies.
    let tenant_id = handle.tenant_id().clone();
    let handles = HashMap::from([(tenant_id, handle)]);

    let mut index_conn = rusqlite::Connection::open_in_memory()
        .expect("open in-memory tenants_index stub for tests");
    crate::migration::run_tenants_index_migrations(&mut index_conn)
        .expect("apply tenants_index migrations to in-memory test stub");

    let deps = RegistryDeps {
        data_dir,
        key,
        embedder,
        hnsw_params: HnswParams::default(),
        steward: None,
        runtime_handle: None,
    };

    Self {
        index: Mutex::new(TenantsIndex::from_connection_for_tests(index_conn)),
        handles: RwLock::new(handles),
        deps,
    }
}
}