// solo-storage 0.8.0
//
// Solo: SQLite + SQLCipher persistence layer
// Documentation
// SPDX-License-Identifier: Apache-2.0

//! Daemon-side multi-tenant registry.
//!
//! Holds a `TenantsIndex` plus an in-memory map of opened
//! `Arc<TenantHandle>`. Tenants are opened **lazily**: the first request
//! for a tenant_id pays the open cost (DB connection, migration check,
//! HNSW load, writer-actor spawn, reader pool build); subsequent requests
//! are an `RwLock<HashMap>` hit.
//!
//! ## Lazy-load discipline
//!
//! `get_or_open(tenant_id)` is the hot path. Under concurrency, 100
//! simultaneous requests for the same tenant_id must:
//!   * Return the same `Arc<TenantHandle>` (no race).
//!   * Open the underlying tenant exactly once (no duplicate
//!     writer-actor / reader pool / HNSW load).
//!
//! Implementation: a `RwLock<HashMap<TenantId, Arc<TenantHandle>>>` is the
//! cached map. The slow-path (cache miss) takes the write lock, double-
//! checks the entry, and only opens the tenant if still absent. While the
//! write lock is held, every other concurrent first-access request blocks
//! on the lock and (on resumption) sees the cached entry — no duplicate
//! open. Open work happens inside `tokio::task::spawn_blocking` because
//! `TenantHandle::open` does synchronous DB + HNSW work.
//!
//! Trade-off: holding the registry write lock across the open means every
//! other `get_or_open` call blocks until this one finishes opening — not
//! only first-access requests for _other_ tenants, but also fast-path
//! lookups for already-open tenants, because the read lock cannot be
//! acquired while the write lock is held. For v0.8.0's deployment shape
//! (≤ 10 tenants typical, open is ~100-500ms), that's acceptable. A future
//! version with hundreds of tenants can move to a per-key Mutex pattern.
//!
//! ## Shutdown
//!
//! `shutdown_all()` drains every cached handle by calling
//! `TenantHandle::shutdown` on each. Designed to run once, at daemon
//! exit. Concurrent activity during shutdown is the operator's problem
//! — this is the daemon-graceful-exit path, the supervisor already
//! stopped accepting work.

use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::runtime::Handle as TokioHandle;
use tokio::sync::{Mutex, RwLock};

use solo_core::{Embedder, Error, Result, TenantId};

use crate::key_material::KeyMaterial;
use crate::tenants::handle::lookup_tenant_db_filename;
use crate::tenants::{TenantHandle, TenantOpenParams, TenantRecord, TenantsIndex};
use crate::vector_index::HnswParams;

/// Bootstrap deps shared across every tenant the registry opens. Held by
/// the registry; `get_or_open` clones each field into a fresh
/// `TenantOpenParams` whenever a tenant is lazily opened.
struct RegistryDeps {
    /// Root data directory. `TenantRegistry::open` resolves the tenants
    /// index file and the legacy `solo.db` probe relative to it.
    data_dir: PathBuf,
    /// Key material. The same value `TenantRegistry::open` passes to the
    /// tenants index and the v0.7.1 migration helper; cloned into each
    /// tenant's open params.
    key: KeyMaterial,
    /// Shared embedder; the `Arc` is cloned for every tenant that is opened.
    embedder: Arc<dyn Embedder>,
    /// HNSW parameters, cloned unchanged into every tenant's open params.
    hnsw_params: HnswParams,
    /// Optional steward, passed through to every tenant that is opened.
    steward: Option<Arc<solo_steward::Steward>>,
    /// Runtime handle captured at registry-open time. Passed to each
    /// `TenantHandle::open` so writer-actors can `block_on` async
    /// embedder calls during reembed.
    runtime_handle: Option<TokioHandle>,
}

/// Daemon-side multi-tenant registry.
///
/// Owns the tenants index plus a cache of already-opened tenant handles.
/// Both locks are the async `tokio::sync` variants, so their guards may be
/// held across `.await` points (and `get_or_open` does exactly that with
/// the `handles` write guard).
pub struct TenantRegistry {
    /// Mutex around the SQLCipher connection to `tenants_index.db`. Single
    /// access at a time (the registry is the only thing that touches it).
    index: Mutex<TenantsIndex>,
    /// Opened tenant handles, keyed by tenant_id. Read-locked on the
    /// `get_or_open` fast path and by `is_open`; write-locked by the
    /// `get_or_open` slow path, `forget_handle`, and `shutdown_all`.
    handles: RwLock<HashMap<TenantId, Arc<TenantHandle>>>,
    /// Shared bootstrap dependencies cloned into every tenant open.
    deps: RegistryDeps,
}

