solo-storage 0.9.1

Solo: SQLite + SQLCipher persistence layer
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
// SPDX-License-Identifier: Apache-2.0

//! Daemon-side multi-tenant registry.
//!
//! Holds a `TenantsIndex` plus an in-memory map of opened
//! `Arc<TenantHandle>`. Tenants are opened **lazily**: the first request
//! for a tenant_id pays the open cost (DB connection, migration check,
//! HNSW load, writer-actor spawn, reader pool build); subsequent requests
//! are an `RwLock<HashMap>` hit plus a best-effort `last_accessed` stamp
//! on the tenants index.
//!
//! ## Lazy-load discipline
//!
//! `get_or_open(tenant_id)` is the hot path. Under concurrency, 100
//! simultaneous requests for the same tenant_id must:
//!   * Return the same `Arc<TenantHandle>` (no race).
//!   * Open the underlying tenant exactly once (no duplicate
//!     writer-actor / reader pool / HNSW load).
//!
//! Implementation: a `RwLock<HashMap<TenantId, Arc<TenantHandle>>>` is the
//! cached map. The slow-path (cache miss) takes the write lock, double-
//! checks the entry, and only opens the tenant if still absent. While the
//! write lock is held, every other concurrent first-access request blocks
//! on the lock and (on resumption) sees the cached entry — no duplicate
//! open. Open work happens inside `tokio::task::spawn_blocking` because
//! `TenantHandle::open` does synchronous DB + HNSW work.
//!
//! Trade-off: holding the registry write lock across the open means every
//! _other_ `get_or_open` call — including cache hits for tenants that are
//! already open, because the fast path needs the read lock — blocks until
//! this one finishes opening. For v0.8.0's deployment shape (≤ 10 tenants
//! typical, open is ~100-500ms), that's acceptable. A future version with
//! hundreds of tenants can move to a per-key Mutex pattern.
//!
//! ## Shutdown
//!
//! `shutdown_all()` drains every cached handle by calling
//! `TenantHandle::shutdown` on each. Designed to run once, at daemon
//! exit. Concurrent activity during shutdown is the operator's problem
//! — this is the daemon-graceful-exit path, the supervisor already
//! stopped accepting work.

use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::runtime::Handle as TokioHandle;
use tokio::sync::{Mutex, RwLock};

use solo_core::{Embedder, Error, Result, TenantId};

use crate::key_material::KeyMaterial;
use crate::tenants::{TenantHandle, TenantOpenParams, TenantRecord, TenantsIndex};
use crate::vector_index::HnswParams;

/// Bootstrap deps shared across every tenant the registry opens. Held by
/// the registry; cloned cheaply into each `TenantOpenParams`.
///
/// Populated once by `TenantRegistry::open` (or the test-only
/// constructor). `get_or_open` clones every field into the
/// `TenantOpenParams` of each tenant it opens; nothing here is mutated
/// after construction.
struct RegistryDeps {
    /// Root data directory. `tenants_index.db` lives directly under it, and
    /// it is passed to every tenant open alongside that tenant's
    /// `db_filename`.
    data_dir: PathBuf,
    /// Key material used to open `tenants_index.db`; also cloned into every
    /// tenant open.
    key: KeyMaterial,
    /// Embedder shared by every tenant.
    embedder: Arc<dyn Embedder>,
    /// HNSW parameters passed to every tenant open.
    hnsw_params: HnswParams,
    /// Shared Steward handed to each tenant's writer-actor (the v0.8.x
    /// path). When `steward_factory` is `None`, it also seeds the
    /// per-tenant `steward_slot`.
    steward: Option<Arc<solo_steward::Steward>>,
    /// Runtime handle captured at registry-open time. Passed to each
    /// `TenantHandle::open` so writer-actors can `block_on` async
    /// embedder calls during reembed.
    runtime_handle: Option<TokioHandle>,
    /// v0.9.0 P0c: optional Steward factory threaded through to every
    /// `TenantHandle::open` so the new per-tenant `steward_slot` can be
    /// populated eagerly for static backends (Anthropic / OpenAI /
    /// Ollama / None) and stay empty for the MCP-sampling backend.
    ///
    /// When `None` (the backwards-compat path), the slot is seeded
    /// from `steward: Option<Arc<Steward>>` so the v0.8.x and v0.9.0+
    /// callers observe the same Steward identity from the slot and
    /// from the writer-actor's captured field. v0.9.0 P2 fully migrates
    /// from this dual path to the slot-only path.
    steward_factory: Option<Arc<dyn crate::steward_factory::StewardFactory>>,
    /// v0.9.0 P4-revision (P4 audit M1): optional count-based trigger
    /// signal threaded through to every tenant's writer-actor. The
    /// daemon constructs ONE `Arc<TriplesBatchSignal>` at startup and
    /// shares it with both every tenant's writer-actor (so the actor
    /// can ping after each `Remember`) AND its own
    /// `triples_batch_timer` (so the select-arm can fire on count).
    triples_batch_signal: Option<Arc<crate::triples_batch::TriplesBatchSignal>>,
}

