solo-storage 0.11.5

Solo: SQLite + SQLCipher persistence layer
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
// SPDX-License-Identifier: Apache-2.0

//! Daemon-side multi-tenant registry.
//!
//! Holds a `TenantsIndex` plus an in-memory map of opened
//! `Arc<TenantHandle>`. Tenants are opened **lazily**: the first request
//! for a tenant_id pays the open cost (DB connection, migration check,
//! HNSW load, writer-actor spawn, reader pool build); subsequent requests
//! are an `RwLock<HashMap>` hit.
//!
//! ## Lazy-load discipline
//!
//! `get_or_open(tenant_id)` is the hot path. Under concurrency, 100
//! simultaneous requests for the same tenant_id must:
//!   * Return the same `Arc<TenantHandle>` (no race).
//!   * Open the underlying tenant exactly once (no duplicate
//!     writer-actor / reader pool / HNSW load).
//!
//! Implementation: a `RwLock<HashMap<TenantId, Arc<TenantHandle>>>` is the
//! cached map. The slow path (cache miss) first acquires a per-tenant
//! `tokio::sync::Mutex` (looked up or created in the `open_locks` map),
//! then double-checks the cached map, and only opens the tenant if it is
//! still absent. Concurrent first-access requests for the _same_ tenant
//! block on that per-tenant lock and (on resumption) see the cached
//! entry — no duplicate open. Open work happens inside
//! `tokio::task::spawn_blocking` because `TenantHandle::open` does
//! synchronous DB + HNSW work.
//!
//! Trade-off: the registry-wide write lock on the cached map is taken
//! only for the final insert and is never held across the open, so
//! first-access requests for _other_ tenants (and `forget_handle`) keep
//! making progress while one tenant is opening (open is ~100-500ms).
//! The price is one extra small map (`open_locks`) holding a Mutex per
//! tenant. The original v0.8.0 design held the registry write lock across
//! the open instead; that was acceptable for its deployment shape (≤ 10
//! tenants typical) but blocked every other tenant's first access.
//!
//! ## Shutdown
//!
//! `shutdown_all()` drains every cached handle by calling
//! `TenantHandle::shutdown` on each. Designed to run once, at daemon
//! exit. Concurrent activity during shutdown is the operator's problem
//! — this is the daemon-graceful-exit path, the supervisor already
//! stopped accepting work.

use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::runtime::Handle as TokioHandle;
use tokio::sync::{Mutex, RwLock};

use solo_core::{Embedder, Error, Result, TenantId};

use crate::key_material::KeyMaterial;
use crate::tenants::{TenantHandle, TenantOpenParams, TenantRecord, TenantsIndex};
use crate::vector_index::HnswParams;

/// Bootstrap deps shared across every tenant the registry opens. Held by
/// the registry; each field is cloned into the `TenantOpenParams` built
/// for a tenant open.
struct RegistryDeps {
    /// Root data directory. `tenants_index.db` and the per-tenant DB
    /// files (under the tenants subdirectory) are resolved relative to it.
    data_dir: PathBuf,
    /// Key material forwarded to every tenant open and used by the
    /// cost-number hydration to open per-tenant DBs.
    key: KeyMaterial,
    /// Embedder handed to every tenant the registry opens.
    embedder: Arc<dyn Embedder>,
    /// HNSW parameters handed to every tenant the registry opens.
    hnsw_params: HnswParams,
    /// Optional shared Steward handed to every tenant the registry
    /// opens. See `steward_factory` below for how the two interact.
    steward: Option<Arc<solo_steward::Steward>>,
    /// Runtime handle captured at registry-open time. Passed to each
    /// `TenantHandle::open` so writer-actors can `block_on` async
    /// embedder calls during reembed.
    runtime_handle: Option<TokioHandle>,
    /// v0.9.0 P0c: optional Steward factory threaded through to every
    /// `TenantHandle::open` so the new per-tenant `steward_slot` can be
    /// populated eagerly for static backends (Anthropic / OpenAI /
    /// Ollama / None) and stay empty for the MCP-sampling backend.
    ///
    /// When `None` (the backwards-compat path), the slot is seeded
    /// from `steward: Option<Arc<Steward>>` so the v0.8.x and v0.9.0+
    /// callers observe the same Steward identity from the slot and
    /// from the writer-actor's captured field. v0.9.0 P2 fully migrates
    /// from this dual path to the slot-only path.
    steward_factory: Option<Arc<dyn crate::steward_factory::StewardFactory>>,
    /// v0.9.0 P4-revision (P4 audit M1): optional count-based trigger
    /// signal threaded through to every tenant's writer-actor. The
    /// daemon constructs ONE `Arc<TriplesBatchSignal>` at startup and
    /// shares it with both every tenant's writer-actor (so the actor
    /// can ping after each `Remember`) AND its own
    /// `triples_batch_timer` (so the select-arm can fire on count).
    triples_batch_signal: Option<Arc<crate::triples_batch::TriplesBatchSignal>>,
}

