use std::path::{Path, PathBuf};
use std::sync::Arc;
use rusqlite::Connection;
use solo_core::{Result, VectorIndex, VectorIndexFactory};
use crate::config::SoloConfig;
use crate::embedder_registry::{EmbedderIdentity, get_or_insert_embedder_id};
use crate::hnsw_id::episode_hnsw_id;
use crate::init::open_sqlcipher;
use crate::key_material::KeyMaterial;
use crate::migration;
use crate::recovery::{
DriftReport, RebuildReport, ReplayReport, detect_drift, rebuild_hnsw_from_sql,
replay_pending_index,
};
use crate::snapshot;
use crate::vector_index::{HnswFactory, HnswIndex, HnswParams};
/// Everything [`run`] produced while bringing a Solo data directory online.
pub struct StartupOutcome {
    /// The data directory that was passed in via [`StartupParams`].
    pub data_dir: PathBuf,
    /// `<data_dir>/solo.db`, the SQLCipher database opened during startup.
    pub db_path: PathBuf,
    /// Configuration read from `<data_dir>/solo.config.toml`.
    pub config: SoloConfig,
    /// Schema version returned by `migration::run_migrations`.
    pub schema_version: u32,
    /// The vector index after snapshot load (or rebuild), tombstone
    /// re-application, and `pending_index` replay.
    pub hnsw: Arc<dyn VectorIndex + Send + Sync>,
    /// Id returned by `get_or_insert_embedder_id` for the embedder identity
    /// recorded in the config.
    pub embedder_id: i64,
    /// Report from `replay_pending_index`.
    pub replay: ReplayReport,
    /// Report from `detect_drift`, run after replay.
    pub drift: DriftReport,
    /// `true` when the live snapshot failed to load and the `.bak` snapshot
    /// was used instead.
    pub used_bak_snapshot: bool,
    /// `true` when neither snapshot loaded and `hnsw` started as an empty index.
    pub started_fresh: bool,
    /// Report from `rebuild_hnsw_from_sql`; `RebuildReport::default()` unless
    /// `started_fresh` is `true`.
    pub rebuild: RebuildReport,
}
impl std::fmt::Debug for StartupOutcome {
    /// Debug summary of the startup outcome.
    ///
    /// `db_path` and `config` are not printed, and the index is summarised by
    /// its length and dimensionality instead of its contents.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The `..` makes the fields left out of the output explicit.
        let Self {
            data_dir,
            schema_version,
            hnsw,
            embedder_id,
            replay,
            drift,
            used_bak_snapshot,
            started_fresh,
            rebuild,
            ..
        } = self;

        let mut summary = f.debug_struct("StartupOutcome");
        summary.field("data_dir", data_dir);
        summary.field("schema_version", schema_version);
        summary.field("hnsw_len", &hnsw.len());
        summary.field("hnsw_dim", &hnsw.dim());
        summary.field("embedder_id", embedder_id);
        summary.field("replay", replay);
        summary.field("drift", drift);
        summary.field("used_bak_snapshot", used_bak_snapshot);
        summary.field("started_fresh", started_fresh);
        summary.field("rebuild", rebuild);
        summary.finish()
    }
}
/// Inputs to [`run`]; build with [`StartupParams::new`].
// NOTE(review): the derived `Debug` prints `key`; confirm that
// `KeyMaterial`'s own `Debug` impl redacts secret material.
#[derive(Debug, Clone)]
pub struct StartupParams {
    /// Directory holding `solo.config.toml`, `solo.db` and the HNSW snapshots.
    pub data_dir: PathBuf,
    /// Key material used to open the SQLCipher database.
    pub key: KeyMaterial,
    /// Parameters handed to `HnswFactory::with_params`; `HnswParams::default()`
    /// unless overridden with [`StartupParams::with_hnsw_params`].
    pub hnsw_params: HnswParams,
}
impl StartupParams {
    /// Creates startup parameters for `data_dir`, unlocking the database with
    /// `key` and using `HnswParams::default()` for the index.
    pub fn new(data_dir: impl Into<PathBuf>, key: KeyMaterial) -> Self {
        let data_dir: PathBuf = data_dir.into();
        let hnsw_params = HnswParams::default();
        Self {
            data_dir,
            key,
            hnsw_params,
        }
    }