/// Daemon-side multi-tenant registry.
///
/// Lock ordering: when both locks are held at once (the cache-miss path
/// of `get_or_open`), the `handles` write lock is acquired first and the
/// `index` mutex second. New code that needs both should follow the same
/// order to avoid deadlock.
pub struct TenantRegistry {
    /// Mutex around the SQLCipher connection to `tenants_index.db`. Single
    /// access at a time (the registry is the only thing that touches it).
    /// A tokio mutex, so it may be held across `.await`.
    index: Mutex<TenantsIndex>,
    /// Opened tenant handles, keyed by tenant_id. `get_or_open` holds the
    /// write lock across a tenant's open so the open happens at most once
    /// per cache miss.
    handles: RwLock<HashMap<TenantId, Arc<TenantHandle>>>,
    /// Shared bootstrap deps cloned into every tenant open.
    deps: RegistryDeps,
}

/// Parameters for opening the registry.
///
/// Consumed by `TenantRegistry::open`, which moves every field into the
/// registry's shared per-tenant bootstrap deps.
pub struct TenantRegistryParams {
    /// Root data directory. `tenants_index.db` (and, for a v0.7.1 layout
    /// awaiting migration, `solo.db`) are looked up directly under it.
    pub data_dir: PathBuf,
    /// Key material used to open `tenants_index.db`, run the v0.7.1
    /// mass-data-move if needed, and open every tenant.
    pub key: KeyMaterial,
    /// Embedder shared by every tenant.
    pub embedder: Arc<dyn Embedder>,
    /// HNSW parameters passed to every tenant open.
    pub hnsw_params: HnswParams,
    /// Shared Steward handed to each tenant's writer-actor. Also seeds the
    /// per-tenant `steward_slot` when `steward_factory` is `None`.
    pub steward: Option<Arc<solo_steward::Steward>>,
    /// Tokio runtime handle. Required by the writer-actor spawn path for
    /// `block_on` of async embedder calls during reembed. The registry
    /// passes this through to every `TenantHandle::open`. Construct with
    /// `tokio::runtime::Handle::current()` from inside `#[tokio::main]`.
    pub runtime_handle: Option<TokioHandle>,
    /// v0.9.0 P0c: optional Steward factory. When `Some(_)`, every
    /// `TenantHandle::open` calls `factory.build()` and writes the
    /// result into the per-tenant `steward_slot`. Static-backend
    /// factories (`StaticStewardFactory` around an Anthropic / OpenAI /
    /// Ollama / Noop-backed Steward) populate the slot eagerly with
    /// zero hot-path lock-contention cost; the MCP-sampling factory
    /// returns `Ok(None)` for the eager-populate (v0.9.0 P2's
    /// `SoloMcpServer::initialize` hook bypasses the factory entirely
    /// on the sampling path and writes into the slot directly).
    ///
    /// When `None` (the backwards-compat path), the slot is seeded
    /// from `steward: Option<Arc<Steward>>` instead. Existing v0.8.x
    /// callers that pass `steward: Some(...)` without setting a
    /// factory observe consistent behavior — the slot mirrors the
    /// writer-actor's captured Steward.
    // NOTE(review): this allow looks redundant — the type is no more
    // complex than `triples_batch_signal`'s, which carries no allow.
    // Confirm with `cargo clippy` and drop it if it does not fire.
    #[allow(clippy::type_complexity)]
    pub steward_factory:
        Option<Arc<dyn crate::steward_factory::StewardFactory>>,
    /// v0.9.0 P4-revision (P4 audit M1): optional count-based trigger
    /// signal. The daemon passes `Some(Arc::clone(&signal))` so its
    /// `triples_batch_timer` and every tenant's writer-actor share the
    /// same counter + notify. `None` for paths that don't drive the
    /// background batch (CLI one-shot commands, tests).
    pub triples_batch_signal:
        Option<Arc<crate::triples_batch::TriplesBatchSignal>>,
}

