use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::Mutex;
use tokio::runtime::Handle as TokioHandle;
use tokio::sync::RwLock as AsyncRwLock;
use rusqlite::Connection;
use solo_core::{Embedder, Error, Result, TenantId, VectorIndex, VectorIndexFactory};
use crate::steward_factory::StewardFactory;
use crate::audit::{AuditWriter, AuditWriterShutdown, purge_older_than};
use crate::embedder_registry::{EmbedderIdentity, get_or_insert_embedder_id};
use crate::hnsw_id::episode_hnsw_id;
use crate::init::open_sqlcipher;
use crate::key_material::KeyMaterial;
use crate::migration;
use crate::reader::ReaderPool;
use crate::recovery::{
DriftReport, RebuildReport, ReplayReport, detect_drift, rebuild_hnsw_from_sql,
replay_pending_index,
};
use crate::snapshot::{self, BAK_BASENAME, LIVE_BASENAME, TMP_BASENAME};
use crate::tenants::{TENANTS_SUBDIR, TenantsIndex};
use crate::vector_index::{HnswFactory, HnswIndex, HnswParams};
use crate::writer::{WriteHandle, WriterActor, WriterSpawn};
/// File-name suffix of the `.hnsw.data` half of an HNSW snapshot pair; it is
/// appended to a snapshot basename (`LIVE_BASENAME`, `BAK_BASENAME`,
/// `TMP_BASENAME`) when locating snapshot files.
const HNSW_DATA_SUFFIX: &str = ".hnsw.data";
/// File-name suffix of the `.hnsw.graph` half of an HNSW snapshot pair; used
/// together with [`HNSW_DATA_SUFFIX`] when locating snapshot files.
const HNSW_GRAPH_SUFFIX: &str = ".hnsw.graph";
/// Everything needed to serve one tenant: its SQLCipher DB, in-memory HNSW
/// index, writer actor, reader pool, audit writer and the reports produced
/// while recovering state in [`TenantHandle::open`].
pub struct TenantHandle {
/// Identifier of the tenant this handle serves.
tenant_id: TenantId,
/// Config read from `<data_dir>/solo.config.toml` at open time.
config: crate::config::SoloConfig,
/// Path of the per-tenant SQLCipher database file.
db_path: PathBuf,
/// Per-tenant directory holding HNSW snapshot files.
snapshot_dir: PathBuf,
/// Embedder row id returned by `get_or_insert_embedder_id` at open time.
embedder_id: i64,
/// HNSW index shared by the reader pool and the writer actor.
hnsw: Arc<dyn VectorIndex + Send + Sync>,
embedder: Arc<dyn Embedder>,
/// Handle used to send work to the writer actor.
write: WriteHandle,
/// Writer thread join handle; `Option` so `shutdown` can take and join it.
writer_join: Option<std::thread::JoinHandle<()>>,
read: ReaderPool,
audit: AuditWriter,
/// Taken once by `shutdown` to wait for the audit writer to drain.
audit_shutdown: Mutex<Option<AuditWriterShutdown>>,
/// Audit retention sweep task; `None` when the sweep is not configured.
audit_sweep_handle: Mutex<Option<tokio::task::JoinHandle<()>>>,
/// Result of `replay_pending_index` run during open.
replay: ReplayReport,
/// Result of `detect_drift` run during open.
drift: DriftReport,
/// True when the HNSW index came from the `.bak` snapshot.
used_bak_snapshot: bool,
/// True when no snapshot loaded and an empty index was created.
started_fresh: bool,
/// Rebuild-from-SQL report; default when a snapshot was loaded.
rebuild: RebuildReport,
/// Current steward, shared (by `Arc` clone) with the writer actor when it
/// is spawned with a runtime handle.
steward_slot: Arc<AsyncRwLock<Option<Arc<solo_steward::Steward>>>>,
}
/// Directory that holds one tenant's HNSW snapshot files:
/// `<data_dir>/<TENANTS_SUBDIR>/<tenant_id>`.
fn per_tenant_snapshot_dir(data_dir: &Path, tenant_id: &TenantId) -> PathBuf {
    let mut dir = data_dir.join(TENANTS_SUBDIR);
    dir.push(tenant_id.as_str());
    dir
}
/// Location of a tenant's SQLCipher database file:
/// `<data_dir>/<TENANTS_SUBDIR>/<db_filename>`.
fn per_tenant_db_path(data_dir: &Path, db_filename: &str) -> PathBuf {
    let mut path = data_dir.join(TENANTS_SUBDIR);
    path.push(db_filename);
    path
}
/// One-time layout migration for the `default` tenant.
///
/// Moves HNSW snapshot files found directly under `<data_dir>/<TENANTS_SUBDIR>/`
/// into the per-tenant subdirectory `<data_dir>/<TENANTS_SUBDIR>/default/`.
/// Every live/bak/tmp basename is checked for both the `.hnsw.data` and the
/// `.hnsw.graph` suffix. When a file already exists in the subdirectory, the
/// flat copy is deleted instead of moved, so the subdirectory copy wins.
/// Other tenants are left untouched.
///
/// # Errors
/// Returns a storage error if the subdirectory cannot be created, or if any
/// flat file cannot be removed or renamed.
fn upgrade_flat_default_snapshots_to_subdir(
    data_dir: &Path,
    tenant_id: &TenantId,
) -> Result<()> {
    if tenant_id.as_str() != "default" {
        return Ok(());
    }

    let flat_dir = data_dir.join(TENANTS_SUBDIR);
    let tenant_dir = flat_dir.join(tenant_id.as_str());
    std::fs::create_dir_all(&tenant_dir).map_err(|e| {
        Error::storage(format!(
            "create per-tenant snapshot subdir {}: {e}",
            tenant_dir.display()
        ))
    })?;

    // Same visiting order as basename-major, suffix-minor nested loops.
    let candidates = [LIVE_BASENAME, BAK_BASENAME, TMP_BASENAME]
        .into_iter()
        .flat_map(|base| {
            [HNSW_DATA_SUFFIX, HNSW_GRAPH_SUFFIX]
                .into_iter()
                .map(move |suffix| format!("{base}{suffix}"))
        });

    for filename in candidates {
        let src = flat_dir.join(&filename);
        if !src.is_file() {
            continue;
        }
        let dst = tenant_dir.join(&filename);
        if dst.is_file() {
            // The per-tenant copy is authoritative; drop the stale flat duplicate.
            std::fs::remove_file(&src).map_err(|e| {
                Error::storage(format!(
                    "remove flat-layout snapshot duplicate {}: {e}",
                    src.display()
                ))
            })?;
        } else {
            std::fs::rename(&src, &dst).map_err(|e| {
                Error::storage(format!(
                    "promote flat-layout snapshot {} → {}: {e}",
                    src.display(),
                    dst.display()
                ))
            })?;
            tracing::info!(
                src = %src.display(),
                dst = %dst.display(),
                tenant = %tenant_id,
                "P2: promoted flat-tenants/ HNSW snapshot into per-tenant subdir"
            );
        }
    }
    Ok(())
}
/// Inputs to [`TenantHandle::open`].
///
/// The fields `steward`, `quota_bytes` and `triples_batch_signal` are only
/// forwarded to the writer actor when `runtime_handle` is `Some`. The same
/// holds for the embedder handed to the writer. Without a runtime handle the
/// writer is spawned through `WriterActor::spawn_full`, which does not take
/// them.
pub struct TenantOpenParams {
    /// Root data directory; must contain `solo.config.toml` and the
    /// tenants subdirectory.
    pub data_dir: PathBuf,
    /// Key used to open the tenant's SQLCipher database.
    pub key: KeyMaterial,
    /// File name of the tenant's database inside the tenants subdirectory.
    pub db_filename: String,
    pub embedder: Arc<dyn Embedder>,
    /// Parameters used when an empty HNSW index has to be created.
    pub hnsw_params: HnswParams,
    /// Steward passed to the writer actor. It is also the initial steward-slot
    /// value when `steward_factory` is `None`.
    pub steward: Option<Arc<solo_steward::Steward>>,
    /// Tokio runtime for the full-featured writer and the audit retention
    /// sweep. When `None`, neither is set up.
    pub runtime_handle: Option<TokioHandle>,
    /// Storage quota forwarded to the writer actor.
    pub quota_bytes: Option<u64>,
    /// When set, its `build()` result becomes the initial steward-slot value
    /// in place of `steward`.
    pub steward_factory: Option<Arc<dyn StewardFactory>>,
    /// Batch signal forwarded to the writer actor.
    pub triples_batch_signal: Option<Arc<crate::triples_batch::TriplesBatchSignal>>,
}
impl TenantHandle {
pub fn open(tenant_id: TenantId, params: TenantOpenParams) -> Result<Self> {
let TenantOpenParams {
data_dir,
key,
db_filename,
embedder,
hnsw_params,
steward,
runtime_handle,
quota_bytes,
steward_factory,
triples_batch_signal,
} = params;
let config_path = data_dir.join("solo.config.toml");
let config = crate::config::SoloConfig::read(&config_path)?;
let dim = config.embedder.dim as usize;
if dim == 0 {
return Err(Error::storage(format!(
"solo.config.toml records embedder.dim=0 — corrupt config? at {config_path:?}"
)));
}
upgrade_flat_default_snapshots_to_subdir(&data_dir, &tenant_id)?;
let db_path = per_tenant_db_path(&data_dir, &db_filename);
let snapshot_dir = per_tenant_snapshot_dir(&data_dir, &tenant_id);
std::fs::create_dir_all(&snapshot_dir).map_err(|e| {
Error::storage(format!(
"create per-tenant snapshot dir {}: {e}",
snapshot_dir.display()
))
})?;
if !db_path.is_file() {
return Err(Error::not_found(format!(
"per-tenant DB not found at {}; the tenants_index row \
references this file but it is missing. Operator action \
required (restore from backup or remove the orphan registry row).",
db_path.display()
)));
}
let mut conn: Connection = open_sqlcipher(&db_path, &key)?;
let _schema_version = migration::run_migrations(&mut conn)?;
let embedder_identity = EmbedderIdentity {
name: config.embedder.name.clone(),
version: config.embedder.version.clone(),
dim: config.embedder.dim,
dtype: config.embedder.dtype.clone(),
};
let embedder_id = get_or_insert_embedder_id(&conn, &embedder_identity)?;
let factory = HnswFactory::with_params(hnsw_params);
let (hnsw_index, used_bak_snapshot, started_fresh) =
load_hnsw_with_fallback(&snapshot_dir, &factory, dim);
if !started_fresh && hnsw_index.dim() != dim {
return Err(Error::storage(format!(
"tenant {tenant_id}: HNSW snapshot dim ({}) does not match \
solo.config.toml embedder.dim ({dim}). Embedder identity has \
shifted under the daemon. Run `solo reembed` to rebuild.",
hnsw_index.dim()
)));
}
let rebuild = if started_fresh {
let started = std::time::Instant::now();
let r = rebuild_hnsw_from_sql(&conn, &hnsw_index, embedder_id)?;
if r.rows_seen > 0 {
tracing::info!(
tenant = %tenant_id,
rows_seen = r.rows_seen,
rows_added = r.rows_added,
rows_skipped = r.rows_skipped,
elapsed_ms = started.elapsed().as_millis() as u64,
"tenant: rebuilt HNSW from embeddings after empty-snapshot fallback"
);
}
r
} else {
RebuildReport::default()
};
let hnsw: Arc<dyn VectorIndex + Send + Sync> = Arc::new(hnsw_index);
let forgotten = if started_fresh {
0
} else {
rebuild_tombstones_from_sql(&conn, hnsw.as_ref())?
};
if forgotten > 0 {
tracing::info!(
tenant = %tenant_id,
forgotten,
"tenant: rebuilt HNSW tombstones from forgotten episodes"
);
}
let replay = replay_pending_index(&mut conn, hnsw.as_ref())?;
let drift = detect_drift(&conn, hnsw.as_ref())?;
drop(conn);
let pool = ReaderPool::new(&db_path, Some(key.clone()), hnsw.clone())?;
let redactor = Arc::new(crate::redaction::RedactionRegistry::from_config(
&config.redaction,
)?);
let initial_slot_value: Option<Arc<solo_steward::Steward>> =
if let Some(factory) = steward_factory.as_ref() {
factory.build()?
} else {
steward.clone()
};
let steward_slot = Arc::new(AsyncRwLock::new(initial_slot_value));
let writer_conn = open_sqlcipher(&db_path, &key)?;
let WriterSpawn {
handle: write,
join,
} = if let Some(rt) = runtime_handle.clone() {
WriterActor::spawn_full_with_quota_and_slot(
writer_conn,
hnsw.clone(),
snapshot_dir.clone(),
embedder_id,
embedder.clone(),
steward,
key.clone(),
rt,
redactor,
quota_bytes,
db_path.clone(),
steward_slot.clone(),
triples_batch_signal,
)
} else {
WriterActor::spawn_full(
writer_conn,
hnsw.clone(),
snapshot_dir.clone(),
embedder_id,
)
};
let (audit, audit_shutdown) =
AuditWriter::spawn(db_path.clone(), Some(key.clone()));
let audit_sweep_handle = spawn_audit_sweep(
&tenant_id,
&db_path,
&key,
&config.audit,
runtime_handle.clone(),
);
Ok(TenantHandle {
tenant_id,
config,
db_path,
snapshot_dir,
embedder_id,
hnsw,
embedder,
write,
writer_join: Some(join),
read: pool,
audit,
audit_shutdown: Mutex::new(Some(audit_shutdown)),
audit_sweep_handle: Mutex::new(audit_sweep_handle),
replay,
drift,
used_bak_snapshot,
started_fresh,
rebuild,
steward_slot,
})
}
pub fn tenant_id(&self) -> &TenantId {
&self.tenant_id
}
pub fn config(&self) -> &crate::config::SoloConfig {
&self.config
}
pub fn db_path(&self) -> &Path {
&self.db_path
}
pub fn snapshot_dir(&self) -> &Path {
&self.snapshot_dir
}
pub fn embedder_id(&self) -> i64 {
self.embedder_id
}
pub fn write(&self) -> &WriteHandle {
&self.write
}
pub fn read(&self) -> &ReaderPool {
&self.read
}
pub fn hnsw(&self) -> &Arc<dyn VectorIndex + Send + Sync> {
&self.hnsw
}
pub fn embedder(&self) -> &Arc<dyn Embedder> {
&self.embedder
}
pub fn replay(&self) -> &ReplayReport {
&self.replay
}
pub fn drift(&self) -> &DriftReport {
&self.drift
}
pub fn used_bak_snapshot(&self) -> bool {
self.used_bak_snapshot
}
pub fn started_fresh(&self) -> bool {
self.started_fresh
}
pub fn rebuild(&self) -> &RebuildReport {
&self.rebuild
}
pub fn audit(&self) -> &AuditWriter {
&self.audit
}
pub fn steward_slot(
&self,
) -> &Arc<AsyncRwLock<Option<Arc<solo_steward::Steward>>>> {
&self.steward_slot
}
#[cfg(any(test, feature = "test-support"))]
#[allow(clippy::too_many_arguments)]
pub fn from_parts_for_tests(
tenant_id: TenantId,
config: crate::config::SoloConfig,
db_path: PathBuf,
snapshot_dir: PathBuf,
embedder_id: i64,
hnsw: Arc<dyn VectorIndex + Send + Sync>,
embedder: Arc<dyn Embedder>,
write: WriteHandle,
writer_join: std::thread::JoinHandle<()>,
read: ReaderPool,
) -> Self {
let audit = AuditWriter::noop();
Self {
tenant_id,
config,
db_path,
snapshot_dir,
embedder_id,
hnsw,
embedder,
write,
writer_join: Some(writer_join),
read,
audit,
audit_shutdown: Mutex::new(None),
audit_sweep_handle: Mutex::new(None),
replay: ReplayReport::default(),
drift: DriftReport::default(),
used_bak_snapshot: false,
started_fresh: true,
rebuild: RebuildReport::default(),
steward_slot: Arc::new(AsyncRwLock::new(None)),
}
}
pub async fn shutdown(mut self, save_snapshot: bool) -> Result<()> {
if save_snapshot
&& let Err(e) = self.write.save_snapshot().await
{
tracing::warn!(
tenant = %self.tenant_id,
error = %e,
"tenant shutdown: final snapshot save failed (continuing)"
);
}
if let Some(handle) = self.audit_sweep_handle.lock().unwrap().take() {
handle.abort();
}
let audit_shutdown = self.audit_shutdown.lock().unwrap().take();
let _ = std::mem::replace(&mut self.audit, AuditWriter::noop());
if let Some(shutdown) = audit_shutdown {
shutdown.join().await;
}
let write = self.write;
drop(write);
if let Some(join) = self.writer_join.take() {
tokio::task::spawn_blocking(move || {
if let Err(panic) = join.join() {
tracing::error!(?panic, "tenant: writer thread panicked on shutdown");
}
})
.await
.ok();
}
drop(self.read);
Ok(())
}
}
fn rebuild_tombstones_from_sql(
conn: &Connection,
hnsw: &dyn VectorIndex,
) -> Result<usize> {
let mut stmt = conn
.prepare("SELECT rowid FROM episodes WHERE status = 'forgotten'")
.map_err(|e| Error::storage(format!("prepare forgotten select: {e}")))?;
let rows = stmt
.query_map([], |row| row.get::<_, i64>(0))
.map_err(|e| Error::storage(format!("query_map forgotten: {e}")))?;
let mut count = 0usize;
for r in rows {
let rowid = r.map_err(|e| Error::storage(format!("forgotten row decode: {e}")))?;
hnsw.remove(episode_hnsw_id(rowid))?;
count += 1;
}
Ok(count)
}
/// Loads a tenant's HNSW index, falling back step by step.
///
/// The attempts are: live snapshot, then `.bak` snapshot, then a fresh
/// empty index of `dim` dimensions built with `factory`.
///
/// Returns `(index, used_bak_snapshot, started_fresh)`.
///
/// # Panics
/// Panics if `factory.create(dim)` fails. The caller is expected to have
/// validated `dim` (it rejects 0 beforehand).
fn load_hnsw_with_fallback(
    snapshot_dir: &Path,
    factory: &HnswFactory,
    dim: usize,
) -> (HnswIndex, bool, bool) {
    let primary_err = match snapshot::load(snapshot_dir) {
        Ok(live) => {
            tracing::info!(
                snapshot_kind = "live",
                dim = live.dim(),
                len = live.len(),
                "tenant HNSW loaded from live snapshot"
            );
            return (live, false, false);
        }
        Err(err) => err,
    };
    tracing::warn!(error = %primary_err, "tenant: live HNSW snapshot failed; trying .bak");

    let bak_err = match snapshot::load_bak(snapshot_dir) {
        Ok(backup) => {
            tracing::warn!(
                snapshot_kind = "bak",
                dim = backup.dim(),
                len = backup.len(),
                "tenant HNSW loaded from backup snapshot — investigate the live pair"
            );
            return (backup, true, false);
        }
        Err(err) => err,
    };

    tracing::warn!(
        primary = %primary_err,
        bak = %bak_err,
        dim,
        "tenant: no HNSW snapshot available; starting fresh empty index"
    );
    let fresh = factory
        .create(dim)
        .expect("HnswFactory::create with valid dim must succeed");
    (fresh, false, true)
}
/// Spawns the periodic audit retention sweep for one tenant.
///
/// Returns `None`, and spawns nothing, in any of these cases:
/// - `audit.retention_days` is unset;
/// - `audit.purge_interval_secs` is unset;
/// - no Tokio runtime handle is supplied;
/// - `purge_interval_secs` is 0. `tokio::time::interval` panics on a zero
///   period, which would kill the task on its first poll, so this case
///   logs a warning instead.
///
/// The first sweep runs one full interval after spawn. Each sweep runs on a
/// blocking thread: it opens its own SQLCipher connection and calls
/// `purge_older_than` with a cutoff of "now minus `retention_days`" in
/// milliseconds. Failures are logged and retried on the next tick.
fn spawn_audit_sweep(
    tenant_id: &TenantId,
    db_path: &Path,
    key: &KeyMaterial,
    audit_cfg: &crate::config::AuditSettings,
    runtime_handle: Option<TokioHandle>,
) -> Option<tokio::task::JoinHandle<()>> {
    let retention_days = audit_cfg.retention_days?;
    let interval_secs = audit_cfg.purge_interval_secs?;
    let rt = runtime_handle?;
    if interval_secs == 0 {
        tracing::warn!(
            tenant = %tenant_id,
            "audit.purge_interval_secs is 0; audit retention sweep disabled"
        );
        return None;
    }
    let tenant = tenant_id.clone();
    let path = db_path.to_path_buf();
    let key = key.clone();
    let interval = std::time::Duration::from_secs(interval_secs);
    Some(rt.spawn(async move {
        let mut ticker = tokio::time::interval(interval);
        // The first tick completes immediately; consume it so the first
        // sweep happens one interval after startup.
        ticker.tick().await;
        loop {
            ticker.tick().await;
            let cutoff_ms = chrono::Utc::now().timestamp_millis()
                - i64::from(retention_days) * 86_400_000;
            let path = path.clone();
            let key = key.clone();
            let outcome = tokio::task::spawn_blocking(move || {
                let mut conn = open_sqlcipher(&path, &key)?;
                purge_older_than(&mut conn, cutoff_ms)
            })
            .await;
            match outcome {
                Ok(Ok(deleted)) if deleted > 0 => tracing::info!(
                    tenant = %tenant,
                    deleted,
                    cutoff_ms,
                    "audit retention sweep purged rows"
                ),
                Ok(Ok(_)) => tracing::debug!(
                    tenant = %tenant,
                    "audit retention sweep ran (nothing to purge)"
                ),
                Ok(Err(e)) => tracing::warn!(
                    tenant = %tenant,
                    error = %e,
                    "audit retention sweep failed (will retry next interval)"
                ),
                Err(e) => tracing::warn!(
                    tenant = %tenant,
                    error = %e,
                    "audit retention sweep join failed"
                ),
            }
        }
    }))
}
#[allow(dead_code)]
pub(crate) fn lookup_tenant_db_filename(
index: &TenantsIndex,
tenant_id: &TenantId,
) -> Result<String> {
let rec = index.lookup(tenant_id)?.ok_or_else(|| {
Error::not_found(format!("tenant `{tenant_id}` not found in tenants_index"))
})?;
if rec.status != crate::tenants::TenantStatus::Active {
return Err(Error::conflict(format!(
"tenant `{tenant_id}` has status `{}`; refusing to open",
rec.status.as_sql_str()
)));
}
Ok(rec.db_filename)
}