/// Daemon-side multi-tenant registry.
///
/// Owns the connection to the tenants index, the cache of opened
/// tenant handles, and the per-tenant locks that keep concurrent
/// first-access opens of the same tenant single-flight.
pub struct TenantRegistry {
    /// Mutex around the SQLCipher connection to `tenants_index.db`. Single
    /// access at a time (the registry is the only thing that touches it).
    index: Mutex<TenantsIndex>,
    /// Opened tenant handles, keyed by tenant_id.
    handles: RwLock<HashMap<TenantId, Arc<TenantHandle>>>,
    /// Dev-log 0154 fix-up for 0152 M8: per-tenant open-coordination
    /// Mutex. The first caller for a given tenant takes its Mutex,
    /// runs `spawn_blocking(TenantHandle::open)`, inserts into
    /// `handles`, drops the Mutex. Subsequent same-tenant callers wait
    /// on the same Mutex; when they acquire, the second double-check
    /// finds the handle in `handles` and reuses it.
    ///
    /// Without this, a burst of N first-access calls for the same
    /// tenant could all pass both double-checks of `handles` before
    /// any of them completed `spawn_blocking`, paying N × 100-500 ms
    /// of open cost (correctness-safe, but a real perf regression vs.
    /// the pre-0152 single-flight behaviour).
    ///
    /// The outer Mutex is held only briefly to look up or insert the
    /// per-tenant Arc; the inner per-tenant Mutex is the one held
    /// across the `spawn_blocking` await. Both are `tokio::sync::Mutex`
    /// because the per-tenant lock crosses an await point.
    ///
    /// Entries are created in `get_or_open` when a tenant_id misses the
    /// `handles` cache, BEFORE the index lookup has validated that the
    /// tenant exists. Successfully opened tenants keep their entry for
    /// the registry's lifetime (no GC today). If tenant rotation
    /// becomes a deployment shape, prune entries with `Arc::strong_count == 1`.
    open_locks: Mutex<HashMap<TenantId, Arc<Mutex<()>>>>,
    /// Shared bootstrap dependencies cloned into every tenant open.
    deps: RegistryDeps,
}

/// Parameters for opening the registry.
///
/// Consumed by `TenantRegistry::open`; every field is moved into the
/// registry's shared dependency set and reused for each tenant open.
pub struct TenantRegistryParams {
    /// Root data directory containing `tenants_index.db` (and, for a
    /// not-yet-migrated v0.7.1 install, the legacy `solo.db`).
    pub data_dir: PathBuf,
    /// Key material used to open the tenants index and forwarded to
    /// every tenant open.
    pub key: KeyMaterial,
    /// Embedder forwarded to every tenant open.
    pub embedder: Arc<dyn Embedder>,
    /// HNSW parameters forwarded to every tenant open.
    pub hnsw_params: HnswParams,
    /// Optional shared Steward forwarded to every tenant open. See
    /// `steward_factory` for how the two interact.
    pub steward: Option<Arc<solo_steward::Steward>>,
    /// Tokio runtime handle. Required by the writer-actor spawn path for
    /// `block_on` of async embedder calls during reembed. The registry
    /// passes this through to every `TenantHandle::open`. Construct with
    /// `tokio::runtime::Handle::current()` from inside `#[tokio::main]`.
    pub runtime_handle: Option<TokioHandle>,
    /// v0.9.0 P0c: optional Steward factory. When `Some(_)`, every
    /// `TenantHandle::open` calls `factory.build()` and writes the
    /// result into the per-tenant `steward_slot`. Static-backend
    /// factories (`StaticStewardFactory` around an Anthropic / OpenAI /
    /// Ollama / Noop-backed Steward) populate the slot eagerly with
    /// zero hot-path lock-contention cost; the MCP-sampling factory
    /// returns `Ok(None)` for the eager-populate (v0.9.0 P2's
    /// `SoloMcpServer::initialize` hook bypasses the factory entirely
    /// on the sampling path and writes into the slot directly).
    ///
    /// When `None` (the backwards-compat path), the slot is seeded
    /// from `steward: Option<Arc<Steward>>` instead. Existing v0.8.x
    /// callers that pass `steward: Some(...)` without setting a
    /// factory observe consistent behavior — the slot mirrors the
    /// writer-actor's captured Steward.
    #[allow(clippy::type_complexity)]
    pub steward_factory:
        Option<Arc<dyn crate::steward_factory::StewardFactory>>,
    /// v0.9.0 P4-revision (P4 audit M1): optional count-based trigger
    /// signal. The daemon passes `Some(Arc::clone(&signal))` so its
    /// `triples_batch_timer` and every tenant's writer-actor share the
    /// same counter + notify. `None` for paths that don't drive the
    /// background batch (CLI one-shot commands, tests).
    pub triples_batch_signal:
        Option<Arc<crate::triples_batch::TriplesBatchSignal>>,
}