    /// Returns these parameters with the HNSW parameters replaced by `params`.
    pub fn with_hnsw_params(self, params: HnswParams) -> Self {
        Self {
            hnsw_params: params,
            ..self
        }
    }
}
pub fn run(params: StartupParams) -> Result<StartupOutcome> {
let StartupParams {
data_dir,
key,
hnsw_params,
} = params;
let config_path = data_dir.join("solo.config.toml");
let config = SoloConfig::read(&config_path)?;
let dim = config.embedder.dim as usize;
if dim == 0 {
return Err(solo_core::Error::storage(format!(
"solo.config.toml records embedder.dim=0 — corrupt config? at {config_path:?}"
)));
}
let db_path = data_dir.join("solo.db");
if !db_path.is_file() {
return Err(solo_core::Error::not_found(format!(
"Solo database not found at {db_path:?}; run `solo init` first"
)));
}
let mut conn: Connection = open_sqlcipher(&db_path, &key)?;
let schema_version = migration::run_migrations(&mut conn)?;
let embedder_identity = EmbedderIdentity {
name: config.embedder.name.clone(),
version: config.embedder.version.clone(),
dim: config.embedder.dim,
dtype: config.embedder.dtype.clone(),
};
let embedder_id = get_or_insert_embedder_id(&conn, &embedder_identity)?;
let factory = HnswFactory::with_params(hnsw_params);
let (hnsw_index, used_bak_snapshot, started_fresh) =
load_hnsw_with_fallback(&data_dir, &factory, dim);
if !started_fresh && hnsw_index.dim() != dim {
return Err(solo_core::Error::storage(format!(
"HNSW snapshot dim ({}) does not match solo.config.toml embedder.dim ({}). \
Embedder identity has shifted under the daemon. Run `solo reembed` to rebuild.",
hnsw_index.dim(),
dim
)));
}
let rebuild = if started_fresh {
let started = std::time::Instant::now();
let r = rebuild_hnsw_from_sql(&conn, &hnsw_index, embedder_id)?;
if r.rows_seen > 0 {
tracing::info!(
rows_seen = r.rows_seen,
rows_added = r.rows_added,
rows_skipped = r.rows_skipped,
elapsed_ms = started.elapsed().as_millis() as u64,
"rebuilt HNSW from `embeddings` after empty-snapshot fallback"
);
}
r
} else {
RebuildReport::default()
};
let hnsw: Arc<dyn VectorIndex + Send + Sync> = Arc::new(hnsw_index);
let forgotten = if started_fresh {
0
} else {
rebuild_tombstones_from_sql(&conn, hnsw.as_ref())?
};
if forgotten > 0 {
tracing::info!(forgotten, "rebuilt HNSW tombstones from episodes.status='forgotten'");
}
let replay = replay_pending_index(&mut conn, hnsw.as_ref())?;
let drift = detect_drift(&conn, hnsw.as_ref())?;
drop(conn);
Ok(StartupOutcome {
data_dir,
db_path,
config,
schema_version,
hnsw,
embedder_id,
replay,
drift,
used_bak_snapshot,
started_fresh,
rebuild,
})
}
fn rebuild_tombstones_from_sql(
conn: &Connection,
hnsw: &dyn VectorIndex,
) -> Result<usize> {
let mut stmt = conn
.prepare("SELECT rowid FROM episodes WHERE status = 'forgotten'")
.map_err(|e| solo_core::Error::storage(format!("prepare forgotten select: {e}")))?;
let rows = stmt
.query_map([], |row| row.get::<_, i64>(0))
.map_err(|e| solo_core::Error::storage(format!("query_map forgotten: {e}")))?;
let mut count = 0usize;
for r in rows {
let rowid = r.map_err(|e| solo_core::Error::storage(format!("forgotten row decode: {e}")))?;
hnsw.remove(episode_hnsw_id(rowid))?;
count += 1;
}
Ok(count)
}
fn load_hnsw_with_fallback(
data_dir: &Path,
factory: &HnswFactory,
dim: usize,
) -> (HnswIndex, bool, bool) {
match snapshot::load(data_dir) {
Ok(idx) => {
tracing::info!(
snapshot_kind = "live",
dim = idx.dim(),
len = idx.len(),
"HNSW loaded from live snapshot"
);
(idx, false, false)
}
Err(primary_err) => {
tracing::warn!(error = %primary_err, "live HNSW snapshot failed; trying .bak");
match snapshot::load_bak(data_dir) {
Ok(idx) => {
tracing::warn!(
snapshot_kind = "bak",
dim = idx.dim(),
len = idx.len(),
"HNSW loaded from backup snapshot — investigate the live pair"
);
(idx, true, false)
}
Err(bak_err) => {
tracing::warn!(
primary = %primary_err,
bak = %bak_err,
dim,
"no HNSW snapshot available; starting fresh empty index. \
The startup chain will attempt rebuild_hnsw_from_sql next; \
if the `embeddings` table is also empty, recall will return \
no hits until new content is remembered."
);
let empty = factory
.create(dim)
.expect("HnswFactory::create with valid dim must succeed");
(empty, false, true)
}
}
}
}
}
#[cfg(test)]
mod tests {
    // These tests drive `run` end-to-end against a directory prepared by
    // `init`. Each scenario is set up by writing SQL rows and/or HNSW
    // snapshots directly before calling `run`.
    use super::*;
    use crate::config::EmbedderConfig;
    use crate::init::{InitParams, init};
    use crate::key_material::KeyMaterial;
    use rusqlite::params;
    use solo_core::{Confidence, EncodingContext, Episode, MemoryId, Tier};

