1use std::path::{Path, PathBuf};
41use std::sync::Arc;
42
43use rusqlite::Connection;
44use solo_core::{Result, VectorIndex, VectorIndexFactory};
45
46use crate::config::SoloConfig;
47use crate::embedder_registry::{EmbedderIdentity, get_or_insert_embedder_id};
48use crate::hnsw_rebuild::{
49 rebuild_chunk_tombstones_from_sql, rebuild_episode_tombstones_from_sql,
50};
51use crate::init::open_sqlcipher;
52use crate::key_material::KeyMaterial;
53use crate::migration;
54use crate::recovery::{
55 DriftReport, RebuildReport, ReplayReport, detect_drift, rebuild_hnsw_from_sql,
56 replay_pending_index,
57};
58use crate::snapshot;
59use crate::tenants::{
60 TENANTS_INDEX_FILENAME, TENANTS_SUBDIR, migrate_v071_to_v080,
61};
62use crate::vector_index::{HnswFactory, HnswIndex, HnswParams};
63
/// Everything the daemon needs after a successful [`run`]: the resolved
/// on-disk locations, the parsed config, and the in-memory HNSW index
/// together with reports describing how that index was recovered.
pub struct StartupOutcome {
    /// Data directory the startup sequence ran against (taken from
    /// [`StartupParams::data_dir`]).
    pub data_dir: PathBuf,
    /// SQLCipher database that was opened:
    /// `<data_dir>/<TENANTS_SUBDIR>/default.db`, possibly produced by the
    /// v0.7.1 → v0.8.0 migration during this very run.
    pub db_path: PathBuf,
    /// Parsed contents of `<data_dir>/solo.config.toml`.
    pub config: SoloConfig,
    /// Schema version returned by `migration::run_migrations`.
    pub schema_version: u32,
    /// Vector index after snapshot load (or fresh rebuild from SQL),
    /// tombstone restoration and `pending_index` replay.
    pub hnsw: Arc<dyn VectorIndex + Send + Sync>,
    /// Row id of the configured embedder identity, as returned by
    /// `get_or_insert_embedder_id`.
    pub embedder_id: i64,
    /// Result of replaying `pending_index` into [`Self::hnsw`].
    pub replay: ReplayReport,
    /// SQL ↔ HNSW consistency report computed at the end of startup.
    pub drift: DriftReport,
    /// `true` when the live snapshot failed to load and the `.bak` snapshot
    /// was used instead.
    pub used_bak_snapshot: bool,
    /// `true` when neither snapshot could be loaded and a new empty index was
    /// created (and then populated from the `embeddings` table).
    pub started_fresh: bool,
    /// Outcome of `rebuild_hnsw_from_sql`; `RebuildReport::default()` when a
    /// snapshot was loaded and no rebuild ran.
    pub rebuild: RebuildReport,
}
102
103impl std::fmt::Debug for StartupOutcome {
104 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105 f.debug_struct("StartupOutcome")
107 .field("data_dir", &self.data_dir)
108 .field("schema_version", &self.schema_version)
109 .field("hnsw_len", &self.hnsw.len())
110 .field("hnsw_dim", &self.hnsw.dim())
111 .field("embedder_id", &self.embedder_id)
112 .field("replay", &self.replay)
113 .field("drift", &self.drift)
114 .field("used_bak_snapshot", &self.used_bak_snapshot)
115 .field("started_fresh", &self.started_fresh)
116 .field("rebuild", &self.rebuild)
117 .finish()
118 }
119}
120
/// Inputs to [`run`]: where the data lives, the key that unlocks it, and the
/// HNSW tuning used if a fresh index has to be created.
#[derive(Debug, Clone)]
pub struct StartupParams {
    /// Root data directory; `run` reads `solo.config.toml` from here.
    pub data_dir: PathBuf,
    /// Key material used to open the SQLCipher database and, when the legacy
    /// single-file layout is found, to run the v0.7.1 → v0.8.0 migration.
    // NOTE(review): `Debug` is derived, so `{:?}` prints this field — confirm
    // that `KeyMaterial`'s own `Debug` impl redacts the key bytes.
    pub key: KeyMaterial,
    /// Parameters handed to `HnswFactory`; only consulted when no snapshot
    /// loads and an empty index is created.
    pub hnsw_params: HnswParams,
}
129
130impl StartupParams {
131 pub fn new(data_dir: impl Into<PathBuf>, key: KeyMaterial) -> Self {
132 Self {
133 data_dir: data_dir.into(),
134 key,
135 hnsw_params: HnswParams::default(),
136 }
137 }
138
139 pub fn with_hnsw_params(mut self, params: HnswParams) -> Self {
140 self.hnsw_params = params;
141 self
142 }
143}
144
145pub fn run(params: StartupParams) -> Result<StartupOutcome> {
147 let StartupParams {
148 data_dir,
149 key,
150 hnsw_params,
151 } = params;
152
153 let config_path = data_dir.join("solo.config.toml");
155 let config = SoloConfig::read(&config_path)?;
156 let dim = config.embedder.dim as usize;
157 if dim == 0 {
158 return Err(solo_core::Error::storage(format!(
159 "solo.config.toml records embedder.dim=0 — corrupt config? at {config_path:?}"
160 )));
161 }
162
163 let tenants_index_path = data_dir.join(TENANTS_INDEX_FILENAME);
176 let tenants_default_db = data_dir.join(TENANTS_SUBDIR).join("default.db");
177 let legacy_db_path = data_dir.join("solo.db");
178 let db_path: PathBuf = if tenants_index_path.is_file() && tenants_default_db.is_file() {
179 tenants_default_db
180 } else if legacy_db_path.is_file() && !tenants_index_path.is_file() {
181 tracing::info!(
183 data_dir = %data_dir.display(),
184 "v0.7.1 single-DB layout detected; running v0.7.1 → v0.8.0 mass-data-move"
185 );
186 migrate_v071_to_v080(&data_dir, &key)?;
187 let migrated = data_dir.join(TENANTS_SUBDIR).join("default.db");
188 if !migrated.is_file() {
189 return Err(solo_core::Error::storage(format!(
190 "v0.7.1 → v0.8.0 migration completed without errors but \
191 the migrated default.db is not present at {}",
192 migrated.display()
193 )));
194 }
195 migrated
196 } else {
197 return Err(solo_core::Error::not_found(format!(
198 "Solo database not found in {}; run `solo init` first",
199 data_dir.display()
200 )));
201 };
202 let mut conn: Connection = open_sqlcipher(&db_path, &key)?;
203
204 let schema_version = migration::run_migrations(&mut conn)?;
206
207 let embedder_identity = EmbedderIdentity {
212 name: config.embedder.name.clone(),
213 version: config.embedder.version.clone(),
214 dim: config.embedder.dim,
215 dtype: config.embedder.dtype.clone(),
216 };
217 let embedder_id = get_or_insert_embedder_id(&conn, &embedder_identity)?;
218
219 let snapshot_dir = data_dir.join(TENANTS_SUBDIR);
227 let factory = HnswFactory::with_params(hnsw_params);
228 let (hnsw_index, used_bak_snapshot, started_fresh) =
229 load_hnsw_with_fallback(&snapshot_dir, &factory, dim);
230
231 if !started_fresh && hnsw_index.dim() != dim {
233 return Err(solo_core::Error::storage(format!(
234 "HNSW snapshot dim ({}) does not match solo.config.toml embedder.dim ({}). \
235 Embedder identity has shifted under the daemon. Run `solo reembed` to rebuild.",
236 hnsw_index.dim(),
237 dim
238 )));
239 }
240
241 let rebuild = if started_fresh {
252 let started = std::time::Instant::now();
253 let r = rebuild_hnsw_from_sql(&conn, &hnsw_index, embedder_id)?;
254 if r.rows_seen > 0 {
255 tracing::info!(
256 rows_seen = r.rows_seen,
257 rows_added = r.rows_added,
258 rows_skipped = r.rows_skipped,
259 elapsed_ms = started.elapsed().as_millis() as u64,
260 "rebuilt HNSW from `embeddings` after empty-snapshot fallback"
261 );
262 }
263 r
264 } else {
265 RebuildReport::default()
266 };
267
268 let hnsw: Arc<dyn VectorIndex + Send + Sync> = Arc::new(hnsw_index);
269
270 let (forgotten, forgotten_chunks) = if started_fresh {
289 (0, 0)
290 } else {
291 let eps = rebuild_episode_tombstones_from_sql(&conn, hnsw.as_ref())?;
295 let chunks = rebuild_chunk_tombstones_from_sql(&conn, hnsw.as_ref())?;
296 (eps, chunks)
297 };
298 if forgotten > 0 {
299 tracing::info!(forgotten, "rebuilt HNSW tombstones from episodes.status='forgotten'");
300 }
301 if forgotten_chunks > 0 {
302 tracing::info!(
303 forgotten_chunks,
304 "rebuilt HNSW tombstones from document_chunks of forgotten documents"
305 );
306 }
307
308 let replay = replay_pending_index(&mut conn, hnsw.as_ref())?;
310
311 let drift = detect_drift(&conn, hnsw.as_ref())?;
313
314 drop(conn);
316
317 Ok(StartupOutcome {
318 data_dir,
319 db_path,
320 config,
321 schema_version,
322 hnsw,
323 embedder_id,
324 replay,
325 drift,
326 used_bak_snapshot,
327 started_fresh,
328 rebuild,
329 })
330}
331
332fn load_hnsw_with_fallback(
342 data_dir: &Path,
343 factory: &HnswFactory,
344 dim: usize,
345) -> (HnswIndex, bool, bool) {
346 match snapshot::load(data_dir) {
347 Ok(idx) => {
348 tracing::info!(
349 snapshot_kind = "live",
350 dim = idx.dim(),
351 len = idx.len(),
352 "HNSW loaded from live snapshot"
353 );
354 (idx, false, false)
355 }
356 Err(primary_err) => {
357 tracing::warn!(error = %primary_err, "live HNSW snapshot failed; trying .bak");
358 match snapshot::load_bak(data_dir) {
359 Ok(idx) => {
360 tracing::warn!(
361 snapshot_kind = "bak",
362 dim = idx.dim(),
363 len = idx.len(),
364 "HNSW loaded from backup snapshot — investigate the live pair"
365 );
366 (idx, true, false)
367 }
368 Err(bak_err) => {
369 tracing::warn!(
370 primary = %primary_err,
371 bak = %bak_err,
372 dim,
373 "no HNSW snapshot available; starting fresh empty index. \
374 The startup chain will attempt rebuild_hnsw_from_sql next; \
375 if the `embeddings` table is also empty, recall will return \
376 no hits until new content is remembered."
377 );
378 let empty = factory
379 .create(dim)
380 .expect("HnswFactory::create with valid dim must succeed");
381 (empty, false, true)
382 }
383 }
384 }
385 }
386}
387
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::EmbedderConfig;
    use crate::init::{InitParams, init};
    use crate::key_material::KeyMaterial;
    use rusqlite::params;
    use solo_core::{Confidence, EncodingContext, Episode, MemoryId, Tier};

    /// Initialise a throwaway data directory for the 32-dim `stub`/`v1`
    /// embedder. Returns it together with the key derived from the config's
    /// salt.
    fn fresh_init_dir() -> (tempfile::TempDir, KeyMaterial) {
        let tmp = tempfile::TempDir::new().unwrap();
        let _ = init(InitParams {
            data_dir: tmp.path().to_path_buf(),
            passphrase: zeroize::Zeroizing::new("password-123".into()),
            force: false,
            embedder: EmbedderConfig {
                name: "stub".into(),
                version: "v1".into(),
                dim: 32,
                dtype: "f32".into(),
            },
        })
        .unwrap();
        // Derive the key from the salt persisted in the freshly written config.
        let cfg = SoloConfig::read(&tmp.path().join("solo.config.toml")).unwrap();
        let key = KeyMaterial::derive("password-123", &cfg.salt_bytes().unwrap()).unwrap();
        (tmp, key)
    }

    /// Directory that `run` passes to the snapshot loader.
    fn snapshot_dir(data_dir: &Path) -> PathBuf {
        data_dir.join(crate::tenants::TENANTS_SUBDIR)
    }

    /// Path of the default tenant's SQLCipher database.
    fn per_tenant_db(data_dir: &Path) -> PathBuf {
        data_dir
            .join(crate::tenants::TENANTS_SUBDIR)
            .join("default.db")
    }

    /// Queue `dim * 4` zero bytes (presumably `dim` f32 values) for
    /// `memory_id` in `pending_index`.
    fn enqueue_pending(conn: &Connection, memory_id: &str, dim: usize) {
        let zeros = vec![0u8; dim * 4];
        conn.execute(
            "INSERT INTO pending_index (memory_id, embedding, embedding_dim, enqueued_at)
             VALUES (?, ?, ?, ?)",
            params![memory_id, &zeros[..], dim as i64, 0i64],
        )
        .unwrap();
    }

    /// Insert a hot-tier `user_message` episode with `content` and return its
    /// memory id as a string.
    fn insert_hot_episode(conn: &Connection, content: &str) -> String {
        let mid = MemoryId::new();
        let ep = Episode {
            memory_id: mid,
            ts_ms: chrono::Utc::now().timestamp_millis(),
            source_type: "user_message".into(),
            source_id: None,
            content: content.into(),
            encoding_context: EncodingContext::default(),
            provenance: None,
            confidence: Confidence::new(0.9).unwrap(),
            strength: 0.5,
            salience: 0.5,
            tier: Tier::Hot,
        };
        let now_ms = chrono::Utc::now().timestamp_millis();
        let tier = match ep.tier {
            Tier::Hot => "hot",
            Tier::Warm => "warm",
            Tier::Cold => "cold",
        };
        conn.execute(
            "INSERT INTO episodes (
                 memory_id, ts_ms, source_type, source_id, content,
                 encoding_context_json, provenance_json, confidence,
                 strength, salience, tier, created_at_ms, updated_at_ms
             ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
            params![
                ep.memory_id.to_string(),
                ep.ts_ms,
                ep.source_type,
                ep.source_id,
                ep.content,
                "{}",
                Option::<String>::None,
                ep.confidence.0,
                ep.strength,
                ep.salience,
                tier,
                now_ms,
                now_ms,
            ],
        )
        .unwrap();
        mid.to_string()
    }

    /// A freshly initialised directory has no snapshot: startup creates an
    /// empty index of the configured dimension.
    #[test]
    fn run_starts_fresh_when_no_snapshot_exists() {
        let (tmp, key) = fresh_init_dir();
        let outcome = run(StartupParams::new(tmp.path(), key)).unwrap();
        assert!(outcome.started_fresh);
        assert!(!outcome.used_bak_snapshot);
        assert_eq!(outcome.hnsw.len(), 0);
        assert_eq!(outcome.hnsw.dim(), 32);
        assert_eq!(outcome.replay.rows_seen, 0);
        assert!(outcome.drift.is_clean());
    }

    /// Rows left in `pending_index` are replayed into the fresh index.
    #[test]
    fn run_replays_pending_index_into_fresh_hnsw() {
        let (tmp, key) = fresh_init_dir();
        let cfg = SoloConfig::read(&tmp.path().join("solo.config.toml")).unwrap();
        let conn = open_sqlcipher(&per_tenant_db(tmp.path()), &key).unwrap();
        let mid = insert_hot_episode(&conn, "hello startup");
        enqueue_pending(&conn, &mid, cfg.embedder.dim as usize);
        drop(conn);

        let outcome = run(StartupParams::new(tmp.path(), key)).unwrap();
        assert!(outcome.started_fresh);
        assert_eq!(outcome.replay.rows_seen, 1);
        assert_eq!(outcome.replay.rows_replayed, 1);
        assert_eq!(outcome.hnsw.len(), 1);
        assert!(outcome.drift.is_clean(), "drift: {:?}", outcome.drift);
    }

    /// A valid live snapshot is loaded as-is.
    #[test]
    fn run_loads_persisted_snapshot_when_present() {
        let (tmp, key) = fresh_init_dir();
        let dim = 32usize;
        {
            let factory = HnswFactory::default();
            let idx = factory.create(dim).unwrap();
            for i in 1..=5 {
                let v = vec![0.1f32 * i as f32; dim];
                idx.add(i as i64, &v).unwrap();
            }
            snapshot::save(&idx, &snapshot_dir(tmp.path())).unwrap();
        }

        let outcome = run(StartupParams::new(tmp.path(), key)).unwrap();
        assert!(!outcome.started_fresh);
        assert!(!outcome.used_bak_snapshot);
        assert_eq!(outcome.hnsw.len(), 5);
        assert_eq!(outcome.hnsw.dim(), dim);
    }

    /// A corrupt live graph file makes startup fall back to the `.bak` pair.
    #[test]
    fn run_falls_back_to_bak_when_live_corrupt() {
        let (tmp, key) = fresh_init_dir();
        let dim = 32usize;
        {
            let factory = HnswFactory::default();
            // Saved first; presumably rotated to `.bak` by the second save.
            let idx1 = factory.create(dim).unwrap();
            for i in 1..=3 {
                idx1.add(i, &vec![0.0f32; dim]).unwrap();
            }
            snapshot::save(&idx1, &snapshot_dir(tmp.path())).unwrap();
            // Becomes the live snapshot, which is corrupted below.
            let idx2 = factory.create(dim).unwrap();
            for i in 1..=5 {
                idx2.add(i, &vec![0.0f32; dim]).unwrap();
            }
            snapshot::save(&idx2, &snapshot_dir(tmp.path())).unwrap();
        }
        std::fs::write(
            snapshot_dir(tmp.path()).join("hnsw_episodes.hnsw.graph"),
            b"GARBAGE",
        )
        .unwrap();

        let outcome = run(StartupParams::new(tmp.path(), key)).unwrap();
        assert!(!outcome.started_fresh);
        assert!(outcome.used_bak_snapshot);
        assert_eq!(outcome.hnsw.len(), 3);
    }

    /// A config without any database is refused with a `not found` error.
    #[test]
    fn run_refuses_when_db_missing() {
        let tmp = tempfile::TempDir::new().unwrap();
        let cfg = SoloConfig::new(
            [0u8; crate::key_material::SALT_LEN],
            EmbedderConfig {
                name: "stub".into(),
                version: "v1".into(),
                dim: 32,
                dtype: "f32".into(),
            },
        );
        cfg.write(&tmp.path().join("solo.config.toml")).unwrap();
        let key = KeyMaterial::derive("password-123", &cfg.salt_bytes().unwrap()).unwrap();
        let err = run(StartupParams::new(tmp.path(), key)).unwrap_err();
        assert!(err.to_string().contains("not found"), "got: {err}");
    }

    /// An initialised layout whose `default.db` has been removed is refused
    /// rather than migrated or silently re-created.
    #[test]
    fn run_refuses_when_tenant_db_missing_but_layout_initialised() {
        let (tmp, key) = fresh_init_dir();
        std::fs::remove_file(per_tenant_db(tmp.path())).unwrap();
        let err = run(StartupParams::new(tmp.path(), key)).unwrap_err();
        assert!(err.to_string().contains("not found"), "got: {err}");
    }

    /// A snapshot whose dimension differs from the config aborts startup.
    #[test]
    fn run_refuses_when_dim_mismatches_snapshot() {
        let (tmp, key) = fresh_init_dir();
        {
            let factory = HnswFactory::default();
            let idx = factory.create(8).unwrap();
            idx.add(1, &[0.0f32; 8]).unwrap();
            snapshot::save(&idx, &snapshot_dir(tmp.path())).unwrap();
        }
        let err = run(StartupParams::new(tmp.path(), key)).unwrap_err();
        assert!(err.to_string().contains("does not match"), "got: {err}");
    }

    /// Insert one hot episode plus one `embeddings` row (under the configured
    /// embedder) per entry of `contents`; returns `(memory_id, rowid)` pairs.
    fn seed_embeddings_for_current_embedder(
        tmp_path: &Path,
        key: &KeyMaterial,
        contents: &[&str],
    ) -> Vec<(String, i64)> {
        let cfg_path = tmp_path.join("solo.config.toml");
        let cfg = SoloConfig::read(&cfg_path).unwrap();
        let conn = open_sqlcipher(&per_tenant_db(tmp_path), key).unwrap();
        let identity = EmbedderIdentity {
            name: cfg.embedder.name.clone(),
            version: cfg.embedder.version.clone(),
            dim: cfg.embedder.dim,
            dtype: cfg.embedder.dtype.clone(),
        };
        let embedder_id = get_or_insert_embedder_id(&conn, &identity).unwrap();
        let dim = cfg.embedder.dim as usize;
        let now_ms = chrono::Utc::now().timestamp_millis();

        let mut out = Vec::new();
        for content in contents {
            let mid = insert_hot_episode(&conn, content);
            let rowid: i64 = conn
                .query_row(
                    "SELECT rowid FROM episodes WHERE memory_id = ?",
                    params![mid],
                    |r| r.get(0),
                )
                .unwrap();
            // Stamp the rowid into the first 8 bytes, presumably so each
            // seeded vector is distinct.
            let mut bytes = vec![0u8; dim * 4];
            bytes[..8].copy_from_slice(&rowid.to_le_bytes());
            conn.execute(
                "INSERT INTO embeddings (memory_id, embedder_id, dtype, dim, vector, created_at_ms)
                 VALUES (?, ?, ?, ?, ?, ?)",
                params![mid, embedder_id, "f32", dim as i64, &bytes[..], now_ms],
            )
            .unwrap();
            out.push((mid, rowid));
        }
        drop(conn);
        out
    }

    /// With no snapshot, every active `embeddings` row is rebuilt into HNSW.
    #[test]
    fn run_rebuilds_hnsw_from_sql_when_no_snapshot() {
        let (tmp, key) = fresh_init_dir();
        seed_embeddings_for_current_embedder(tmp.path(), &key, &["a", "b", "c"]);

        assert!(!snapshot::pair_exists(&snapshot_dir(tmp.path()), snapshot::LIVE_BASENAME));

        let outcome = run(StartupParams::new(tmp.path(), key)).unwrap();

        assert!(outcome.started_fresh, "no snapshot → started_fresh");
        assert_eq!(outcome.rebuild.rows_seen, 3);
        assert_eq!(outcome.rebuild.rows_added, 3, "all 3 active rows rebuilt");
        assert_eq!(outcome.rebuild.rows_skipped, 0);
        assert_eq!(outcome.hnsw.len(), 3);
        assert!(outcome.drift.is_clean(), "drift: {:?}", outcome.drift);
    }

    /// Forgotten episodes are left out of the SQL rebuild.
    #[test]
    fn run_rebuild_excludes_forgotten_episodes() {
        let (tmp, key) = fresh_init_dir();
        let seeded =
            seed_embeddings_for_current_embedder(tmp.path(), &key, &["keep1", "drop", "keep2"]);

        let conn = open_sqlcipher(&per_tenant_db(tmp.path()), &key).unwrap();
        conn.execute(
            "UPDATE episodes SET status = 'forgotten' WHERE memory_id = ?",
            params![seeded[1].0],
        )
        .unwrap();
        drop(conn);

        let outcome = run(StartupParams::new(tmp.path(), key)).unwrap();
        assert_eq!(outcome.rebuild.rows_added, 2, "forgotten row skipped");
        assert_eq!(outcome.rebuild.rows_skipped, 0);
        assert_eq!(outcome.hnsw.len(), 2);
    }

    /// A row whose blob length disagrees with its declared dim is skipped,
    /// and the rebuild continues with the remaining rows.
    #[test]
    fn run_rebuild_skips_corrupt_rows_and_continues() {
        let (tmp, key) = fresh_init_dir();
        let seeded =
            seed_embeddings_for_current_embedder(tmp.path(), &key, &["good1", "bad", "good2"]);

        let conn = open_sqlcipher(&per_tenant_db(tmp.path()), &key).unwrap();
        // 4 bytes of vector data while claiming dim = 32.
        conn.execute(
            "UPDATE embeddings SET vector = ?, dim = ?
             WHERE memory_id = ?",
            params![&[0u8; 4][..], 32i64, seeded[1].0],
        )
        .unwrap();
        drop(conn);

        let outcome = run(StartupParams::new(tmp.path(), key)).unwrap();
        assert_eq!(outcome.rebuild.rows_seen, 3);
        assert_eq!(outcome.rebuild.rows_added, 2, "two healthy rows added");
        assert_eq!(outcome.rebuild.rows_skipped, 1, "corrupt row skipped");
        assert_eq!(outcome.hnsw.len(), 2);
    }

    /// Embeddings recorded under a different embedder identity are not rebuilt.
    #[test]
    fn run_rebuild_skips_rows_for_non_current_embedder() {
        let (tmp, key) = fresh_init_dir();
        seed_embeddings_for_current_embedder(tmp.path(), &key, &["ours"]);

        let conn = open_sqlcipher(&per_tenant_db(tmp.path()), &key).unwrap();
        let other_id = get_or_insert_embedder_id(
            &conn,
            &EmbedderIdentity {
                name: "other".into(),
                version: "v1".into(),
                dim: 32,
                dtype: "f32".into(),
            },
        )
        .unwrap();
        let stray_mid = insert_hot_episode(&conn, "stray");
        let zeros = [0u8; 32 * 4];
        let now = chrono::Utc::now().timestamp_millis();
        conn.execute(
            "INSERT INTO embeddings (memory_id, embedder_id, dtype, dim, vector, created_at_ms)
             VALUES (?, ?, ?, ?, ?, ?)",
            params![stray_mid, other_id, "f32", 32i64, &zeros[..], now],
        )
        .unwrap();
        drop(conn);

        let outcome = run(StartupParams::new(tmp.path(), key)).unwrap();
        assert_eq!(
            outcome.rebuild.rows_added, 1,
            "only the row under the current embedder is rebuilt"
        );
        assert_eq!(outcome.rebuild.rows_skipped, 0);
        assert_eq!(outcome.hnsw.len(), 1);
    }

    /// Episodes forgotten after a snapshot was taken are tombstoned on load.
    #[test]
    fn run_rebuilds_tombstones_from_forgotten_episodes() {
        let (tmp, key) = fresh_init_dir();
        let dim = 32usize;

        {
            let factory = HnswFactory::default();
            let idx = factory.create(dim).unwrap();
            for i in 1..=3 {
                idx.add(i as i64, &vec![0.1f32; dim]).unwrap();
            }
            snapshot::save(&idx, &snapshot_dir(tmp.path())).unwrap();
        }

        // Relies on the three inserts receiving rowids 1..=3, matching the
        // snapshot labels above.
        let conn = open_sqlcipher(&per_tenant_db(tmp.path()), &key).unwrap();
        let _ = insert_hot_episode(&conn, "first");
        let mid2 = insert_hot_episode(&conn, "second");
        let _ = insert_hot_episode(&conn, "third");
        conn.execute(
            "UPDATE episodes SET status='forgotten' WHERE memory_id = ?",
            params![mid2],
        )
        .unwrap();
        drop(conn);

        let outcome = run(StartupParams::new(tmp.path(), key)).unwrap();
        assert_eq!(outcome.hnsw.len(), 2);
        assert!(
            outcome.drift.is_clean(),
            "expected clean drift after tombstone rebuild, got: {:?}",
            outcome.drift
        );
        let hits = outcome.hnsw.search(&vec![0.1f32; dim], 5).unwrap();
        assert!(
            !hits.iter().any(|(r, _)| *r == 2),
            "rowid 2 should be tombstoned: hits={hits:?}"
        );
    }
}