impl TenantRegistry {
    /// Open the registry.
    ///
    /// Steps, in order:
    ///   1. If `tenants_index.db` is absent but a legacy `solo.db` is
    ///      present in `data_dir`, run the v0.7.1 → v0.8.0
    ///      mass-data-move helper.
    ///   2. Open `tenants_index.db` via `TenantsIndex::open`.
    ///   3. Build the registry with an empty handle cache.
    ///
    /// No tenant handle is opened here; that happens lazily on the
    /// first `get_or_open` for each tenant.
    ///
    /// # Errors
    ///
    /// Propagates any error from the migration helper or from
    /// `TenantsIndex::open`.
    pub fn open(params: TenantRegistryParams) -> Result<Self> {
        // The layout detection mirrors `startup::run` so this path works
        // whether it is reached from the daemon or from a one-shot
        // command. The legacy file is only probed when the index file is
        // missing.
        let has_index = params
            .data_dir
            .join(crate::tenants::TENANTS_INDEX_FILENAME)
            .is_file();
        if !has_index && params.data_dir.join("solo.db").is_file() {
            tracing::info!(
                data_dir = %params.data_dir.display(),
                "v0.7.1 single-DB layout detected; running v0.7.1 → v0.8.0 mass-data-move"
            );
            crate::tenants::migrate_v071_to_v080(&params.data_dir, &params.key)?;
        }

        // Must come after the migration: the helper is what produces the
        // index file on a legacy install.
        let index = TenantsIndex::open(&params.data_dir, &params.key)?;

        let deps = RegistryDeps {
            data_dir: params.data_dir,
            key: params.key,
            embedder: params.embedder,
            hnsw_params: params.hnsw_params,
            steward: params.steward,
            runtime_handle: params.runtime_handle,
            steward_factory: params.steward_factory,
            triples_batch_signal: params.triples_batch_signal,
        };

        Ok(Self {
            index: Mutex::new(index),
            handles: RwLock::new(HashMap::new()),
            open_locks: Mutex::new(HashMap::new()),
            deps,
        })
    }

    /// Root data directory this registry was opened against.
    pub fn data_dir(&self) -> &Path {
        self.deps.data_dir.as_path()
    }