impl TenantRegistry {
    /// Open the registry. Opens `tenants_index.db`, applies its
    /// migrations, and (if the v0.7.1 layout is detected) runs the
    /// mass-data-move helper before returning. Does NOT auto-open any
    /// tenant handle; that happens lazily on first `get_or_open`.
    pub fn open(params: TenantRegistryParams) -> Result<Self> {
        let TenantRegistryParams {
            data_dir,
            key,
            embedder,
            hnsw_params,
            steward,
            runtime_handle,
            steward_factory,
            triples_batch_signal,
        } = params;

        // Auto-detect v0.7.1 layout → run mass-data-move BEFORE opening
        // the registry. Idempotent if the helper already ran. We replicate
        // the detection logic from `startup::run` so the registry-open
        // path works whether called from daemon or one-shot.
        let tenants_index_path = data_dir.join(crate::tenants::TENANTS_INDEX_FILENAME);
        let legacy_db_path = data_dir.join("solo.db");
        if !tenants_index_path.is_file() && legacy_db_path.is_file() {
            tracing::info!(
                data_dir = %data_dir.display(),
                "v0.7.1 single-DB layout detected; running v0.7.1 → v0.8.0 mass-data-move"
            );
            crate::tenants::migrate_v071_to_v080(&data_dir, &key)?;
        }

        let index = TenantsIndex::open(&data_dir, &key)?;

        Ok(Self {
            index: Mutex::new(index),
            handles: RwLock::new(HashMap::new()),
            deps: RegistryDeps {
                data_dir,
                key,
                embedder,
                hnsw_params,
                steward,
                runtime_handle,
                steward_factory,
                triples_batch_signal,
            },
        })
    }

    /// Data dir this registry serves.
    pub fn data_dir(&self) -> &Path {
        &self.deps.data_dir
    }