    /// Runs `init` into a new temp dir with a 32-dim `stub` embedder.
    /// Returns the dir (keep it alive for the test) and the key derived from
    /// the config's salt.
    fn fresh_init_dir() -> (tempfile::TempDir, KeyMaterial) {
        let tmp = tempfile::TempDir::new().unwrap();
        let _ = init(InitParams {
            data_dir: tmp.path().to_path_buf(),
            passphrase: zeroize::Zeroizing::new("password-123".into()),
            force: false,
            embedder: EmbedderConfig {
                name: "stub".into(),
                version: "v1".into(),
                dim: 32,
                dtype: "f32".into(),
            },
        })
        .unwrap();
        let cfg = SoloConfig::read(&tmp.path().join("solo.config.toml")).unwrap();
        let key = KeyMaterial::derive("password-123", &cfg.salt_bytes().unwrap()).unwrap();
        (tmp, key)
    }

    /// Inserts a `pending_index` row for `memory_id` whose embedding is
    /// `dim * 4` zero bytes, with `embedding_dim = dim` and `enqueued_at = 0`.
    fn enqueue_pending(conn: &Connection, memory_id: &str, dim: usize) {
        let zeros = vec![0u8; dim * 4];
        conn.execute(
            "INSERT INTO pending_index (memory_id, embedding, embedding_dim, enqueued_at)
VALUES (?, ?, ?, ?)",
            params![memory_id, &zeros[..], dim as i64, 0i64],
        )
        .unwrap();
    }

    /// Inserts a hot-tier episode with `content` directly via SQL and returns
    /// its new memory id as a string.
    fn insert_hot_episode(conn: &Connection, content: &str) -> String {
        let mid = MemoryId::new();
        let ep = Episode {
            memory_id: mid,
            ts_ms: chrono::Utc::now().timestamp_millis(),
            source_type: "user_message".into(),
            source_id: None,
            content: content.into(),
            encoding_context: EncodingContext::default(),
            provenance: None,
            confidence: Confidence::new(0.9).unwrap(),
            strength: 0.5,
            salience: 0.5,
            tier: Tier::Hot,
        };
        let now_ms = chrono::Utc::now().timestamp_millis();
        let tier = match ep.tier {
            Tier::Hot => "hot",
            Tier::Warm => "warm",
            Tier::Cold => "cold",
        };
        // `encoding_context_json` is written as the literal "{}" and
        // `provenance_json` as NULL; `ep.encoding_context` is not serialised.
        conn.execute(
            "INSERT INTO episodes (
memory_id, ts_ms, source_type, source_id, content,
encoding_context_json, provenance_json, confidence,
strength, salience, tier, created_at_ms, updated_at_ms
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
            params![
                ep.memory_id.to_string(),
                ep.ts_ms,
                ep.source_type,
                ep.source_id,
                ep.content,
                "{}",
                Option::<String>::None,
                ep.confidence.0,
                ep.strength,
                ep.salience,
                tier,
                now_ms,
                now_ms,
            ],
        )
        .unwrap();
        mid.to_string()
    }