    /// Get (or lazily open) the handle for `tenant_id`.
    ///
    /// Concurrent first-access for the same tenant is single-flight: all
    /// callers see the same `Arc<TenantHandle>`; no duplicate handle is
    /// opened.
    ///
    /// Sequence:
    ///   1. Fast path: read-lock `handles`; on a hit, return the handle.
    ///   2. Look up (or insert) the per-tenant Mutex Arc under a brief
    ///      `open_locks` lock; release `open_locks`.
    ///   3. Await the per-tenant Mutex. Same-tenant callers serialise
    ///      here; different-tenant callers do not block each other.
    ///   4. Inside the per-tenant lock, double-check `handles` — if a
    ///      prior caller already inserted, reuse their handle. Otherwise
    ///      look up the index row, `spawn_blocking(TenantHandle::open)`,
    ///      and insert into `handles`.
    ///   5. Drop the per-tenant lock. On success, touch last-accessed and
    ///      return. On failure, prune the per-tenant lock entry if no
    ///      other caller holds it, then return the error.
    ///
    /// The map-level `handles` write lock is never held across
    /// `spawn_blocking`, so other tenants and `forget_handle` can still
    /// make progress concurrently (dev-log 0152 M8 + 0154 fix-up).
    ///
    /// v0.9.0 P1: every successful call (cache miss OR cache hit) stamps
    /// `tenants.last_accessed = now_ms` in the registry before returning
    /// the handle. This closes the v0.8.0 doc-vs-code gap (lesson #39
    /// second incident — the column was documented but never written).
    /// Failures of the stamp are non-fatal: they emit a single
    /// `tracing::warn!` but do not block the open, because the
    /// last_accessed value is observational, not load-bearing for any
    /// invariant.
    ///
    /// # Errors
    ///
    /// * not-found — `tenant_id` has no row in the tenants index.
    /// * conflict — the tenant's status is not `Active`.
    /// * storage — the blocking open task could not be joined.
    /// * anything `TenantsIndex::lookup` or `TenantHandle::open` returns.
    pub async fn get_or_open(
        &self,
        tenant_id: &TenantId,
    ) -> Result<Arc<TenantHandle>> {
        // 1. Fast path. The read guard is released inside
        //    `cached_handle`, i.e. BEFORE the index mutex is taken for
        //    the stamp, so concurrent slow-path opens aren't blocked by
        //    our stamp write.
        if let Some(handle) = self.cached_handle(tenant_id).await {
            self.touch_last_accessed_best_effort(tenant_id).await;
            return Ok(handle);
        }

        // 2. Get-or-insert the per-tenant open lock.
        let open_lock: Arc<Mutex<()>> = {
            let mut locks = self.open_locks.lock().await;
            locks
                .entry(tenant_id.clone())
                .or_insert_with(|| Arc::new(Mutex::new(())))
                .clone()
        };

        // 3 + 4. Serialise same-tenant opens. The guard lives exactly as
        // long as this block, so it is released before the stamp / prune
        // below on every path.
        let outcome = {
            let _open_guard = open_lock.lock().await;
            let cached = self.cached_handle(tenant_id).await;
            match cached {
                // A prior same-tenant caller already opened; reuse.
                Some(handle) => Ok(handle),
                None => self.open_and_cache(tenant_id).await,
            }
        };

        // 5.
        match outcome {
            Ok(handle) => {
                self.touch_last_accessed_best_effort(tenant_id).await;
                Ok(handle)
            }
            Err(e) => {
                // The entry in `open_locks` was created before the tenant
                // was validated. Without this prune, every unknown /
                // non-active / failing tenant_id ever requested would
                // leave a permanent entry behind.
                self.release_open_lock_if_unused(tenant_id, &open_lock)
                    .await;
                Err(e)
            }
        }
    }

    /// Clone the cached handle for `tenant_id`, if any. Holds the
    /// `handles` read lock only for the duration of the lookup.
    async fn cached_handle(
        &self,
        tenant_id: &TenantId,
    ) -> Option<Arc<TenantHandle>> {
        let map = self.handles.read().await;
        map.get(tenant_id).cloned()
    }