    /// Get (or lazily open) the handle for `tenant_id`.
    ///
    /// Concurrent first-access for the same tenant: see module docs. All
    /// callers see the same `Arc<TenantHandle>`; no duplicate handle is
    /// opened.
    ///
    /// v0.9.0 P1: every call (cache miss OR cache hit) stamps
    /// `tenants.last_accessed = now_ms` in the registry before returning
    /// the handle. This closes the v0.8.0 doc-vs-code gap (lesson #39
    /// second incident — the column was documented but never written).
    /// Failures of the stamp are non-fatal: they emit a single
    /// `tracing::warn!` but do not block the open, because the
    /// last_accessed value is observational, not load-bearing for any
    /// invariant.
    pub async fn get_or_open(
        &self,
        tenant_id: &TenantId,
    ) -> Result<Arc<TenantHandle>> {
        // Fast path: read lock + HashMap::get.
        {
            let map = self.handles.read().await;
            if let Some(h) = map.get(tenant_id) {
                let handle = h.clone();
                // Release the read lock BEFORE acquiring the index
                // mutex so concurrent slow-path opens aren't blocked by
                // our stamp write.
                drop(map);
                self.touch_last_accessed_best_effort(tenant_id).await;
                return Ok(handle);
            }
        }

        // Slow path: open under write lock. Double-check the entry first
        // in case a concurrent first-access already opened it while we
        // were waiting on the write lock.
        let mut map = self.handles.write().await;
        if let Some(h) = map.get(tenant_id) {
            let handle = h.clone();
            drop(map);
            self.touch_last_accessed_best_effort(tenant_id).await;
            return Ok(handle);
        }

        // Lookup the tenant's row in the index. Need to drop the index
        // mutex BEFORE the open work so a concurrent `forget_handle` can
        // make progress. v0.8.1 P3: also capture the tenant's quota_bytes
        // so the writer-actor can enforce it.
        let (db_filename, quota_bytes) = {
            let index = self.index.lock().await;
            let rec = index.lookup(tenant_id)?.ok_or_else(|| {
                Error::not_found(format!(
                    "tenant `{tenant_id}` not found in tenants_index"
                ))
            })?;
            if rec.status != crate::tenants::TenantStatus::Active {
                return Err(Error::conflict(format!(
                    "tenant `{tenant_id}` has status `{}`; refusing to open",
                    rec.status.as_sql_str()
                )));
            }
            (rec.db_filename, rec.quota_bytes)
        };

        // Build params + open the tenant. This is sync + blocking, so run
        // it on a blocking thread to avoid stalling the tokio runtime.
        let tenant_id_clone = tenant_id.clone();
        let open_params = TenantOpenParams {
            data_dir: self.deps.data_dir.clone(),
            key: self.deps.key.clone(),
            db_filename,
            embedder: self.deps.embedder.clone(),
            hnsw_params: self.deps.hnsw_params.clone(),
            steward: self.deps.steward.clone(),
            runtime_handle: self.deps.runtime_handle.clone(),
            quota_bytes,
            steward_factory: self.deps.steward_factory.clone(),
            triples_batch_signal: self.deps.triples_batch_signal.clone(),
        };
        let handle = tokio::task::spawn_blocking(move || {
            TenantHandle::open(tenant_id_clone, open_params)
        })
        .await
        .map_err(|e| Error::storage(format!("join spawn_blocking for TenantHandle::open: {e}")))??;

        let arc = Arc::new(handle);
        map.insert(tenant_id.clone(), arc.clone());
        // Drop the handles write lock BEFORE the stamp write so a
        // concurrent get_or_open for a different tenant can proceed.
        drop(map);
        self.touch_last_accessed_best_effort(tenant_id).await;
        Ok(arc)
    }

    /// v0.9.0 P1: helper for `get_or_open` — writes
    /// `tenants.last_accessed = now_ms` on the registry row.
    ///
    /// Best-effort: failures (registry write contention, transient
    /// SQLite error) log `tracing::warn!` but never bubble to the
    /// caller. The `last_accessed` column is observational — it powers
    /// the `solo tenants list` "last_accessed" column and informs
    /// future tenant-eviction heuristics. A missed stamp doesn't
    /// compromise any invariant.
    ///
    /// Failure path also covers the "tenant was concurrently deleted
    /// between cache hit and stamp" case: `touch_last_accessed` returns
    /// `Ok(0)` in that case, which we silently ignore — the caller has
    /// already received the cached handle.
    async fn touch_last_accessed_best_effort(&self, tenant_id: &TenantId) {
        let now_ms: i64 = chrono::Utc::now().timestamp_millis();
        let mut index = self.index.lock().await;
        if let Err(e) = index.touch_last_accessed(tenant_id, now_ms) {
            tracing::warn!(
                tenant = %tenant_id,
                error = %e,
                "touch_last_accessed: registry stamp failed; tenant open \
                 succeeded but the last_accessed column will not reflect \
                 this open until the next successful call"
            );
        }
    }

    /// Evict a tenant from the in-memory cache (used by the P6
    /// hard-delete sequence). The caller is responsible for calling
    /// `TenantHandle::shutdown` on the returned handle.
    ///
    /// Returns the evicted handle if it was in the cache, or `None` if
    /// it wasn't open.
    pub async fn forget_handle(
        &self,
        tenant_id: &TenantId,
    ) -> Option<Arc<TenantHandle>> {
        let mut map = self.handles.write().await;
        map.remove(tenant_id)
    }