    // No snapshot and no SQL content: `run` starts from an empty index of the
    // configured dim, with nothing to replay and no drift.
    #[test]
    fn run_starts_fresh_when_no_snapshot_exists() {
        let (tmp, key) = fresh_init_dir();
        let outcome = run(StartupParams::new(tmp.path(), key)).unwrap();
        assert!(outcome.started_fresh);
        assert!(!outcome.used_bak_snapshot);
        assert_eq!(outcome.hnsw.len(), 0);
        assert_eq!(outcome.hnsw.dim(), 32);
        assert_eq!(outcome.replay.rows_seen, 0);
        assert!(outcome.drift.is_clean());
    }

    // A queued `pending_index` row is replayed into the fresh index.
    #[test]
    fn run_replays_pending_index_into_fresh_hnsw() {
        let (tmp, key) = fresh_init_dir();
        let cfg = SoloConfig::read(&tmp.path().join("solo.config.toml")).unwrap();
        let conn = open_sqlcipher(&tmp.path().join("solo.db"), &key).unwrap();
        let mid = insert_hot_episode(&conn, "hello startup");
        enqueue_pending(&conn, &mid, cfg.embedder.dim as usize);
        drop(conn);
        let outcome = run(StartupParams::new(tmp.path(), key)).unwrap();
        assert!(outcome.started_fresh);
        assert_eq!(outcome.replay.rows_seen, 1);
        assert_eq!(outcome.replay.rows_replayed, 1);
        assert_eq!(outcome.hnsw.len(), 1);
        assert!(outcome.drift.is_clean(), "drift: {:?}", outcome.drift);
    }

    // A saved live snapshot (5 vectors) is loaded as-is, without falling back.
    #[test]
    fn run_loads_persisted_snapshot_when_present() {
        let (tmp, key) = fresh_init_dir();
        let dim = 32usize;
        {
            use solo_core::VectorIndex;
            let factory = HnswFactory::default();
            let idx = factory.create(dim).unwrap();
            for i in 1..=5 {
                let v = vec![0.1f32 * i as f32; dim];
                idx.add(i as i64, &v).unwrap();
            }
            snapshot::save(&idx, tmp.path()).unwrap();
        }
        let outcome = run(StartupParams::new(tmp.path(), key)).unwrap();
        assert!(!outcome.started_fresh);
        assert!(!outcome.used_bak_snapshot);
        assert_eq!(outcome.hnsw.len(), 5);
        assert_eq!(outcome.hnsw.dim(), dim);
    }

    // Two saves (3 then 5 vectors), then the live graph file is corrupted.
    // The expected length of 3 relies on the second `snapshot::save` having
    // kept the first snapshot as the `.bak` pair.
    #[test]
    fn run_falls_back_to_bak_when_live_corrupt() {
        let (tmp, key) = fresh_init_dir();
        let dim = 32usize;
        {
            use solo_core::VectorIndex;
            let factory = HnswFactory::default();
            let idx1 = factory.create(dim).unwrap();
            for i in 1..=3 {
                idx1.add(i, &vec![0.0f32; dim]).unwrap();
            }
            snapshot::save(&idx1, tmp.path()).unwrap();
            let idx2 = factory.create(dim).unwrap();
            for i in 1..=5 {
                idx2.add(i, &vec![0.0f32; dim]).unwrap();
            }
            snapshot::save(&idx2, tmp.path()).unwrap();
        }
        std::fs::write(
            tmp.path().join("hnsw_episodes.hnsw.graph"),
            b"GARBAGE",
        )
        .unwrap();
        let outcome = run(StartupParams::new(tmp.path(), key)).unwrap();
        assert!(!outcome.started_fresh);
        assert!(outcome.used_bak_snapshot);
        assert_eq!(outcome.hnsw.len(), 3);
    }

    // Only a config is written (no `init`), so `solo.db` is missing and `run`
    // must fail with a not-found error.
    #[test]
    fn run_refuses_when_db_missing() {
        let tmp = tempfile::TempDir::new().unwrap();
        let cfg = SoloConfig::new(
            [0u8; crate::key_material::SALT_LEN],
            EmbedderConfig {
                name: "stub".into(),
                version: "v1".into(),
                dim: 32,
                dtype: "f32".into(),
            },
        );
        cfg.write(&tmp.path().join("solo.config.toml")).unwrap();
        let key =
            KeyMaterial::derive("password-123", &cfg.salt_bytes().unwrap()).unwrap();
        let err = run(StartupParams::new(tmp.path(), key)).unwrap_err();
        assert!(
            err.to_string().contains("not found"),
            "got: {err}"
        );
    }