    /// Slow path of `get_or_open`: validate the tenant against the index,
    /// open it on a blocking thread, and insert it into `handles`.
    ///
    /// The caller must hold the tenant's per-tenant open lock; this
    /// function performs no same-tenant de-duplication itself.
    async fn open_and_cache(
        &self,
        tenant_id: &TenantId,
    ) -> Result<Arc<TenantHandle>> {
        // Lookup the tenant's row in the index. The index mutex is
        // dropped BEFORE the open work so other index users are not
        // stalled behind the open. v0.8.1 P3: also capture quota_bytes
        // so the writer-actor can enforce it.
        let (db_filename, quota_bytes) = {
            let index = self.index.lock().await;
            let rec = index.lookup(tenant_id)?.ok_or_else(|| {
                Error::not_found(format!(
                    "tenant `{tenant_id}` not found in tenants_index"
                ))
            })?;
            if rec.status != crate::tenants::TenantStatus::Active {
                return Err(Error::conflict(format!(
                    "tenant `{tenant_id}` has status `{}`; refusing to open",
                    rec.status.as_sql_str()
                )));
            }
            (rec.db_filename, rec.quota_bytes)
        };

        // Build params + open the tenant. This is sync + blocking, so run
        // it on a blocking thread to avoid stalling the tokio runtime.
        let tenant_id_clone = tenant_id.clone();
        let open_params = TenantOpenParams {
            data_dir: self.deps.data_dir.clone(),
            key: self.deps.key.clone(),
            db_filename,
            embedder: self.deps.embedder.clone(),
            hnsw_params: self.deps.hnsw_params.clone(),
            steward: self.deps.steward.clone(),
            runtime_handle: self.deps.runtime_handle.clone(),
            quota_bytes,
            steward_factory: self.deps.steward_factory.clone(),
            triples_batch_signal: self.deps.triples_batch_signal.clone(),
        };
        // Outer `?` is the join error (task panicked / cancelled), inner
        // `?` is the open's own error.
        let handle = tokio::task::spawn_blocking(move || {
            TenantHandle::open(tenant_id_clone, open_params)
        })
        .await
        .map_err(|e| Error::storage(format!("join spawn_blocking for TenantHandle::open: {e}")))??;

        let arc = Arc::new(handle);
        {
            // Write lock held only for the insert itself.
            let mut map = self.handles.write().await;
            map.insert(tenant_id.clone(), arc.clone());
        }
        Ok(arc)
    }

    /// Remove `tenant_id`'s entry from `open_locks` after a failed open,
    /// but only when nobody else can be using it.
    ///
    /// Clones of a per-tenant lock Arc are only ever created while the
    /// `open_locks` mutex is held. So, with that mutex held here, a
    /// strong count of exactly 2 (the map's Arc + the caller's
    /// `open_lock`) proves there is no waiter on this lock and none can
    /// appear before the entry is gone. If other callers still hold
    /// clones the entry is left alone; the last of them to fail prunes it.
    async fn release_open_lock_if_unused(
        &self,
        tenant_id: &TenantId,
        open_lock: &Arc<Mutex<()>>,
    ) {
        let mut locks = self.open_locks.lock().await;
        let removable = match locks.get(tenant_id) {
            Some(entry) => {
                Arc::ptr_eq(entry, open_lock) && Arc::strong_count(entry) == 2
            }
            None => false,
        };
        if removable {
            locks.remove(tenant_id);
        }
    }

    /// v0.9.0 P1: stamp `tenants.last_accessed` with the current UTC time
    /// in milliseconds on behalf of `get_or_open`.
    ///
    /// Best-effort by design: an `Err` from the index is logged via
    /// `tracing::warn!` and then dropped, never returned. The
    /// `last_accessed` column is observational — it powers the
    /// `solo tenants list` "last_accessed" column and informs future
    /// tenant-eviction heuristics — so a missed stamp doesn't compromise
    /// any invariant.
    ///
    /// Any `Ok` result is ignored as well. Per the original design note
    /// this includes the "tenant was concurrently deleted between cache
    /// hit and stamp" case, where `touch_last_accessed` is expected to
    /// report zero rows touched (not verifiable from this file); the
    /// caller has already received its handle by then.
    async fn touch_last_accessed_best_effort(&self, tenant_id: &TenantId) {
        // Timestamp is taken before waiting on the index mutex.
        let stamp_ms: i64 = chrono::Utc::now().timestamp_millis();
        let mut guard = self.index.lock().await;
        match guard.touch_last_accessed(tenant_id, stamp_ms) {
            Ok(_) => {}
            Err(e) => {
                tracing::warn!(
                    tenant = %tenant_id,
                    error = %e,
                    "touch_last_accessed: registry stamp failed; tenant open \
                     succeeded but the last_accessed column will not reflect \
                     this open until the next successful call"
                );
            }
        }
    }

    /// Remove `tenant_id` from the in-memory handle cache (used by the
    /// P6 hard-delete sequence).
    ///
    /// Only the cache entry is dropped here; calling
    /// `TenantHandle::shutdown` on the returned handle is the caller's
    /// responsibility.
    ///
    /// Returns `Some(handle)` if the tenant was cached, `None` if it
    /// wasn't open.
    pub async fn forget_handle(
        &self,
        tenant_id: &TenantId,
    ) -> Option<Arc<TenantHandle>> {
        self.handles.write().await.remove(tenant_id)
    }