    /// List every tenant in the registry index (regardless of open state).
    pub async fn list_active(&self) -> Result<Vec<TenantRecord>> {
        let index = self.index.lock().await;
        index.list()
    }

    /// Borrow the underlying TenantsIndex for admin operations (P6/P7
    /// create/delete/backup/restore). Avoids re-opening the SQLCipher
    /// connection for every admin call.
    pub async fn with_index<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut TenantsIndex) -> R,
    {
        let mut index = self.index.lock().await;
        f(&mut *index)
    }

    /// Graceful shutdown of every cached handle. Saves snapshots, drains
    /// writers, drops pools. Designed for daemon exit.
    pub async fn shutdown_all(&self) {
        // Drain the map into an owned Vec so we can run shutdowns
        // sequentially without holding the write lock for the whole run.
        let handles: Vec<(TenantId, Arc<TenantHandle>)> = {
            let mut map = self.handles.write().await;
            map.drain().collect()
        };
        for (tenant_id, handle) in handles {
            // We only have an Arc<TenantHandle> in the map; shutdown
            // consumes a `TenantHandle` (not Arc). If we hold the only
            // remaining Arc, unwrap is fine; if not (e.g., a handler
            // still holds one), we fall back to logging a warning and
            // moving on — the next process restart will reopen + flush.
            match Arc::try_unwrap(handle) {
                Ok(owned) => {
                    if let Err(e) = owned.shutdown(true).await {
                        tracing::warn!(
                            tenant = %tenant_id,
                            error = %e,
                            "tenant shutdown returned error"
                        );
                    }
                }
                Err(_still_shared) => {
                    tracing::warn!(
                        tenant = %tenant_id,
                        "tenant handle still has outstanding Arc clones at \
                         shutdown_all; skipping. The next process restart \
                         will reload from snapshot + replay pending_index."
                    );
                }
            }
        }
    }

    /// True iff this tenant is currently in the in-memory cache. Test
    /// helper / metric surface.
    pub async fn is_open(&self, tenant_id: &TenantId) -> bool {
        let map = self.handles.read().await;
        map.contains_key(tenant_id)
    }

    /// Test-only constructor that builds a registry around a single
    /// pre-built `TenantHandle`. Skips opening `tenants_index.db` (which
    /// requires SQLCipher) so test harnesses can drive transport-layer
    /// flows without paying the full bootstrap cost.
    ///
    /// The supplied handle becomes the registry's only cached tenant.
    /// `get_or_open(tenant_id)` returns it for the handle's own
    /// tenant_id; any other tenant_id errors with NotFound (no index to
    /// lazy-open from).
    #[cfg(any(test, feature = "test-support"))]
    pub fn for_tests_with_single_tenant(
        data_dir: std::path::PathBuf,
        key: KeyMaterial,
        embedder: Arc<dyn solo_core::Embedder>,
        handle: Arc<TenantHandle>,
    ) -> Self {
        let tenant_id = handle.tenant_id().clone();
        let mut handles_map = HashMap::new();
        handles_map.insert(tenant_id, handle);
        // Open an in-memory TenantsIndex stub. We won't really persist
        // through it; the harness's get_or_open() lookups hit the
        // cached HashMap and never reach the index.
        // For simplicity, use an in-memory SQLite connection bypassing
        // SQLCipher entirely.
        let conn = rusqlite::Connection::open_in_memory()
            .expect("open in-memory tenants_index stub for tests");
        // Apply the registry's schema so any code that does happen to
        // touch the index (e.g. list_active) doesn't blow up.
        let mut conn = conn;
        crate::migration::run_tenants_index_migrations(&mut conn)
            .expect("apply tenants_index migrations to in-memory test stub");
        let index = TenantsIndex::from_connection_for_tests(conn);
        Self {
            index: Mutex::new(index),
            handles: RwLock::new(handles_map),
            deps: RegistryDeps {
                data_dir,
                key,
                embedder,
                hnsw_params: crate::vector_index::HnswParams::default(),
                steward: None,
                runtime_handle: None,
                steward_factory: None,
                triples_batch_signal: None,
            },
        }
    }
}