/// Parameters for opening the registry. Consumed by value by
/// `TenantRegistry::open`, which moves every field into the registry.
pub struct TenantRegistryParams {
    /// Root data directory containing `tenants_index.db` (or, for a v0.7.1
    /// layout, the legacy `solo.db` that triggers the mass-data-move).
    pub data_dir: PathBuf,
    /// Key material used for the tenants index, the migration helper, and
    /// every tenant opened later.
    pub key: KeyMaterial,
    /// Embedder shared by all tenants.
    pub embedder: Arc<dyn Embedder>,
    /// HNSW parameters applied to every tenant opened by this registry.
    pub hnsw_params: HnswParams,
    /// Optional steward, passed through to every tenant open.
    pub steward: Option<Arc<solo_steward::Steward>>,
    /// Tokio runtime handle. Required by the writer-actor spawn path for
    /// `block_on` of async embedder calls during reembed. The registry
    /// passes this through to every `TenantHandle::open`. Construct with
    /// `tokio::runtime::Handle::current()` from inside `#[tokio::main]`.
    ///
    /// NOTE(review): although described as required, the field is typed
    /// `Option` and the test-only constructor in this file passes `None`.
    /// What `TenantHandle::open` does with `None` is not visible from
    /// here — confirm before relying on it in production paths.
    pub runtime_handle: Option<TokioHandle>,
}

impl TenantRegistry {
    /// Open the registry. Opens `tenants_index.db`, applies its
    /// migrations, and (if the v0.7.1 layout is detected) runs the
    /// mass-data-move helper before returning. Does NOT auto-open any
    /// tenant handle; that happens lazily on first `get_or_open`.
    pub fn open(params: TenantRegistryParams) -> Result<Self> {
        let TenantRegistryParams {
            data_dir,
            key,
            embedder,
            hnsw_params,
            steward,
            runtime_handle,
        } = params;

        // Auto-detect v0.7.1 layout → run mass-data-move BEFORE opening
        // the registry. Idempotent if the helper already ran. We replicate
        // the detection logic from `startup::run` so the registry-open
        // path works whether called from daemon or one-shot.
        let tenants_index_path = data_dir.join(crate::tenants::TENANTS_INDEX_FILENAME);
        let legacy_db_path = data_dir.join("solo.db");
        if !tenants_index_path.is_file() && legacy_db_path.is_file() {
            tracing::info!(
                data_dir = %data_dir.display(),
                "v0.7.1 single-DB layout detected; running v0.7.1 → v0.8.0 mass-data-move"
            );
            crate::tenants::migrate_v071_to_v080(&data_dir, &key)?;
        }

        let index = TenantsIndex::open(&data_dir, &key)?;

        Ok(Self {
            index: Mutex::new(index),
            handles: RwLock::new(HashMap::new()),
            deps: RegistryDeps {
                data_dir,
                key,
                embedder,
                hnsw_params,
                steward,
                runtime_handle,
            },
        })
    }

    /// Data dir this registry serves.
    pub fn data_dir(&self) -> &Path {
        &self.deps.data_dir
    }

    /// Get (or lazily open) the handle for `tenant_id`.
    ///
    /// Concurrent first-access for the same tenant: see module docs. All
    /// callers see the same `Arc<TenantHandle>`; no duplicate handle is
    /// opened.
    pub async fn get_or_open(
        &self,
        tenant_id: &TenantId,
    ) -> Result<Arc<TenantHandle>> {
        // Fast path: read lock + HashMap::get.
        {
            let map = self.handles.read().await;
            if let Some(h) = map.get(tenant_id) {
                return Ok(h.clone());
            }
        }

        // Slow path: open under write lock. Double-check the entry first
        // in case a concurrent first-access already opened it while we
        // were waiting on the write lock.
        let mut map = self.handles.write().await;
        if let Some(h) = map.get(tenant_id) {
            return Ok(h.clone());
        }

        // Lookup the tenant's row in the index. Need to drop the index
        // mutex BEFORE the open work so a concurrent `forget_handle` can
        // make progress.
        let db_filename = {
            let index = self.index.lock().await;
            lookup_tenant_db_filename(&index, tenant_id)?
        };

        // Build params + open the tenant. This is sync + blocking, so run
        // it on a blocking thread to avoid stalling the tokio runtime.
        let tenant_id_clone = tenant_id.clone();
        let open_params = TenantOpenParams {
            data_dir: self.deps.data_dir.clone(),
            key: self.deps.key.clone(),
            db_filename,
            embedder: self.deps.embedder.clone(),
            hnsw_params: self.deps.hnsw_params.clone(),
            steward: self.deps.steward.clone(),
            runtime_handle: self.deps.runtime_handle.clone(),
        };
        let handle = tokio::task::spawn_blocking(move || {
            TenantHandle::open(tenant_id_clone, open_params)
        })
        .await
        .map_err(|e| Error::storage(format!("join spawn_blocking for TenantHandle::open: {e}")))??;

        let arc = Arc::new(handle);
        map.insert(tenant_id.clone(), arc.clone());
        Ok(arc)
    }