    /// List the tenants recorded in the registry index, whether or not
    /// their handles are currently open.
    ///
    /// NOTE(review): despite the name, no status filter is applied in
    /// this method; whether `TenantsIndex::list` itself restricts the
    /// result to active tenants is not visible from here — confirm.
    ///
    /// # Errors
    ///
    /// Propagates whatever `TenantsIndex::list` returns.
    pub async fn list_active(&self) -> Result<Vec<TenantRecord>> {
        self.index.lock().await.list()
    }

    /// v0.10.1: hydrate per-tenant cost numbers for the `/v1/tenants`
    /// wire-shape fields that v0.10.0 always reported as `null`. Returns
    /// one [`TenantCostNumbers`] per input record, in the same order.
    ///
    /// For each tenant:
    ///   * `size_bytes` — `std::fs::metadata(<data_dir>/tenants/<db_filename>).len()`,
    ///     or `None` if the file is missing / unreadable. Cheap (no DB
    ///     open).
    ///   * `episode_count` — `SELECT COUNT(*) FROM episodes WHERE
    ///     status='active'` against the per-tenant SQLCipher DB. Opens a
    ///     short-lived connection per tenant; the connection is closed
    ///     before the next tenant is touched. `None` if the DB file is
    ///     missing OR the open / count failed (caller logs).
    ///
    /// **Cap**: only the first `cap` records have `episode_count`
    /// hydrated (the expensive DB-open path). Records beyond the cap
    /// return `None` for `episode_count`. `size_bytes` is computed for
    /// every record regardless of the cap — `std::fs::metadata` is
    /// cheap (no DB open). Callers (e.g. the HTTP handler) surface the
    /// cap-reached condition via a response header so clients can
    /// fetch counts for the tail tenants out-of-band if needed. The
    /// cap is load-bearing for performance: N×10ms tenant-open is
    /// unbounded otherwise, and the `/v1/tenants` first-paint budget
    /// is tight.
    ///
    /// **Error semantics**: a single tenant's count failure is logged
    /// at WARN and surfaced as `None` for that tenant's counts. The
    /// other tenants' counts are unaffected. The method never returns
    /// `Err` — caller cannot distinguish "missing DB" from "count SQL
    /// failed" and shouldn't have to.
    ///
    /// Test-only note: if the registry was constructed via
    /// `for_tests_with_single_tenant`, the tenant DB on disk is a plain
    /// SQLite file (no SQLCipher encryption), and our SQLCipher-keyed
    /// open will fail with a header mismatch. The method falls back to
    /// a plain `Connection::open` for that case — which works on real
    /// SQLCipher files too, because PRAGMA key without a key just
    /// produces an error and `Connection::open` succeeds at the file
    /// layer. To be safe, the fallback only runs when SQLCipher open
    /// errors out, and the fallback's COUNT query itself fails on a
    /// real encrypted DB (decrypt error on schema read) — preserving
    /// production semantics.
    pub async fn hydrate_tenant_cost_numbers(
        &self,
        records: &[TenantRecord],
        cap: usize,
    ) -> Vec<TenantCostNumbers> {
        // Dev-log 0152 M7: the inner work is synchronous filesystem
        // metadata + (cap-limited) SQLCipher COUNT(*) queries. Moved
        // into `spawn_blocking` so the tokio worker doesn't stall
        // proportionally to tenant count.
        let data_dir = self.deps.data_dir.clone();
        let key = self.deps.key.clone();
        // Take a snapshot of the records the closure needs. Cloning the
        // db_filename per record is cheap relative to the metadata +
        // SQLCipher open work the closure will then do.
        let snapshot: Vec<(usize, String)> = records
            .iter()
            .enumerate()
            .map(|(i, r)| (i, r.db_filename.clone()))
            .collect();
        let total = snapshot.len();

        tokio::task::spawn_blocking(move || {
            let mut out = Vec::with_capacity(total);
            for (i, db_filename) in snapshot {
                let db_path = data_dir
                    .join(crate::tenants::TENANTS_SUBDIR)
                    .join(&db_filename);
                let size_bytes = match std::fs::metadata(&db_path) {
                    Ok(meta) => Some(meta.len()),
                    Err(_) => None,
                };
                let episode_count = if i < cap && db_path.is_file() {
                    hydrate_episode_count(&db_path, &key)
                } else {
                    None
                };
                out.push(TenantCostNumbers {
                    size_bytes,
                    episode_count,
                });
            }
            out
        })
        .await
        .unwrap_or_else(|join_err| {
            tracing::warn!(
                error = %join_err,
                "hydrate_tenant_cost_numbers: spawn_blocking join failed; returning empty"
            );
            Vec::new()
        })
    }