    // An 8-dim snapshot against a 32-dim config must be rejected.
    #[test]
    fn run_refuses_when_dim_mismatches_snapshot() {
        let (tmp, key) = fresh_init_dir();
        {
            use solo_core::VectorIndex;
            let factory = HnswFactory::default();
            let idx = factory.create(8).unwrap();
            idx.add(1, &vec![0.0f32; 8]).unwrap();
            snapshot::save(&idx, tmp.path()).unwrap();
        }
        let err = run(StartupParams::new(tmp.path(), key)).unwrap_err();
        assert!(
            err.to_string().contains("does not match"),
            "got: {err}"
        );
    }

    /// For each entry in `contents`, inserts a hot episode plus an
    /// `embeddings` row under the embedder identity from the config.
    /// Returns `(memory_id, episode rowid)` pairs in input order.
    fn seed_embeddings_for_current_embedder(
        tmp_path: &Path,
        key: &KeyMaterial,
        contents: &[&str],
    ) -> Vec<(String, i64)> {
        let cfg_path = tmp_path.join("solo.config.toml");
        let cfg = SoloConfig::read(&cfg_path).unwrap();
        let conn = open_sqlcipher(&tmp_path.join("solo.db"), key).unwrap();
        let identity = EmbedderIdentity {
            name: cfg.embedder.name.clone(),
            version: cfg.embedder.version.clone(),
            dim: cfg.embedder.dim,
            dtype: cfg.embedder.dtype.clone(),
        };
        let embedder_id = get_or_insert_embedder_id(&conn, &identity).unwrap();
        let dim = cfg.embedder.dim as usize;
        let now_ms = chrono::Utc::now().timestamp_millis();
        let mut out = Vec::new();
        for content in contents {
            let mid = insert_hot_episode(&conn, content);
            let rowid: i64 = conn
                .query_row(
                    "SELECT rowid FROM episodes WHERE memory_id = ?",
                    params![mid],
                    |r| r.get(0),
                )
                .unwrap();
            // The first 8 bytes of the otherwise-zero vector hold the rowid,
            // presumably so that each seeded vector is distinct.
            let mut bytes = vec![0u8; dim * 4];
            bytes[..8].copy_from_slice(&rowid.to_le_bytes());
            conn.execute(
                "INSERT INTO embeddings (memory_id, embedder_id, dtype, dim, vector, created_at_ms)
VALUES (?, ?, ?, ?, ?, ?)",
                params![mid, embedder_id, "f32", dim as i64, &bytes[..], now_ms],
            )
            .unwrap();
            out.push((mid, rowid));
        }
        drop(conn);
        out
    }

    // No snapshot but three `embeddings` rows: the fresh index is rebuilt
    // from SQL.
    #[test]
    fn run_rebuilds_hnsw_from_sql_when_no_snapshot() {
        let (tmp, key) = fresh_init_dir();
        seed_embeddings_for_current_embedder(tmp.path(), &key, &["a", "b", "c"]);
        assert!(!snapshot::pair_exists(tmp.path(), snapshot::LIVE_BASENAME));
        let outcome = run(StartupParams::new(tmp.path(), key)).unwrap();
        assert!(outcome.started_fresh, "no snapshot → started_fresh");
        assert_eq!(outcome.rebuild.rows_seen, 3);
        assert_eq!(outcome.rebuild.rows_added, 3, "all 3 active rows rebuilt");
        assert_eq!(outcome.rebuild.rows_skipped, 0);
        assert_eq!(outcome.hnsw.len(), 3);
        assert!(outcome.drift.is_clean(), "drift: {:?}", outcome.drift);
    }

    // The middle episode is marked forgotten, so the SQL rebuild must leave
    // it out without counting it as skipped.
    #[test]
    fn run_rebuild_excludes_forgotten_episodes() {
        let (tmp, key) = fresh_init_dir();
        let seeded =
            seed_embeddings_for_current_embedder(tmp.path(), &key, &["keep1", "drop", "keep2"]);
        let conn = open_sqlcipher(&tmp.path().join("solo.db"), &key).unwrap();
        conn.execute(
            "UPDATE episodes SET status = 'forgotten' WHERE memory_id = ?",
            params![seeded[1].0],
        )
        .unwrap();
        drop(conn);
        let outcome = run(StartupParams::new(tmp.path(), key)).unwrap();
        assert_eq!(outcome.rebuild.rows_added, 2, "forgotten row skipped");
        assert_eq!(outcome.rebuild.rows_skipped, 0);
        assert_eq!(outcome.hnsw.len(), 2);
    }