    /// Evict a tenant from the in-memory cache (used by the P6
    /// hard-delete sequence). The caller is responsible for calling
    /// `TenantHandle::shutdown` on the returned handle.
    ///
    /// Returns the evicted handle if it was in the cache, or `None` if
    /// it wasn't open.
    pub async fn forget_handle(
        &self,
        tenant_id: &TenantId,
    ) -> Option<Arc<TenantHandle>> {
        let mut map = self.handles.write().await;
        map.remove(tenant_id)
    }

    /// List every tenant in the registry index (regardless of open state).
    pub async fn list_active(&self) -> Result<Vec<TenantRecord>> {
        let index = self.index.lock().await;
        index.list()
    }

    /// Borrow the underlying TenantsIndex for admin operations (P6/P7
    /// create/delete/backup/restore). Avoids re-opening the SQLCipher
    /// connection for every admin call.
    pub async fn with_index<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut TenantsIndex) -> R,
    {
        let mut index = self.index.lock().await;
        f(&mut *index)
    }

    /// Graceful shutdown of every cached handle. Saves snapshots, drains
    /// writers, drops pools. Designed for daemon exit.
    pub async fn shutdown_all(&self) {
        // Drain the map into an owned Vec so we can run shutdowns
        // sequentially without holding the write lock for the whole run.
        let handles: Vec<(TenantId, Arc<TenantHandle>)> = {
            let mut map = self.handles.write().await;
            map.drain().collect()
        };
        for (tenant_id, handle) in handles {
            // We only have an Arc<TenantHandle> in the map; shutdown
            // consumes a `TenantHandle` (not Arc). If we hold the only
            // remaining Arc, unwrap is fine; if not (e.g., a handler
            // still holds one), we fall back to logging a warning and
            // moving on — the next process restart will reopen + flush.
            match Arc::try_unwrap(handle) {
                Ok(owned) => {
                    if let Err(e) = owned.shutdown(true).await {
                        tracing::warn!(
                            tenant = %tenant_id,
                            error = %e,
                            "tenant shutdown returned error"
                        );
                    }
                }
                Err(_still_shared) => {
                    tracing::warn!(
                        tenant = %tenant_id,
                        "tenant handle still has outstanding Arc clones at \
                         shutdown_all; skipping. The next process restart \
                         will reload from snapshot + replay pending_index."
                    );
                }
            }
        }
    }

    /// True iff this tenant is currently in the in-memory cache. Test
    /// helper / metric surface.
    pub async fn is_open(&self, tenant_id: &TenantId) -> bool {
        let map = self.handles.read().await;
        map.contains_key(tenant_id)
    }

    /// Test-only constructor that builds a registry around a single
    /// pre-built `TenantHandle`. Skips opening `tenants_index.db` (which
    /// requires SQLCipher) so test harnesses can drive transport-layer
    /// flows without paying the full bootstrap cost.
    ///
    /// The supplied handle becomes the registry's only cached tenant.
    /// `get_or_open(tenant_id)` returns it for the handle's own
    /// tenant_id; any other tenant_id errors with NotFound (no index to
    /// lazy-open from).
    #[cfg(any(test, feature = "test-support"))]
    pub fn for_tests_with_single_tenant(
        data_dir: std::path::PathBuf,
        key: KeyMaterial,
        embedder: Arc<dyn solo_core::Embedder>,
        handle: Arc<TenantHandle>,
    ) -> Self {
        let tenant_id = handle.tenant_id().clone();
        let mut handles_map = HashMap::new();
        handles_map.insert(tenant_id, handle);
        // Open an in-memory TenantsIndex stub. We won't really persist
        // through it; the harness's get_or_open() lookups hit the
        // cached HashMap and never reach the index.
        // For simplicity, use an in-memory SQLite connection bypassing
        // SQLCipher entirely.
        let conn = rusqlite::Connection::open_in_memory()
            .expect("open in-memory tenants_index stub for tests");
        // Apply the registry's schema so any code that does happen to
        // touch the index (e.g. list_active) doesn't blow up.
        let mut conn = conn;
        crate::migration::run_tenants_index_migrations(&mut conn)
            .expect("apply tenants_index migrations to in-memory test stub");
        let index = TenantsIndex::from_connection_for_tests(conn);
        Self {
            index: Mutex::new(index),
            handles: RwLock::new(handles_map),
            deps: RegistryDeps {
                data_dir,
                key,
                embedder,
                hnsw_params: crate::vector_index::HnswParams::default(),
                steward: None,
                runtime_handle: None,
            },
        }
    }
}