    /// Run `f` with exclusive access to the underlying `TenantsIndex`
    /// and return its result. Intended for admin operations (P6/P7
    /// create/delete/backup/restore) so they can reuse the registry's
    /// connection instead of re-opening the SQLCipher DB per call.
    ///
    /// The index mutex is held for the whole duration of `f`, so `f`
    /// should be short: every other index user (including the
    /// last-accessed stamp in `get_or_open`) waits behind it.
    pub async fn with_index<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut TenantsIndex) -> R,
    {
        let mut guard = self.index.lock().await;
        let index: &mut TenantsIndex = &mut guard;
        f(index)
    }

    /// Graceful shutdown of every cached handle, intended for daemon
    /// exit.
    ///
    /// The cache is emptied up front and the handles are then shut down
    /// one after another via `TenantHandle::shutdown(true)`. Nothing is
    /// returned: a shutdown error, or a handle that is still shared, is
    /// reported through `tracing::warn!` and the loop moves on to the
    /// next tenant.
    pub async fn shutdown_all(&self) {
        // Move every entry out of the map in one statement so the write
        // lock is released before the (sequential) shutdowns start.
        let drained: Vec<(TenantId, Arc<TenantHandle>)> =
            self.handles.write().await.drain().collect();

        for (tenant_id, shared) in drained {
            // `shutdown` consumes a `TenantHandle` by value, but the
            // cache only stores `Arc<TenantHandle>`. Unwrapping succeeds
            // only when ours is the last Arc; if something else (e.g. a
            // handler) still holds a clone we can only warn and skip.
            let owned = match Arc::try_unwrap(shared) {
                Ok(owned) => owned,
                Err(_still_shared) => {
                    tracing::warn!(
                        tenant = %tenant_id,
                        "tenant handle still has outstanding Arc clones at \
                         shutdown_all; skipping. The next process restart \
                         will reload from snapshot + replay pending_index."
                    );
                    continue;
                }
            };

            if let Err(e) = owned.shutdown(true).await {
                tracing::warn!(
                    tenant = %tenant_id,
                    error = %e,
                    "tenant shutdown returned error"
                );
            }
        }
    }

    /// Whether `tenant_id` currently has a handle in the in-memory
    /// cache. Test helper / metric surface; the answer can be stale as
    /// soon as the read lock is released.
    pub async fn is_open(&self, tenant_id: &TenantId) -> bool {
        self.handles.read().await.contains_key(tenant_id)
    }

    /// Test-only constructor: wrap a single pre-built `TenantHandle` in
    /// a registry without opening `tenants_index.db` (which requires
    /// SQLCipher), so test harnesses can drive transport-layer flows
    /// without paying the full bootstrap cost.
    ///
    /// The supplied handle becomes the registry's only cached tenant.
    /// `get_or_open(tenant_id)` returns it for the handle's own
    /// tenant_id; any other tenant_id errors with NotFound, because the
    /// stub index built here contains no tenant rows to lazy-open from.
    ///
    /// HNSW parameters are the defaults; Steward, runtime handle,
    /// Steward factory and triples-batch signal are all `None`.
    ///
    /// # Panics
    ///
    /// Panics if the in-memory SQLite connection cannot be opened or the
    /// tenants-index migrations fail to apply to it.
    #[cfg(any(test, feature = "test-support"))]
    pub fn for_tests_with_single_tenant(
        data_dir: std::path::PathBuf,
        key: KeyMaterial,
        embedder: Arc<dyn solo_core::Embedder>,
        handle: Arc<TenantHandle>,
    ) -> Self {
        // Seed the cache with the one supplied handle.
        let tenant_id = handle.tenant_id().clone();
        let preloaded = HashMap::from([(tenant_id, handle)]);

        // Stub index: a plain in-memory SQLite connection, bypassing
        // SQLCipher entirely. Lookups for the seeded tenant are served
        // from the cache; the registry schema is still applied so code
        // that does touch the index (e.g. `list_active`) doesn't blow up.
        let mut stub_conn = rusqlite::Connection::open_in_memory()
            .expect("open in-memory tenants_index stub for tests");
        crate::migration::run_tenants_index_migrations(&mut stub_conn)
            .expect("apply tenants_index migrations to in-memory test stub");

        let deps = RegistryDeps {
            data_dir,
            key,
            embedder,
            hnsw_params: crate::vector_index::HnswParams::default(),
            steward: None,
            runtime_handle: None,
            steward_factory: None,
            triples_batch_signal: None,
        };

        Self {
            index: Mutex::new(TenantsIndex::from_connection_for_tests(stub_conn)),
            handles: RwLock::new(preloaded),
            open_locks: Mutex::new(HashMap::new()),
            deps,
        }
    }
}