    // The middle row's vector is truncated to 4 bytes while `dim` still says
    // 32. The rebuild must skip that row and keep going.
    #[test]
    fn run_rebuild_skips_corrupt_rows_and_continues() {
        let (tmp, key) = fresh_init_dir();
        let _seeded =
            seed_embeddings_for_current_embedder(tmp.path(), &key, &["good1", "bad", "good2"]);
        let conn = open_sqlcipher(&tmp.path().join("solo.db"), &key).unwrap();
        conn.execute(
            "UPDATE embeddings SET vector = ?, dim = ?
WHERE memory_id = ?",
            params![&vec![0u8; 4][..], 32i64, _seeded[1].0],
        )
        .unwrap();
        drop(conn);
        let outcome = run(StartupParams::new(tmp.path(), key)).unwrap();
        assert_eq!(outcome.rebuild.rows_seen, 3);
        assert_eq!(outcome.rebuild.rows_added, 2, "two healthy rows added");
        assert_eq!(outcome.rebuild.rows_skipped, 1, "corrupt row skipped");
        assert_eq!(outcome.hnsw.len(), 2);
    }

    // An `embeddings` row recorded under a different embedder identity must
    // not be rebuilt into the index.
    #[test]
    fn run_rebuild_skips_rows_for_non_current_embedder() {
        let (tmp, key) = fresh_init_dir();
        seed_embeddings_for_current_embedder(tmp.path(), &key, &["ours"]);
        let conn = open_sqlcipher(&tmp.path().join("solo.db"), &key).unwrap();
        let other_id = get_or_insert_embedder_id(
            &conn,
            &EmbedderIdentity {
                name: "other".into(),
                version: "v1".into(),
                dim: 32,
                dtype: "f32".into(),
            },
        )
        .unwrap();
        let stray_mid = insert_hot_episode(&conn, "stray");
        let zeros = vec![0u8; 32 * 4];
        let now = chrono::Utc::now().timestamp_millis();
        conn.execute(
            "INSERT INTO embeddings (memory_id, embedder_id, dtype, dim, vector, created_at_ms)
VALUES (?, ?, ?, ?, ?, ?)",
            params![stray_mid, other_id, "f32", 32i64, &zeros[..], now],
        )
        .unwrap();
        drop(conn);
        let outcome = run(StartupParams::new(tmp.path(), key)).unwrap();
        assert_eq!(
            outcome.rebuild.rows_added, 1,
            "only the row under the current embedder is rebuilt"
        );
        assert_eq!(outcome.rebuild.rows_skipped, 0);
        assert_eq!(outcome.hnsw.len(), 1);
    }

    // A snapshot holding ids 1..=3 is loaded, and the second episode is
    // marked forgotten in SQL. Startup must re-apply that removal.
    // NOTE(review): this assumes the three episodes get rowids 1, 2, 3 and
    // that `episode_hnsw_id(rowid)` maps them onto the raw ids added below.
    // Confirm against `hnsw_id`.
    #[test]
    fn run_rebuilds_tombstones_from_forgotten_episodes() {
        use solo_core::VectorIndex;
        let (tmp, key) = fresh_init_dir();
        let dim = 32usize;
        {
            let factory = HnswFactory::default();
            let idx = factory.create(dim).unwrap();
            for i in 1..=3 {
                idx.add(i as i64, &vec![0.1f32; dim]).unwrap();
            }
            snapshot::save(&idx, tmp.path()).unwrap();
        }
        let conn = open_sqlcipher(&tmp.path().join("solo.db"), &key).unwrap();
        let _ = insert_hot_episode(&conn, "first");
        let mid2 = insert_hot_episode(&conn, "second");
        let _ = insert_hot_episode(&conn, "third");
        conn.execute(
            "UPDATE episodes SET status='forgotten' WHERE memory_id = ?",
            params![mid2],
        )
        .unwrap();
        drop(conn);
        let outcome = run(StartupParams::new(tmp.path(), key)).unwrap();
        assert_eq!(outcome.hnsw.len(), 2);
        assert!(
            outcome.drift.is_clean(),
            "expected clean drift after tombstone rebuild, got: {:?}",
            outcome.drift
        );
        let hits = outcome.hnsw.search(&vec![0.1f32; dim], 5).unwrap();
        assert!(
            !hits.iter().any(|(r, _)| *r == 2),
            "rowid 2 should be tombstoned: hits={hits:?}"
        );
    }
}