Skip to main content

solo_storage/
startup.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Daemon startup orchestration. Per ADR-0003 §O6 ("Startup ordering: linear
4//! await chain in main()") and §"Startup file-existence decision tree".
5//!
//! [`run`] is the canonical sequence used by `solo daemon`:
7//!
8//!   1. Read `solo.config.toml` (salt + embedder identity).
9//!   2. Open the SQLCipher database with the supplied key (init connection).
10//!   3. Run pending migrations (idempotent for already-migrated DBs).
//!   4. Load the HNSW snapshot from disk:
//!         a. Try `snapshot::load(dir)` (the live
//!            `hnsw_episodes.hnsw.{data,graph}` pair).
//!         b. On failure, try `snapshot::load_bak(dir)`.
//!         c. On both failures, fall through to a fresh empty index and
//!            repopulate it from the `embeddings` table via
//!            `rebuild_hnsw_from_sql`; the daemon logs a WARN so ops can act.
17//!   5. Validate dim consistency: HNSW snapshot dim, if non-zero, must equal
18//!      `solo.config.toml.embedder.dim`. Refuse to start on mismatch — the
19//!      embedder identity has shifted under the daemon and user data would
20//!      be quietly miscompared.
21//!   6. Replay `pending_index` rows into the HNSW.
22//!   7. Detect drift (`hot episodes` vs `index.len()`); WARN on non-zero diff.
23//!   8. Close the init connection. The WriterActor (spawned by the caller)
24//!      opens its own long-lived connection on its dedicated thread per
25//!      ADR-0003 §"Migration vs. writer-thread connection lifecycle".
26//!
27//! Returns a [`StartupOutcome`] with everything the daemon main needs to
28//! spawn the writer actor + reader pool. The caller is responsible for the
29//! lockfile (must outlive this call) and for spawning the writer thread.
30//!
31//! ## What this module does NOT do
32//!
33//! - Lockfile acquisition (caller's job; lock must outlive the daemon).
34//! - Passphrase prompting (CLI's job).
35//! - Spawning the writer thread (caller chooses the snapshot_dir + capacity).
36//! - Building the read pool (caller chooses pool size).
37//! - Constructing the embedder (caller wires Stub or BGE-M3 per config).
38//! - Signal handling, panic hook, snapshot timer (commit 1.5 daemon main).
39
40use std::path::{Path, PathBuf};
41use std::sync::Arc;
42
43use rusqlite::Connection;
44use solo_core::{Result, VectorIndex, VectorIndexFactory};
45
46use crate::config::SoloConfig;
47use crate::embedder_registry::{EmbedderIdentity, get_or_insert_embedder_id};
48use crate::hnsw_rebuild::{
49    rebuild_chunk_tombstones_from_sql, rebuild_episode_tombstones_from_sql,
50};
51use crate::init::open_sqlcipher;
52use crate::key_material::KeyMaterial;
53use crate::migration;
54use crate::recovery::{
55    DriftReport, RebuildReport, ReplayReport, detect_drift, rebuild_hnsw_from_sql,
56    replay_pending_index,
57};
58use crate::snapshot;
59use crate::tenants::{
60    TENANTS_INDEX_FILENAME, TENANTS_SUBDIR, migrate_v071_to_v080,
61};
62use crate::vector_index::{HnswFactory, HnswIndex, HnswParams};
63
64/// What the startup chain hands back to the daemon main. Mostly opaque
65/// because the daemon doesn't need to inspect any of this — it just feeds
66/// the writer actor + reader pool.
67pub struct StartupOutcome {
68    pub data_dir: PathBuf,
69    pub db_path: PathBuf,
70    pub config: SoloConfig,
71    /// Schema version after migrations. May equal pre-startup version if
72    /// nothing was pending.
73    pub schema_version: u32,
74    /// The HNSW. Wrapped in `Arc<dyn VectorIndex>` so it can be shared with
75    /// the read pool per ADR-0003 §O2.
76    pub hnsw: Arc<dyn VectorIndex + Send + Sync>,
77    /// Resolved `embedders.embedder_id` for the persisted config's
78    /// embedder identity. Lazy-inserted on first daemon start; cached
79    /// in the WriterActor for every subsequent `INSERT INTO embeddings`.
80    pub embedder_id: i64,
81    /// Pending-index replay statistics.
82    pub replay: ReplayReport,
83    /// Drift between SQL `episodes WHERE tier='hot'` and HNSW length, after
84    /// replay. Daemon decides what to do (warn / refuse / rebuild) — startup
85    /// itself only reports.
86    pub drift: DriftReport,
87    /// True if we fell back to the `.bak` snapshot.
88    pub used_bak_snapshot: bool,
89    /// True if no usable snapshot was found and we started with a fresh index.
90    /// (Independent of `rebuild` below — `started_fresh && rebuild.rows_added > 0`
91    /// means the snapshot was missing but the index was rebuilt from the
92    /// `embeddings` table.)
93    pub started_fresh: bool,
94    /// Stats from `rebuild_hnsw_from_sql` after a missing-snapshot
95    /// fallback. All zeros in the steady state (snapshot loaded fine) and
96    /// after `solo init` (no rows yet). Non-zero only after `solo reembed`
97    /// has wiped the snapshots. `rows_skipped > 0` means the rebuild
98    /// degraded around corrupt rows — recall coverage will be incomplete
99    /// for those rowids until the user investigates and re-runs reembed.
100    pub rebuild: RebuildReport,
101}
102
103impl std::fmt::Debug for StartupOutcome {
104    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105        // Arc<dyn VectorIndex> is not Debug; surface only summary-shaped fields.
106        f.debug_struct("StartupOutcome")
107            .field("data_dir", &self.data_dir)
108            .field("schema_version", &self.schema_version)
109            .field("hnsw_len", &self.hnsw.len())
110            .field("hnsw_dim", &self.hnsw.dim())
111            .field("embedder_id", &self.embedder_id)
112            .field("replay", &self.replay)
113            .field("drift", &self.drift)
114            .field("used_bak_snapshot", &self.used_bak_snapshot)
115            .field("started_fresh", &self.started_fresh)
116            .field("rebuild", &self.rebuild)
117            .finish()
118    }
119}
120
121/// Tunable parameters for the startup chain. Everything has a sensible
122/// default; the daemon main can override per environment.
123#[derive(Debug, Clone)]
124pub struct StartupParams {
125    pub data_dir: PathBuf,
126    pub key: KeyMaterial,
127    pub hnsw_params: HnswParams,
128}
129
130impl StartupParams {
131    pub fn new(data_dir: impl Into<PathBuf>, key: KeyMaterial) -> Self {
132        Self {
133            data_dir: data_dir.into(),
134            key,
135            hnsw_params: HnswParams::default(),
136        }
137    }
138
139    pub fn with_hnsw_params(mut self, params: HnswParams) -> Self {
140        self.hnsw_params = params;
141        self
142    }
143}
144
145/// Run the startup sequence. Linear await chain; each step gates the next.
146pub fn run(params: StartupParams) -> Result<StartupOutcome> {
147    let StartupParams {
148        data_dir,
149        key,
150        hnsw_params,
151    } = params;
152
153    // Step 1 — read solo.config.toml.
154    let config_path = data_dir.join("solo.config.toml");
155    let config = SoloConfig::read(&config_path)?;
156    let dim = config.embedder.dim as usize;
157    if dim == 0 {
158        return Err(solo_core::Error::storage(format!(
159            "solo.config.toml records embedder.dim=0 — corrupt config? at {config_path:?}"
160        )));
161    }
162
163    // Step 2 — resolve the per-tenant DB path (v0.8.0 layout).
164    //
165    // Three cases:
166    //   * v0.8.0 layout in place (tenants_index.db present, default.db
167    //     under tenants/) — open it directly.
168    //   * v0.7.1 layout in place (solo.db at root, no tenants_index.db)
169    //     — run the mass-data-move helper to upgrade, then open the
170    //     migrated default.db. P2 will pull this trigger out of the
171    //     daemon boot chain into a dedicated entry point; in P1 the
172    //     check lives here so v0.7.1 users on a one-shot or daemon
173    //     restart get the upgrade transparently.
174    //   * Neither — not initialized; surface a helpful error.
175    let tenants_index_path = data_dir.join(TENANTS_INDEX_FILENAME);
176    let tenants_default_db = data_dir.join(TENANTS_SUBDIR).join("default.db");
177    let legacy_db_path = data_dir.join("solo.db");
178    let db_path: PathBuf = if tenants_index_path.is_file() && tenants_default_db.is_file() {
179        tenants_default_db
180    } else if legacy_db_path.is_file() && !tenants_index_path.is_file() {
181        // v0.7.1 layout detected — upgrade in place.
182        tracing::info!(
183            data_dir = %data_dir.display(),
184            "v0.7.1 single-DB layout detected; running v0.7.1 → v0.8.0 mass-data-move"
185        );
186        migrate_v071_to_v080(&data_dir, &key)?;
187        let migrated = data_dir.join(TENANTS_SUBDIR).join("default.db");
188        if !migrated.is_file() {
189            return Err(solo_core::Error::storage(format!(
190                "v0.7.1 → v0.8.0 migration completed without errors but \
191                 the migrated default.db is not present at {}",
192                migrated.display()
193            )));
194        }
195        migrated
196    } else {
197        return Err(solo_core::Error::not_found(format!(
198            "Solo database not found in {}; run `solo init` first",
199            data_dir.display()
200        )));
201    };
202    let mut conn: Connection = open_sqlcipher(&db_path, &key)?;
203
204    // Step 3 — run migrations idempotently.
205    let schema_version = migration::run_migrations(&mut conn)?;
206
207    // Step 3b — resolve embedder_id from the persisted config. Lazy-
208    // inserts the row in `embedders` on first daemon start. Verifies
209    // dim/dtype consistency against any prior row → Conflict if the
210    // user changed embedder dim under the same name+version.
211    let embedder_identity = EmbedderIdentity {
212        name: config.embedder.name.clone(),
213        version: config.embedder.version.clone(),
214        dim: config.embedder.dim,
215        dtype: config.embedder.dtype.clone(),
216    };
217    let embedder_id = get_or_insert_embedder_id(&conn, &embedder_identity)?;
218
219    // Step 4 — load HNSW snapshot.
220    //
221    // v0.8.0 layout: snapshots live in `<data_dir>/tenants/` alongside
222    // the per-tenant DB. The migrate helper moves them there as part of
223    // the v0.7.1 upgrade. We pass that subdir to `load_hnsw_with_fallback`
224    // for the default-tenant single-tenant case in P1; P2 introduces
225    // per-tenant snapshot subdirs for multi-tenant deployments.
226    let snapshot_dir = data_dir.join(TENANTS_SUBDIR);
227    let factory = HnswFactory::with_params(hnsw_params);
228    let (hnsw_index, used_bak_snapshot, started_fresh) =
229        load_hnsw_with_fallback(&snapshot_dir, &factory, dim);
230
231    // Step 5 — dim consistency check (only if we loaded a non-empty snapshot).
232    if !started_fresh && hnsw_index.dim() != dim {
233        return Err(solo_core::Error::storage(format!(
234            "HNSW snapshot dim ({}) does not match solo.config.toml embedder.dim ({}). \
235             Embedder identity has shifted under the daemon. Run `solo reembed` to rebuild.",
236            hnsw_index.dim(),
237            dim
238        )));
239    }
240
241    // Step 5b — rebuild from SQL when no snapshot was loadable. The
242    // `solo reembed` flow wipes the snapshot pairs deliberately so that
243    // this branch picks up the freshly-written embeddings rows; without
244    // it, recall would return zero hits until the user remembers enough
245    // new content to repopulate the graph.
246    //
247    // Done BEFORE wrapping `hnsw_index` in Arc (cheaper to take a `&dyn
248    // VectorIndex` from the owned value) and BEFORE `replay_pending_index`
249    // / `rebuild_tombstones_from_sql` so those steps see the rebuilt
250    // index in its expected state.
251    let rebuild = if started_fresh {
252        let started = std::time::Instant::now();
253        let r = rebuild_hnsw_from_sql(&conn, &hnsw_index, embedder_id)?;
254        if r.rows_seen > 0 {
255            tracing::info!(
256                rows_seen = r.rows_seen,
257                rows_added = r.rows_added,
258                rows_skipped = r.rows_skipped,
259                elapsed_ms = started.elapsed().as_millis() as u64,
260                "rebuilt HNSW from `embeddings` after empty-snapshot fallback"
261            );
262        }
263        r
264    } else {
265        RebuildReport::default()
266    };
267
268    let hnsw: Arc<dyn VectorIndex + Send + Sync> = Arc::new(hnsw_index);
269
270    // Step 6 — rebuild tombstones from SQL (only when we LOADED a
271    // snapshot).
272    //
273    // `HnswIndex::tombstones` is in-memory only — a snapshot reload comes
274    // back with an empty set, so previously-forgotten vectors would
275    // technically reappear in the HNSW graph. The SQL `status='active'`
276    // filter on the recall path masks this from users, but it would
277    // produce spurious drift-detected warnings (`index.len()` would
278    // count forgotten vectors as live) and would mean re-add of a
279    // previously-forgotten id wouldn't lift a stale tombstone.
280    //
281    // When we REBUILT from SQL (Step 5b), this step is unnecessary AND
282    // counterproductive: rebuild already excludes `status='forgotten'`
283    // rows, so the forgotten rowids aren't in the graph. Adding them to
284    // the tombstone set anyway would skew `len()` (which reports
285    // `raw_len - tombstones.len()`) and produce false-positive drift
286    // warnings. So we only run the tombstone rebuild on the
287    // snapshot-loaded path.
288    let (forgotten, forgotten_chunks) = if started_fresh {
289        (0, 0)
290    } else {
291        // Dev-log 0154: both passes live in the shared `hnsw_rebuild`
292        // module so the per-tenant copies in `tenants/handle.rs` can't
293        // drift from this default-data-dir one.
294        let eps = rebuild_episode_tombstones_from_sql(&conn, hnsw.as_ref())?;
295        let chunks = rebuild_chunk_tombstones_from_sql(&conn, hnsw.as_ref())?;
296        (eps, chunks)
297    };
298    if forgotten > 0 {
299        tracing::info!(forgotten, "rebuilt HNSW tombstones from episodes.status='forgotten'");
300    }
301    if forgotten_chunks > 0 {
302        tracing::info!(
303            forgotten_chunks,
304            "rebuilt HNSW tombstones from document_chunks of forgotten documents"
305        );
306    }
307
308    // Step 7 — replay pending_index.
309    let replay = replay_pending_index(&mut conn, hnsw.as_ref())?;
310
311    // Step 8 — drift detection (advisory).
312    let drift = detect_drift(&conn, hnsw.as_ref())?;
313
314    // Step 9 — close init connection. The writer actor will open its own.
315    drop(conn);
316
317    Ok(StartupOutcome {
318        data_dir,
319        db_path,
320        config,
321        schema_version,
322        hnsw,
323        embedder_id,
324        replay,
325        drift,
326        used_bak_snapshot,
327        started_fresh,
328        rebuild,
329    })
330}
331
332// Dev-log 0154: the previous local copies of
333// `rebuild_tombstones_from_sql` + `rebuild_chunk_tombstones_from_sql`
334// were lifted into `crate::hnsw_rebuild` so the per-tenant copies in
335// `tenants/handle.rs` use the same implementation. See that module for
336// the rationale + commentary.
337
338/// Try the live snapshot, then `.bak`, then fall back to a fresh empty
339/// index of the configured dim. Logging communicates which path was taken
340/// so ops can investigate `.bak` falls back without surprise.
341fn load_hnsw_with_fallback(
342    data_dir: &Path,
343    factory: &HnswFactory,
344    dim: usize,
345) -> (HnswIndex, bool, bool) {
346    match snapshot::load(data_dir) {
347        Ok(idx) => {
348            tracing::info!(
349                snapshot_kind = "live",
350                dim = idx.dim(),
351                len = idx.len(),
352                "HNSW loaded from live snapshot"
353            );
354            (idx, false, false)
355        }
356        Err(primary_err) => {
357            tracing::warn!(error = %primary_err, "live HNSW snapshot failed; trying .bak");
358            match snapshot::load_bak(data_dir) {
359                Ok(idx) => {
360                    tracing::warn!(
361                        snapshot_kind = "bak",
362                        dim = idx.dim(),
363                        len = idx.len(),
364                        "HNSW loaded from backup snapshot — investigate the live pair"
365                    );
366                    (idx, true, false)
367                }
368                Err(bak_err) => {
369                    tracing::warn!(
370                        primary = %primary_err,
371                        bak = %bak_err,
372                        dim,
373                        "no HNSW snapshot available; starting fresh empty index. \
374                         The startup chain will attempt rebuild_hnsw_from_sql next; \
375                         if the `embeddings` table is also empty, recall will return \
376                         no hits until new content is remembered."
377                    );
378                    let empty = factory
379                        .create(dim)
380                        .expect("HnswFactory::create with valid dim must succeed");
381                    (empty, false, true)
382                }
383            }
384        }
385    }
386}
387
388#[cfg(test)]
389mod tests {
390    use super::*;
391    use crate::config::EmbedderConfig;
392    use crate::init::{InitParams, init};
393    use crate::key_material::KeyMaterial;
394    use rusqlite::params;
395    use solo_core::{Confidence, EncodingContext, Episode, MemoryId, Tier};
396
397    fn fresh_init_dir() -> (tempfile::TempDir, KeyMaterial) {
398        let tmp = tempfile::TempDir::new().unwrap();
399        // Use init() to lay down a real db + config for startup to pick up.
400        // init() generates its own salt; we re-derive the key from the
401        // persisted salt afterwards.
402        let _ = init(InitParams {
403            data_dir: tmp.path().to_path_buf(),
404            passphrase: zeroize::Zeroizing::new("password-123".into()),
405            force: false,
406            embedder: EmbedderConfig {
407                name: "stub".into(),
408                version: "v1".into(),
409                dim: 32,
410                dtype: "f32".into(),
411            },
412        })
413        .unwrap();
414        // Re-derive key with the salt that init() persisted.
415        let cfg = SoloConfig::read(&tmp.path().join("solo.config.toml")).unwrap();
416        let key = KeyMaterial::derive("password-123", &cfg.salt_bytes().unwrap()).unwrap();
417        (tmp, key)
418    }
419
420    /// v0.8.0 layout: snapshots live under `<data_dir>/tenants/`. Tests
421    /// that plant snapshots / open the per-tenant DB directly use these
422    /// helpers so the path-resolution logic is in one place.
423    fn snapshot_dir(data_dir: &Path) -> PathBuf {
424        data_dir.join(crate::tenants::TENANTS_SUBDIR)
425    }
426    fn per_tenant_db(data_dir: &Path) -> PathBuf {
427        data_dir
428            .join(crate::tenants::TENANTS_SUBDIR)
429            .join("default.db")
430    }
431
432    fn enqueue_pending(conn: &Connection, memory_id: &str, dim: usize) {
433        let zeros = vec![0u8; dim * 4];
434        conn.execute(
435            "INSERT INTO pending_index (memory_id, embedding, embedding_dim, enqueued_at)
436             VALUES (?, ?, ?, ?)",
437            params![memory_id, &zeros[..], dim as i64, 0i64],
438        )
439        .unwrap();
440    }
441
442    fn insert_hot_episode(conn: &Connection, content: &str) -> String {
443        let mid = MemoryId::new();
444        let ep = Episode {
445            memory_id: mid,
446            ts_ms: chrono::Utc::now().timestamp_millis(),
447            source_type: "user_message".into(),
448            source_id: None,
449            content: content.into(),
450            encoding_context: EncodingContext::default(),
451            provenance: None,
452            confidence: Confidence::new(0.9).unwrap(),
453            strength: 0.5,
454            salience: 0.5,
455            tier: Tier::Hot,
456        };
457        let now_ms = chrono::Utc::now().timestamp_millis();
458        let tier = match ep.tier {
459            Tier::Hot => "hot",
460            Tier::Warm => "warm",
461            Tier::Cold => "cold",
462        };
463        conn.execute(
464            "INSERT INTO episodes (
465                memory_id, ts_ms, source_type, source_id, content,
466                encoding_context_json, provenance_json, confidence,
467                strength, salience, tier, created_at_ms, updated_at_ms
468             ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
469            params![
470                ep.memory_id.to_string(),
471                ep.ts_ms,
472                ep.source_type,
473                ep.source_id,
474                ep.content,
475                "{}",
476                Option::<String>::None,
477                ep.confidence.0,
478                ep.strength,
479                ep.salience,
480                tier,
481                now_ms,
482                now_ms,
483            ],
484        )
485        .unwrap();
486        mid.to_string()
487    }
488
489    #[test]
490    fn run_starts_fresh_when_no_snapshot_exists() {
491        let (tmp, key) = fresh_init_dir();
492        let outcome = run(StartupParams::new(tmp.path(), key)).unwrap();
493        assert!(outcome.started_fresh);
494        assert!(!outcome.used_bak_snapshot);
495        assert_eq!(outcome.hnsw.len(), 0);
496        assert_eq!(outcome.hnsw.dim(), 32);
497        assert_eq!(outcome.replay.rows_seen, 0);
498        assert!(outcome.drift.is_clean());
499    }
500
501    #[test]
502    fn run_replays_pending_index_into_fresh_hnsw() {
503        let (tmp, key) = fresh_init_dir();
504        // Pre-populate: insert an episode + queue it in pending_index.
505        let cfg = SoloConfig::read(&tmp.path().join("solo.config.toml")).unwrap();
506        let conn = open_sqlcipher(&per_tenant_db(tmp.path()), &key).unwrap();
507        let mid = insert_hot_episode(&conn, "hello startup");
508        enqueue_pending(&conn, &mid, cfg.embedder.dim as usize);
509        drop(conn);
510
511        let outcome = run(StartupParams::new(tmp.path(), key)).unwrap();
512        assert!(outcome.started_fresh);
513        assert_eq!(outcome.replay.rows_seen, 1);
514        assert_eq!(outcome.replay.rows_replayed, 1);
515        assert_eq!(outcome.hnsw.len(), 1);
516        assert!(outcome.drift.is_clean(), "drift: {:?}", outcome.drift);
517    }
518
519    #[test]
520    fn run_loads_persisted_snapshot_when_present() {
521        let (tmp, key) = fresh_init_dir();
522        let dim = 32usize;
523        // Build + save a snapshot manually to simulate a daemon shutdown/restart.
524        {
525            use solo_core::VectorIndex;
526            let factory = HnswFactory::default();
527            let idx = factory.create(dim).unwrap();
528            for i in 1..=5 {
529                let v = vec![0.1f32 * i as f32; dim];
530                idx.add(i as i64, &v).unwrap();
531            }
532            snapshot::save(&idx, &snapshot_dir(tmp.path())).unwrap();
533        }
534
535        let outcome = run(StartupParams::new(tmp.path(), key)).unwrap();
536        assert!(!outcome.started_fresh);
537        assert!(!outcome.used_bak_snapshot);
538        assert_eq!(outcome.hnsw.len(), 5);
539        assert_eq!(outcome.hnsw.dim(), dim);
540    }
541
542    #[test]
543    fn run_falls_back_to_bak_when_live_corrupt() {
544        let (tmp, key) = fresh_init_dir();
545        let dim = 32usize;
546        {
547            use solo_core::VectorIndex;
548            let factory = HnswFactory::default();
549            // Save 1 → live exists, no bak.
550            let idx1 = factory.create(dim).unwrap();
551            for i in 1..=3 {
552                idx1.add(i, &vec![0.0f32; dim]).unwrap();
553            }
554            snapshot::save(&idx1, &snapshot_dir(tmp.path())).unwrap();
555            // Save 2 → live = idx2 (5 elements), bak = idx1 (3 elements).
556            let idx2 = factory.create(dim).unwrap();
557            for i in 1..=5 {
558                idx2.add(i, &vec![0.0f32; dim]).unwrap();
559            }
560            snapshot::save(&idx2, &snapshot_dir(tmp.path())).unwrap();
561        }
562        // Corrupt the live graph file (v0.8.0 layout: under tenants/).
563        std::fs::write(
564            snapshot_dir(tmp.path()).join("hnsw_episodes.hnsw.graph"),
565            b"GARBAGE",
566        )
567        .unwrap();
568
569        let outcome = run(StartupParams::new(tmp.path(), key)).unwrap();
570        assert!(!outcome.started_fresh);
571        assert!(outcome.used_bak_snapshot);
572        assert_eq!(outcome.hnsw.len(), 3); // bak's value
573    }
574
575    #[test]
576    fn run_refuses_when_db_missing() {
577        let tmp = tempfile::TempDir::new().unwrap();
578        // Write only a config; no db file.
579        let cfg = SoloConfig::new(
580            [0u8; crate::key_material::SALT_LEN],
581            EmbedderConfig {
582                name: "stub".into(),
583                version: "v1".into(),
584                dim: 32,
585                dtype: "f32".into(),
586            },
587        );
588        cfg.write(&tmp.path().join("solo.config.toml")).unwrap();
589        let key =
590            KeyMaterial::derive("password-123", &cfg.salt_bytes().unwrap()).unwrap();
591        let err = run(StartupParams::new(tmp.path(), key)).unwrap_err();
592        assert!(
593            err.to_string().contains("not found"),
594            "got: {err}"
595        );
596    }
597
598    #[test]
599    fn run_refuses_when_dim_mismatches_snapshot() {
600        let (tmp, key) = fresh_init_dir();
601        // Save a snapshot with the WRONG dim (config says 32, snapshot says 8).
602        {
603            use solo_core::VectorIndex;
604            let factory = HnswFactory::default();
605            let idx = factory.create(8).unwrap();
606            idx.add(1, &vec![0.0f32; 8]).unwrap();
607            snapshot::save(&idx, &snapshot_dir(tmp.path())).unwrap();
608        }
609        let err = run(StartupParams::new(tmp.path(), key)).unwrap_err();
610        assert!(
611            err.to_string().contains("does not match"),
612            "got: {err}"
613        );
614    }
615
616    /// Helper: seed `episodes` + `embeddings` rows under the persisted
617    /// config's embedder identity. Used by the rebuild-from-SQL tests
618    /// to populate the embeddings table without needing a real writer
619    /// actor / runtime. Returns `(memory_id, rowid)` per content.
620    fn seed_embeddings_for_current_embedder(
621        tmp_path: &Path,
622        key: &KeyMaterial,
623        contents: &[&str],
624    ) -> Vec<(String, i64)> {
625        let cfg_path = tmp_path.join("solo.config.toml");
626        let cfg = SoloConfig::read(&cfg_path).unwrap();
627        let conn = open_sqlcipher(&per_tenant_db(tmp_path), key).unwrap();
628        let identity = EmbedderIdentity {
629            name: cfg.embedder.name.clone(),
630            version: cfg.embedder.version.clone(),
631            dim: cfg.embedder.dim,
632            dtype: cfg.embedder.dtype.clone(),
633        };
634        let embedder_id = get_or_insert_embedder_id(&conn, &identity).unwrap();
635        let dim = cfg.embedder.dim as usize;
636        let now_ms = chrono::Utc::now().timestamp_millis();
637
638        let mut out = Vec::new();
639        for content in contents {
640            let mid = insert_hot_episode(&conn, content);
641            let rowid: i64 = conn
642                .query_row(
643                    "SELECT rowid FROM episodes WHERE memory_id = ?",
644                    params![mid],
645                    |r| r.get(0),
646                )
647                .unwrap();
648            // Distinct vector per row so search ordering is well-defined.
649            let mut bytes = vec![0u8; dim * 4];
650            // Stamp the rowid into the first 8 bytes so vectors aren't all-zero.
651            bytes[..8].copy_from_slice(&rowid.to_le_bytes());
652            conn.execute(
653                "INSERT INTO embeddings (memory_id, embedder_id, dtype, dim, vector, created_at_ms)
654                 VALUES (?, ?, ?, ?, ?, ?)",
655                params![mid, embedder_id, "f32", dim as i64, &bytes[..], now_ms],
656            )
657            .unwrap();
658            out.push((mid, rowid));
659        }
660        drop(conn);
661        out
662    }
663
664    /// Rebuild-from-SQL: when both snapshot pairs are missing, the
665    /// startup chain populates the HNSW from the `embeddings` table for
666    /// the active embedder. End state: `outcome.hnsw.len()` matches the
667    /// number of active rows; drift is clean.
668    #[test]
669    fn run_rebuilds_hnsw_from_sql_when_no_snapshot() {
670        let (tmp, key) = fresh_init_dir();
671        seed_embeddings_for_current_embedder(tmp.path(), &key, &["a", "b", "c"]);
672
673        // No snapshots exist (init doesn't write any).
674        assert!(!snapshot::pair_exists(&snapshot_dir(tmp.path()), snapshot::LIVE_BASENAME));
675
676        let outcome = run(StartupParams::new(tmp.path(), key)).unwrap();
677
678        assert!(outcome.started_fresh, "no snapshot → started_fresh");
679        assert_eq!(outcome.rebuild.rows_seen, 3);
680        assert_eq!(outcome.rebuild.rows_added, 3, "all 3 active rows rebuilt");
681        assert_eq!(outcome.rebuild.rows_skipped, 0);
682        assert_eq!(outcome.hnsw.len(), 3);
683        assert!(outcome.drift.is_clean(), "drift: {:?}", outcome.drift);
684    }
685
686    /// Rebuild excludes `episodes WHERE status = 'forgotten'`. Tombstone
687    /// rebuild then adds the forgotten rowid to the in-memory tombstone
688    /// set (no-op for our HNSW since we never added it, but the
689    /// post-rebuild tombstone count matches SQL).
690    #[test]
691    fn run_rebuild_excludes_forgotten_episodes() {
692        let (tmp, key) = fresh_init_dir();
693        let seeded =
694            seed_embeddings_for_current_embedder(tmp.path(), &key, &["keep1", "drop", "keep2"]);
695
696        let conn = open_sqlcipher(&per_tenant_db(tmp.path()), &key).unwrap();
697        conn.execute(
698            "UPDATE episodes SET status = 'forgotten' WHERE memory_id = ?",
699            params![seeded[1].0],
700        )
701        .unwrap();
702        drop(conn);
703
704        let outcome = run(StartupParams::new(tmp.path(), key)).unwrap();
705        assert_eq!(outcome.rebuild.rows_added, 2, "forgotten row skipped");
706        assert_eq!(outcome.rebuild.rows_skipped, 0);
707        assert_eq!(outcome.hnsw.len(), 2);
708    }
709
710    /// Corrupt-row resilience: a single bad embedding row (size mismatch)
711    /// must NOT abort the rebuild. The healthy rows still land in the
712    /// graph; the bad one is logged and counted in `rows_skipped`. Lets
713    /// `solo doctor` and `solo reembed` keep running so the user can
714    /// investigate from inside the product.
715    #[test]
716    fn run_rebuild_skips_corrupt_rows_and_continues() {
717        let (tmp, key) = fresh_init_dir();
718        let _seeded =
719            seed_embeddings_for_current_embedder(tmp.path(), &key, &["good1", "bad", "good2"]);
720
721        // Corrupt the middle memory's embedding: write a 4-byte blob
722        // where dim*4 = 32*4 = 128 bytes are expected.
723        let conn = open_sqlcipher(&per_tenant_db(tmp.path()), &key).unwrap();
724        conn.execute(
725            "UPDATE embeddings SET vector = ?, dim = ?
726             WHERE memory_id = ?",
727            params![&vec![0u8; 4][..], 32i64, _seeded[1].0],
728        )
729        .unwrap();
730        drop(conn);
731
732        let outcome = run(StartupParams::new(tmp.path(), key)).unwrap();
733        assert_eq!(outcome.rebuild.rows_seen, 3);
734        assert_eq!(outcome.rebuild.rows_added, 2, "two healthy rows added");
735        assert_eq!(outcome.rebuild.rows_skipped, 1, "corrupt row skipped");
736        assert_eq!(outcome.hnsw.len(), 2);
737    }
738
739    /// Rebuild only walks rows where `embeddings.embedder_id` matches
740    /// the active embedder. Stale rows under a previously-registered
741    /// embedder (e.g. stub before BGE-M3 was wired in but the user
742    /// hasn't run `solo reembed` yet) do NOT get added.
743    #[test]
744    fn run_rebuild_skips_rows_for_non_current_embedder() {
745        let (tmp, key) = fresh_init_dir();
746        // 1 row under the current embedder.
747        seed_embeddings_for_current_embedder(tmp.path(), &key, &["ours"]);
748
749        // Register a *second* embedder and write a stray embedding row
750        // for a separate episode under it. Rebuild should ignore that.
751        let conn = open_sqlcipher(&per_tenant_db(tmp.path()), &key).unwrap();
752        let other_id = get_or_insert_embedder_id(
753            &conn,
754            &EmbedderIdentity {
755                name: "other".into(),
756                version: "v1".into(),
757                dim: 32,
758                dtype: "f32".into(),
759            },
760        )
761        .unwrap();
762        let stray_mid = insert_hot_episode(&conn, "stray");
763        let zeros = vec![0u8; 32 * 4];
764        let now = chrono::Utc::now().timestamp_millis();
765        conn.execute(
766            "INSERT INTO embeddings (memory_id, embedder_id, dtype, dim, vector, created_at_ms)
767             VALUES (?, ?, ?, ?, ?, ?)",
768            params![stray_mid, other_id, "f32", 32i64, &zeros[..], now],
769        )
770        .unwrap();
771        drop(conn);
772
773        let outcome = run(StartupParams::new(tmp.path(), key)).unwrap();
774        assert_eq!(
775            outcome.rebuild.rows_added, 1,
776            "only the row under the current embedder is rebuilt"
777        );
778        assert_eq!(outcome.rebuild.rows_skipped, 0);
779        assert_eq!(outcome.hnsw.len(), 1);
780    }
781
782    /// Regression test for the post-reload tombstone bug: snapshot a
783    /// non-empty HNSW, mark some episodes as `status='forgotten'` in SQL,
784    /// then re-run startup. The `rebuild_tombstones_from_sql` step must
785    /// re-insert those rowids into HnswIndex's tombstone set so drift
786    /// detection stays clean and forgotten ids don't accidentally surface.
787    #[test]
788    fn run_rebuilds_tombstones_from_forgotten_episodes() {
789        use solo_core::VectorIndex;
790        let (tmp, key) = fresh_init_dir();
791        let dim = 32usize;
792
793        // Lay down a snapshot containing 3 vectors at rowids 1, 2, 3.
794        {
795            let factory = HnswFactory::default();
796            let idx = factory.create(dim).unwrap();
797            for i in 1..=3 {
798                idx.add(i as i64, &vec![0.1f32; dim]).unwrap();
799            }
800            snapshot::save(&idx, &snapshot_dir(tmp.path())).unwrap();
801        }
802
803        // Insert 3 episodes (so SQL rowids 1, 2, 3 align with the HNSW
804        // entries). Mark rowid=2 as forgotten.
805        let conn = open_sqlcipher(&per_tenant_db(tmp.path()), &key).unwrap();
806        let _ = insert_hot_episode(&conn, "first");
807        let mid2 = insert_hot_episode(&conn, "second");
808        let _ = insert_hot_episode(&conn, "third");
809        conn.execute(
810            "UPDATE episodes SET status='forgotten' WHERE memory_id = ?",
811            params![mid2],
812        )
813        .unwrap();
814        drop(conn);
815
816        let outcome = run(StartupParams::new(tmp.path(), key)).unwrap();
817        // 3 vectors total - 1 tombstoned = 2 visible.
818        assert_eq!(outcome.hnsw.len(), 2);
819        // Drift: hot+active in SQL is 2, hnsw.len() is 2 → clean.
820        assert!(
821            outcome.drift.is_clean(),
822            "expected clean drift after tombstone rebuild, got: {:?}",
823            outcome.drift
824        );
825        // Search for vector 2 must exclude rowid 2.
826        let hits = outcome.hnsw.search(&vec![0.1f32; dim], 5).unwrap();
827        assert!(
828            !hits.iter().any(|(r, _)| *r == 2),
829            "rowid 2 should be tombstoned: hits={hits:?}"
830        );
831    }
832}