/// v0.10.1: per-tenant cost numbers returned by
/// [`TenantRegistry::hydrate_tenant_cost_numbers`]. One per input
/// tenant, in the same order. `None` for any field whose source could
/// not be read (missing DB file, opaque SQLite error, capped-out of
/// the per-request budget).
///
/// The derived `Default` is "nothing known": both fields `None`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TenantCostNumbers {
    /// File size in bytes of the per-tenant SQLCipher DB on disk, as
    /// reported by filesystem metadata. `None` if the file is missing
    /// or unreadable.
    pub size_bytes: Option<u64>,
    /// Count of `episodes WHERE status='active'`. `None` if the DB
    /// could not be opened or queried, or if the per-request cap was
    /// reached before this tenant.
    pub episode_count: Option<i64>,
}

/// Open the per-tenant SQLCipher DB at `db_path`, count active
/// episodes, and return the result. Returns `None` on any open / count
/// failure (logged at WARN). Sync — caller may run from blocking
/// context (the `/v1/tenants` hydration loop does so).
///
/// Performance: one SQLCipher open + one COUNT per call. Typical wall
/// is ~5-10ms per tenant on a warm filesystem. The cap on the calling
/// side bounds the total per-request cost.
///
/// Test-only fallback: if the SQLCipher open fails (e.g. the test
/// harness's plain-SQLite tenant DB), retry with a plain
/// `rusqlite::Connection::open`. On a real encrypted DB the fallback's
/// COUNT is expected to fail because the schema page can't be decoded
/// without the key — so production semantics are preserved.
///
/// The SQLCipher open error is not discarded when the fallback kicks
/// in: it is logged at DEBUG before the retry and attached to the WARN
/// if the plain open fails too. Otherwise a production failure (wrong
/// key, corrupt file) would only ever show up as the fallback's
/// misleading "COUNT(*) failed".
fn hydrate_episode_count(
    db_path: &std::path::Path,
    key: &KeyMaterial,
) -> Option<i64> {
    let count = match crate::init::open_sqlcipher(db_path, key) {
        Ok(conn) => count_active_episodes(&conn),
        Err(sqlcipher_err) => {
            // DEBUG, not WARN: this branch is the normal path for the
            // plain-SQLite test harness.
            tracing::debug!(
                db = %db_path.display(),
                error = %sqlcipher_err,
                "hydrate_episode_count: SQLCipher open failed; retrying as plain SQLite"
            );
            // Test-mode fallback: harness DBs are plain SQLite, not
            // SQLCipher. Try a plain open.
            match rusqlite::Connection::open(db_path) {
                Ok(conn) => count_active_episodes(&conn),
                Err(e) => {
                    tracing::warn!(
                        db = %db_path.display(),
                        error = %e,
                        sqlcipher_error = %sqlcipher_err,
                        "hydrate_episode_count: open failed (both SQLCipher and plain)"
                    );
                    return None;
                }
            }
        }
    };
    match count {
        Ok(n) => Some(n),
        Err(e) => {
            tracing::warn!(
                db = %db_path.display(),
                error = %e,
                "hydrate_episode_count: COUNT(*) failed"
            );
            None
        }
    }
}

fn count_active_episodes(conn: &rusqlite::Connection) -> Result<i64> {
    conn.query_row(
        "SELECT COUNT(*) FROM episodes WHERE status = 'active'",
        [],
        |r| r.get::<_, i64>(0),
    )
    .map_err(|e| Error::storage(format!("COUNT(*) FROM episodes: {e}")))
}