use sqlx::SqlitePool;
use super::helpers::StoreError;
#[allow(unused_imports)]
use super::helpers::CURRENT_SCHEMA_VERSION;
/// Migrate the store schema from `from` to `to`, one version step at a time.
///
/// All steps plus the final `schema_version` bump run inside a single
/// transaction, so a failed step leaves the database untouched. The version
/// is re-read inside the transaction so a concurrent process that already
/// migrated results in a clean no-op instead of a double migration.
///
/// # Errors
///
/// Returns [`StoreError::SchemaNewerThanCq`] when the on-disk schema is newer
/// than requested (refuse to downgrade), [`StoreError::MigrationNotSupported`]
/// for a version pair with no registered step, or any underlying sqlx error.
pub async fn migrate(pool: &SqlitePool, from: i32, to: i32) -> Result<(), StoreError> {
    if from == to {
        return Ok(());
    }
    if from > to {
        // The database was written by a newer build; never downgrade.
        return Err(StoreError::SchemaNewerThanCq(from));
    }
    tracing::info!(
        from_version = from,
        to_version = to,
        "Starting schema migration"
    );
    let mut tx = pool.begin().await?;
    // Re-check the version inside the transaction: another process may have
    // migrated between the caller's initial read and us acquiring the tx.
    let current_in_tx: Option<(String,)> =
        sqlx::query_as("SELECT value FROM metadata WHERE key = 'schema_version'")
            .fetch_optional(&mut *tx)
            .await?;
    let actual_from: i32 = current_in_tx
        .and_then(|(s,)| s.parse().ok())
        .unwrap_or(from);
    if actual_from >= to {
        tracing::info!(
            actual_from,
            to,
            "Schema already migrated by another process, skipping"
        );
        tx.rollback().await?;
        return Ok(());
    }
    for version in actual_from..to {
        tracing::info!(from = version, to = version + 1, "Running migration step");
        run_migration(&mut tx, version, version + 1).await?;
    }
    // Upsert rather than plain UPDATE: when the 'schema_version' row is
    // missing (we fell back to the caller-supplied `from` above), an UPDATE
    // would silently match zero rows and the new version would never be
    // persisted. ON CONFLICT keeps the existing-row path identical.
    sqlx::query(
        "INSERT INTO metadata (key, value) VALUES ('schema_version', ?1) \
         ON CONFLICT(key) DO UPDATE SET value = excluded.value",
    )
    .bind(to.to_string())
    .execute(&mut *tx)
    .await?;
    tx.commit().await?;
    tracing::info!(new_version = to, "Schema migration complete");
    Ok(())
}
/// Dispatch a single migration step. Only adjacent version pairs
/// (`to == from + 1`) have registered steps; anything else is rejected.
#[allow(clippy::match_single_binding)]
async fn run_migration(
    conn: &mut sqlx::SqliteConnection,
    from: i32,
    to: i32,
) -> Result<(), StoreError> {
    // Non-consecutive pairs were never supported; fail fast before matching.
    if to != from + 1 {
        return Err(StoreError::MigrationNotSupported(from, to));
    }
    match from {
        10 => migrate_v10_to_v11(conn).await,
        11 => migrate_v11_to_v12(conn).await,
        12 => migrate_v12_to_v13(conn).await,
        13 => migrate_v13_to_v14(conn).await,
        14 => migrate_v14_to_v15(conn).await,
        15 => migrate_v15_to_v16(conn).await,
        16 => migrate_v16_to_v17(conn).await,
        17 => migrate_v17_to_v18(conn).await,
        18 => migrate_v18_to_v19(conn).await,
        19 => migrate_v19_to_v20(conn).await,
        _ => Err(StoreError::MigrationNotSupported(from, to)),
    }
}
/// v10 -> v11: create the `type_edges` table (one row per reference from a
/// chunk to a named type) plus its two lookup indexes. Rows cascade away
/// when the source chunk is deleted.
async fn migrate_v10_to_v11(conn: &mut sqlx::SqliteConnection) -> Result<(), StoreError> {
    const STATEMENTS: [&str; 3] = [
        "CREATE TABLE IF NOT EXISTS type_edges (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            source_chunk_id TEXT NOT NULL,
            target_type_name TEXT NOT NULL,
            edge_kind TEXT NOT NULL DEFAULT '',
            line_number INTEGER NOT NULL,
            FOREIGN KEY (source_chunk_id) REFERENCES chunks(id) ON DELETE CASCADE
        )",
        "CREATE INDEX IF NOT EXISTS idx_type_edges_source ON type_edges(source_chunk_id)",
        "CREATE INDEX IF NOT EXISTS idx_type_edges_target ON type_edges(target_type_name)",
    ];
    for sql in STATEMENTS {
        sqlx::query(sql).execute(&mut *conn).await?;
    }
    tracing::info!("Created type_edges table. Run 'cqs index --force' to populate type edges.");
    Ok(())
}
/// v11 -> v12: add the nullable `parent_type_name` column to `chunks`.
/// Existing rows keep NULL until the next full re-index populates it.
async fn migrate_v11_to_v12(conn: &mut sqlx::SqliteConnection) -> Result<(), StoreError> {
    const ADD_PARENT_TYPE_NAME: &str = "ALTER TABLE chunks ADD COLUMN parent_type_name TEXT";
    sqlx::query(ADD_PARENT_TYPE_NAME)
        .execute(&mut *conn)
        .await?;
    tracing::info!(
        "Added parent_type_name column. Run 'cqs index --force' to populate method→class links."
    );
    Ok(())
}
/// v12 -> v13: add the nullable `enrichment_hash` column to `chunks` and
/// seed the `hnsw_dirty` metadata flag (INSERT OR IGNORE keeps an existing
/// value untouched).
async fn migrate_v12_to_v13(conn: &mut sqlx::SqliteConnection) -> Result<(), StoreError> {
    const STATEMENTS: [&str; 2] = [
        "ALTER TABLE chunks ADD COLUMN enrichment_hash TEXT",
        "INSERT OR IGNORE INTO metadata (key, value) VALUES ('hnsw_dirty', '0')",
    ];
    for sql in STATEMENTS {
        sqlx::query(sql).execute(&mut *conn).await?;
    }
    tracing::info!(
        "Added enrichment_hash column and hnsw_dirty flag. \
         Run 'cqs index --force' to populate enrichment hashes."
    );
    Ok(())
}
/// v13 -> v14: create the `llm_summaries` cache table, keyed by content hash.
async fn migrate_v13_to_v14(conn: &mut sqlx::SqliteConnection) -> Result<(), StoreError> {
    const CREATE_LLM_SUMMARIES: &str = "CREATE TABLE IF NOT EXISTS llm_summaries (
        content_hash TEXT PRIMARY KEY,
        summary TEXT NOT NULL,
        model TEXT NOT NULL,
        created_at TEXT NOT NULL
    )";
    sqlx::query(CREATE_LLM_SUMMARIES)
        .execute(&mut *conn)
        .await?;
    tracing::info!("Created llm_summaries table for LLM-generated function summaries.");
    Ok(())
}
/// v14 -> v15: correct the stored embedding dimension (769 -> 768, only when
/// it currently reads '769') and mark the HNSW index dirty so it gets rebuilt.
async fn migrate_v14_to_v15(conn: &mut sqlx::SqliteConnection) -> Result<(), StoreError> {
    const STATEMENTS: [&str; 2] = [
        "UPDATE metadata SET value = '768' WHERE key = 'dimensions' AND value = '769'",
        "UPDATE metadata SET value = '1' WHERE key = 'hnsw_dirty'",
    ];
    for sql in STATEMENTS {
        sqlx::query(sql).execute(&mut *conn).await?;
    }
    tracing::info!(
        "Updated dimensions and marked HNSW dirty. \
         Run 'cqs index --force' to rebuild embeddings."
    );
    Ok(())
}
/// v15 -> v16: rebuild `llm_summaries` with a composite primary key
/// (content_hash, purpose). SQLite cannot alter a primary key in place, so
/// the table is recreated, data copied (existing rows default to
/// purpose='summary'), the old table dropped, and the new one renamed.
/// Statement order is load-bearing.
async fn migrate_v15_to_v16(conn: &mut sqlx::SqliteConnection) -> Result<(), StoreError> {
    const STATEMENTS: [&str; 4] = [
        "CREATE TABLE IF NOT EXISTS llm_summaries_v2 (
            content_hash TEXT NOT NULL,
            purpose TEXT NOT NULL DEFAULT 'summary',
            summary TEXT NOT NULL,
            model TEXT NOT NULL,
            created_at TEXT NOT NULL,
            PRIMARY KEY (content_hash, purpose)
        )",
        "INSERT OR IGNORE INTO llm_summaries_v2 (content_hash, purpose, summary, model, created_at) \
         SELECT content_hash, 'summary', summary, model, created_at FROM llm_summaries",
        "DROP TABLE IF EXISTS llm_summaries",
        "ALTER TABLE llm_summaries_v2 RENAME TO llm_summaries",
    ];
    for sql in STATEMENTS {
        sqlx::query(sql).execute(&mut *conn).await?;
    }
    tracing::info!("Recreated llm_summaries with composite PK (content_hash, purpose).");
    Ok(())
}
/// v16 -> v17: add the `sparse_vectors` table (chunk_id/token_id/weight with
/// a token index) and the `enrichment_version` column on `chunks`.
async fn migrate_v16_to_v17(conn: &mut sqlx::SqliteConnection) -> Result<(), StoreError> {
    let _span = tracing::info_span!("migrate_v16_to_v17").entered();
    const STATEMENTS: [&str; 3] = [
        "CREATE TABLE IF NOT EXISTS sparse_vectors (
            chunk_id TEXT NOT NULL,
            token_id INTEGER NOT NULL,
            weight REAL NOT NULL,
            PRIMARY KEY (chunk_id, token_id)
        )",
        "CREATE INDEX IF NOT EXISTS idx_sparse_token ON sparse_vectors(token_id)",
        "ALTER TABLE chunks ADD COLUMN enrichment_version INTEGER NOT NULL DEFAULT 0",
    ];
    for sql in STATEMENTS {
        sqlx::query(sql).execute(&mut *conn).await?;
    }
    tracing::info!("Migrated to v17: sparse_vectors table + enrichment_version column");
    Ok(())
}
/// v17 -> v18: add the nullable `embedding_base` BLOB column to `chunks`.
/// Pre-existing rows stay NULL until the next index pass fills them in.
async fn migrate_v17_to_v18(conn: &mut sqlx::SqliteConnection) -> Result<(), StoreError> {
    let _span = tracing::info_span!("migrate_v17_to_v18").entered();
    const ADD_EMBEDDING_BASE: &str = "ALTER TABLE chunks ADD COLUMN embedding_base BLOB";
    sqlx::query(ADD_EMBEDDING_BASE)
        .execute(&mut *conn)
        .await?;
    tracing::info!("Migrated to v18: embedding_base column (NULL until next index pass)");
    Ok(())
}
/// v18 -> v19: rebuild `sparse_vectors` with a FOREIGN KEY to `chunks(id)`
/// ON DELETE CASCADE. The copy uses an INNER JOIN so orphan rows (whose
/// chunk no longer exists) are silently dropped; before/after row counts are
/// compared to report how many were purged. Finishes by bumping (or seeding)
/// the `splade_generation` counter in metadata.
async fn migrate_v18_to_v19(conn: &mut sqlx::SqliteConnection) -> Result<(), StoreError> {
    let _span = tracing::info_span!("migrate_v18_to_v19").entered();

    // Local helper: run a COUNT(*) query and return the single value.
    async fn row_count(
        conn: &mut sqlx::SqliteConnection,
        sql: &str,
    ) -> Result<i64, StoreError> {
        let (n,): (i64,) = sqlx::query_as(sql).fetch_one(&mut *conn).await?;
        Ok(n)
    }

    let before_rows = row_count(conn, "SELECT COUNT(*) FROM sparse_vectors").await?;

    // SQLite cannot ALTER a FOREIGN KEY in, so build a replacement table.
    sqlx::query(
        "CREATE TABLE sparse_vectors_v19 (
            chunk_id TEXT NOT NULL,
            token_id INTEGER NOT NULL,
            weight REAL NOT NULL,
            PRIMARY KEY (chunk_id, token_id),
            FOREIGN KEY (chunk_id) REFERENCES chunks(id) ON DELETE CASCADE
        )",
    )
    .execute(&mut *conn)
    .await?;

    // INNER JOIN filters out rows referencing deleted chunks.
    sqlx::query(
        "INSERT INTO sparse_vectors_v19 (chunk_id, token_id, weight)
         SELECT s.chunk_id, s.token_id, s.weight
         FROM sparse_vectors s
         INNER JOIN chunks c ON c.id = s.chunk_id",
    )
    .execute(&mut *conn)
    .await?;

    let after_rows = row_count(conn, "SELECT COUNT(*) FROM sparse_vectors_v19").await?;
    let dropped = before_rows - after_rows;
    if dropped > 0 {
        tracing::warn!(
            before = before_rows,
            after = after_rows,
            dropped_orphans = dropped,
            "v18→v19 migration dropped orphan sparse_vectors rows (chunks no longer exist). \
             These were leaks from pre-v19 delete paths."
        );
    } else {
        tracing::info!(
            rows = before_rows,
            "v18→v19 sparse_vectors row count unchanged after FK filter"
        );
    }

    // Swap the rebuilt table into place and restore the token index.
    const SWAP_STATEMENTS: [&str; 4] = [
        "DROP INDEX IF EXISTS idx_sparse_token",
        "DROP TABLE sparse_vectors",
        "ALTER TABLE sparse_vectors_v19 RENAME TO sparse_vectors",
        "CREATE INDEX idx_sparse_token ON sparse_vectors(token_id)",
    ];
    for sql in SWAP_STATEMENTS {
        sqlx::query(sql).execute(&mut *conn).await?;
    }

    // Bump the splade generation counter, seeding it at '1' if absent.
    sqlx::query(
        "INSERT INTO metadata (key, value) VALUES ('splade_generation', '1')
         ON CONFLICT(key) DO UPDATE SET
         value = CAST((CAST(value AS INTEGER) + 1) AS TEXT)",
    )
    .execute(&mut *conn)
    .await?;
    tracing::info!(
        "Migrated to v19: sparse_vectors has FK(chunk_id) → chunks(id) ON DELETE CASCADE"
    );
    Ok(())
}
/// v19 -> v20: install an AFTER DELETE trigger on `chunks` that bumps the
/// `splade_generation` counter once per deleted row (seeding it at '1' if
/// absent), then bump the counter once for the migration itself.
async fn migrate_v19_to_v20(conn: &mut sqlx::SqliteConnection) -> Result<(), StoreError> {
    let _span = tracing::info_span!("migrate_v19_to_v20").entered();
    const CREATE_TRIGGER: &str = "CREATE TRIGGER IF NOT EXISTS bump_splade_on_chunks_delete \
         AFTER DELETE ON chunks \
         BEGIN \
         INSERT INTO metadata (key, value) VALUES ('splade_generation', '1') \
         ON CONFLICT(key) DO UPDATE SET \
         value = CAST((CAST(value AS INTEGER) + 1) AS TEXT); \
         END";
    const BUMP_GENERATION: &str =
        "INSERT INTO metadata (key, value) VALUES ('splade_generation', '1')
         ON CONFLICT(key) DO UPDATE SET
         value = CAST((CAST(value AS INTEGER) + 1) AS TEXT)";
    for sql in [CREATE_TRIGGER, BUMP_GENERATION] {
        sqlx::query(sql).execute(&mut *conn).await?;
    }
    tracing::info!(
        "Migrated to v20: AFTER DELETE trigger on chunks bumps splade_generation (DS-W2/OB-22 fix)"
    );
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use sqlx::sqlite::SqlitePoolOptions;
    use sqlx::SqlitePool;

    // ---- shared schema fixtures -----------------------------------------

    /// `chunks` schema as the store created it at v10 (minimal columns).
    const CHUNKS_V10: &str = "CREATE TABLE IF NOT EXISTS chunks (
        id TEXT PRIMARY KEY,
        origin TEXT NOT NULL,
        language TEXT NOT NULL DEFAULT '',
        chunk_type TEXT NOT NULL DEFAULT '',
        name TEXT NOT NULL,
        signature TEXT NOT NULL DEFAULT '',
        content TEXT NOT NULL,
        doc TEXT,
        line_start INTEGER NOT NULL DEFAULT 0,
        line_end INTEGER NOT NULL DEFAULT 0,
        parent_id TEXT
    )";

    /// `chunks` schema as of v12 (no enrichment_hash yet).
    const CHUNKS_V12: &str = "CREATE TABLE IF NOT EXISTS chunks (
        id TEXT PRIMARY KEY,
        origin TEXT NOT NULL,
        source_type TEXT NOT NULL,
        language TEXT NOT NULL,
        chunk_type TEXT NOT NULL,
        name TEXT NOT NULL,
        signature TEXT NOT NULL,
        content TEXT NOT NULL,
        content_hash TEXT NOT NULL,
        doc TEXT,
        line_start INTEGER NOT NULL,
        line_end INTEGER NOT NULL,
        embedding BLOB NOT NULL,
        source_mtime INTEGER,
        created_at TEXT NOT NULL,
        updated_at TEXT NOT NULL,
        parent_id TEXT,
        window_idx INTEGER,
        parent_type_name TEXT
    )";

    /// `chunks` schema as of v13 (v12 plus enrichment_hash).
    const CHUNKS_V13: &str = "CREATE TABLE IF NOT EXISTS chunks (
        id TEXT PRIMARY KEY,
        origin TEXT NOT NULL,
        source_type TEXT NOT NULL,
        language TEXT NOT NULL,
        chunk_type TEXT NOT NULL,
        name TEXT NOT NULL,
        signature TEXT NOT NULL,
        content TEXT NOT NULL,
        content_hash TEXT NOT NULL,
        doc TEXT,
        line_start INTEGER NOT NULL,
        line_end INTEGER NOT NULL,
        embedding BLOB NOT NULL,
        source_mtime INTEGER,
        created_at TEXT NOT NULL,
        updated_at TEXT NOT NULL,
        parent_id TEXT,
        window_idx INTEGER,
        parent_type_name TEXT,
        enrichment_hash TEXT
    )";

    /// `chunks` schema as of v17 (enrichment_version, no embedding_base).
    const CHUNKS_V17: &str = "CREATE TABLE chunks (
        id TEXT PRIMARY KEY,
        origin TEXT NOT NULL,
        source_type TEXT NOT NULL,
        language TEXT NOT NULL,
        chunk_type TEXT NOT NULL,
        name TEXT NOT NULL,
        signature TEXT NOT NULL,
        content TEXT NOT NULL,
        content_hash TEXT NOT NULL,
        line_start INTEGER NOT NULL,
        line_end INTEGER NOT NULL,
        embedding BLOB NOT NULL,
        created_at TEXT NOT NULL,
        updated_at TEXT NOT NULL,
        enrichment_version INTEGER NOT NULL DEFAULT 0
    )";

    /// `chunks` schema as of v18 (v17 plus embedding_base).
    const CHUNKS_V18: &str = "CREATE TABLE chunks (
        id TEXT PRIMARY KEY,
        origin TEXT NOT NULL,
        source_type TEXT NOT NULL,
        language TEXT NOT NULL,
        chunk_type TEXT NOT NULL,
        name TEXT NOT NULL,
        signature TEXT NOT NULL,
        content TEXT NOT NULL,
        content_hash TEXT NOT NULL,
        line_start INTEGER NOT NULL,
        line_end INTEGER NOT NULL,
        embedding BLOB NOT NULL,
        embedding_base BLOB,
        created_at TEXT NOT NULL,
        updated_at TEXT NOT NULL,
        enrichment_version INTEGER NOT NULL DEFAULT 0
    )";

    /// `llm_summaries` schema as created by the v13 -> v14 step.
    const LLM_SUMMARIES_V14: &str = "CREATE TABLE IF NOT EXISTS llm_summaries (
        content_hash TEXT PRIMARY KEY,
        summary TEXT NOT NULL,
        model TEXT NOT NULL,
        created_at TEXT NOT NULL
    )";

    // ---- helpers ---------------------------------------------------------

    /// Build a current-thread runtime, a temp-file SQLite database, and a
    /// single-connection pool, then run `body` against the pool.
    ///
    /// `foreign_keys(true)` matches both sqlx's default and the tests that
    /// previously set it explicitly, so FK enforcement is uniform.
    fn with_pool<F, Fut>(body: F)
    where
        F: FnOnce(SqlitePool) -> Fut,
        Fut: std::future::Future<Output = ()>,
    {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        let dir = tempfile::tempdir().unwrap();
        let db_path = dir.path().join("test.db");
        rt.block_on(async move {
            let pool = SqlitePoolOptions::new()
                .max_connections(1)
                .connect_with(
                    sqlx::sqlite::SqliteConnectOptions::new()
                        .filename(&db_path)
                        .create_if_missing(true)
                        .foreign_keys(true),
                )
                .await
                .unwrap();
            body(pool).await;
        });
    }

    /// Execute one statement, panicking on failure.
    async fn exec(pool: &SqlitePool, sql: &str) {
        sqlx::query(sql).execute(pool).await.unwrap();
    }

    /// Create the `metadata` key/value table every migration relies on.
    async fn create_metadata(pool: &SqlitePool) {
        exec(
            pool,
            "CREATE TABLE IF NOT EXISTS metadata (key TEXT PRIMARY KEY, value TEXT NOT NULL)",
        )
        .await;
    }

    /// Insert one metadata key/value pair.
    async fn set_meta(pool: &SqlitePool, key: &str, value: &str) {
        sqlx::query("INSERT INTO metadata (key, value) VALUES (?1, ?2)")
            .bind(key)
            .bind(value)
            .execute(pool)
            .await
            .unwrap();
    }

    /// Read a metadata value, panicking if the key is missing.
    async fn meta(pool: &SqlitePool, key: &str) -> String {
        let (v,): (String,) = sqlx::query_as("SELECT value FROM metadata WHERE key = ?1")
            .bind(key)
            .fetch_one(pool)
            .await
            .unwrap();
        v
    }

    /// True when sqlite_master contains an object of `kind` ('table',
    /// 'index', 'trigger') with the given name.
    async fn master_has(pool: &SqlitePool, kind: &str, name: &str) -> bool {
        let row: Option<(String,)> =
            sqlx::query_as("SELECT name FROM sqlite_master WHERE type = ?1 AND name = ?2")
                .bind(kind)
                .bind(name)
                .fetch_optional(pool)
                .await
                .unwrap();
        row.is_some()
    }

    /// Run a COUNT(*)-style query that returns a single i64.
    async fn count(pool: &SqlitePool, sql: &str) -> i64 {
        let (n,): (i64,) = sqlx::query_as(sql).fetch_one(pool).await.unwrap();
        n
    }

    /// Insert a minimal chunk row compatible with the v17+ schema.
    /// Filler values (content_hash, dates) are arbitrary — no test reads them.
    async fn insert_chunk(pool: &SqlitePool, id: &str) {
        sqlx::query(
            "INSERT INTO chunks (id, origin, source_type, language, chunk_type, name, \
             signature, content, content_hash, line_start, line_end, embedding, \
             created_at, updated_at) \
             VALUES (?1, 'file:lib.rs', 'file', 'rust', 'function', ?1, \
             '', '', 'hash', 1, 10, X'00', '2026-04-11', '2026-04-11')",
        )
        .bind(id)
        .execute(pool)
        .await
        .unwrap();
    }

    /// Insert a chunk row exercising the `enrichment_hash` column added in v13.
    async fn insert_enriched_chunk(pool: &SqlitePool) {
        exec(
            pool,
            "INSERT INTO chunks (id, origin, source_type, language, chunk_type, name, \
             signature, content, content_hash, line_start, line_end, embedding, \
             created_at, updated_at, enrichment_hash) \
             VALUES ('test', 'file:test.rs', 'file', 'rust', 'function', 'test_fn', \
             '', 'fn test() {}', 'abc123', 0, 1, X'00', '2026-01-01', '2026-01-01', 'hash123')",
        )
        .await;
    }

    // ---- tests -----------------------------------------------------------

    #[test]
    fn test_migration_not_supported_error() {
        let err = StoreError::MigrationNotSupported(5, 6);
        let msg = err.to_string();
        assert!(msg.contains("5"));
        assert!(msg.contains("6"));
    }

    #[test]
    fn test_current_schema_version_documented() {
        assert_eq!(CURRENT_SCHEMA_VERSION, 20);
    }

    #[test]
    fn test_migrate_noop_same_version() {
        with_pool(|pool| async move {
            let result = migrate(&pool, 15, 15).await;
            assert!(result.is_ok(), "same-version migration should be no-op");
        });
    }

    #[test]
    fn test_migrate_rejects_downgrade() {
        with_pool(|pool| async move {
            let result = migrate(&pool, 15, 14).await;
            assert!(result.is_err(), "downgrade should fail");
            match result.unwrap_err() {
                StoreError::SchemaNewerThanCq(v) => assert_eq!(v, 15),
                other => panic!("Expected SchemaNewerThanCq, got: {:?}", other),
            }
        });
    }

    #[test]
    fn test_migrate_v10_to_v11_creates_type_edges() {
        with_pool(|pool| async move {
            create_metadata(&pool).await;
            exec(&pool, CHUNKS_V10).await;
            set_meta(&pool, "schema_version", "10").await;
            assert!(
                !master_has(&pool, "table", "type_edges").await,
                "type_edges should not exist yet"
            );
            migrate(&pool, 10, 11).await.unwrap();
            assert!(
                master_has(&pool, "table", "type_edges").await,
                "type_edges should exist after migration"
            );
            assert_eq!(meta(&pool, "schema_version").await, "11");
            assert!(
                master_has(&pool, "index", "idx_type_edges_source").await,
                "source index should exist"
            );
            assert!(
                master_has(&pool, "index", "idx_type_edges_target").await,
                "target index should exist"
            );
        });
    }

    #[test]
    fn test_migrate_v12_to_v13() {
        with_pool(|pool| async move {
            create_metadata(&pool).await;
            exec(&pool, CHUNKS_V12).await;
            set_meta(&pool, "schema_version", "12").await;
            migrate(&pool, 12, 13).await.unwrap();
            // Inserting into the new enrichment_hash column must succeed.
            insert_enriched_chunk(&pool).await;
            assert_eq!(meta(&pool, "hnsw_dirty").await, "0");
            assert_eq!(meta(&pool, "schema_version").await, "13");
        });
    }

    #[test]
    fn test_migrate_v13_to_v14() {
        with_pool(|pool| async move {
            create_metadata(&pool).await;
            exec(&pool, CHUNKS_V13).await;
            set_meta(&pool, "schema_version", "13").await;
            set_meta(&pool, "hnsw_dirty", "0").await;
            assert!(
                !master_has(&pool, "table", "llm_summaries").await,
                "llm_summaries should not exist yet"
            );
            migrate(&pool, 13, 14).await.unwrap();
            assert!(
                master_has(&pool, "table", "llm_summaries").await,
                "llm_summaries should exist after migration"
            );
            exec(
                &pool,
                "INSERT INTO llm_summaries (content_hash, summary, model, created_at) \
                 VALUES ('abc123', 'Test summary', 'claude-4', '2026-01-01')",
            )
            .await;
            assert_eq!(meta(&pool, "schema_version").await, "14");
        });
    }

    #[test]
    fn test_migrate_v14_to_v15() {
        with_pool(|pool| async move {
            create_metadata(&pool).await;
            exec(&pool, CHUNKS_V13).await;
            exec(&pool, LLM_SUMMARIES_V14).await;
            set_meta(&pool, "schema_version", "14").await;
            set_meta(&pool, "dimensions", "769").await;
            set_meta(&pool, "hnsw_dirty", "0").await;
            migrate(&pool, 14, 15).await.unwrap();
            assert_eq!(
                meta(&pool, "dimensions").await,
                "768",
                "dimensions should be updated to 768"
            );
            assert_eq!(
                meta(&pool, "hnsw_dirty").await,
                "1",
                "hnsw_dirty should be set to 1"
            );
            assert_eq!(meta(&pool, "schema_version").await, "15");
        });
    }

    #[test]
    fn test_migrate_v15_to_v16() {
        with_pool(|pool| async move {
            create_metadata(&pool).await;
            exec(&pool, LLM_SUMMARIES_V14).await;
            set_meta(&pool, "schema_version", "15").await;
            exec(
                &pool,
                "INSERT INTO llm_summaries (content_hash, summary, model, created_at) \
                 VALUES ('hash_a', 'Summary A', 'claude-4', '2026-01-01')",
            )
            .await;
            exec(
                &pool,
                "INSERT INTO llm_summaries (content_hash, summary, model, created_at) \
                 VALUES ('hash_b', 'Summary B', 'claude-4', '2026-01-02')",
            )
            .await;
            migrate(&pool, 15, 16).await.unwrap();
            assert_eq!(
                count(&pool, "SELECT COUNT(*) FROM llm_summaries WHERE purpose = 'summary'").await,
                2,
                "both existing rows should have purpose='summary'"
            );
            // The composite PK must allow the same hash with a different purpose.
            sqlx::query(
                "INSERT INTO llm_summaries (content_hash, purpose, summary, model, created_at) \
                 VALUES ('hash_a', 'doc-comment', 'Doc comment A', 'claude-4', '2026-01-03')",
            )
            .execute(&pool)
            .await
            .expect("inserting same content_hash with different purpose should succeed");
            assert_eq!(
                count(&pool, "SELECT COUNT(*) FROM llm_summaries").await,
                3,
                "should have 3 rows after inserting doc-comment"
            );
            assert_eq!(meta(&pool, "schema_version").await, "16");
        });
    }

    #[test]
    fn test_migrate_v12_to_v14_full_chain() {
        with_pool(|pool| async move {
            create_metadata(&pool).await;
            exec(&pool, CHUNKS_V12).await;
            set_meta(&pool, "schema_version", "12").await;
            migrate(&pool, 12, 14).await.unwrap();
            insert_enriched_chunk(&pool).await;
            assert!(
                master_has(&pool, "table", "llm_summaries").await,
                "llm_summaries should exist after full chain migration"
            );
            assert_eq!(meta(&pool, "schema_version").await, "14");
        });
    }

    #[test]
    fn test_migrate_unsupported_version_range() {
        with_pool(|pool| async move {
            create_metadata(&pool).await;
            set_meta(&pool, "schema_version", "8").await;
            let result = migrate(&pool, 8, 11).await;
            assert!(result.is_err(), "unsupported range should fail");
            match result.unwrap_err() {
                StoreError::MigrationNotSupported(from, to) => {
                    // The chain fails on its very first (8 -> 9) step.
                    assert_eq!(from, 8);
                    assert_eq!(to, 9);
                }
                other => panic!("Expected MigrationNotSupported, got: {:?}", other),
            }
        });
    }

    #[test]
    fn test_migrate_v17_to_v18_adds_embedding_base_column() {
        with_pool(|pool| async move {
            create_metadata(&pool).await;
            exec(&pool, CHUNKS_V17).await;
            set_meta(&pool, "schema_version", "17").await;
            exec(
                &pool,
                "INSERT INTO chunks (id, origin, source_type, language, chunk_type, name, \
                 signature, content, content_hash, line_start, line_end, embedding, \
                 created_at, updated_at) \
                 VALUES ('chunk-1', 'file:lib.rs', 'file', 'rust', 'function', 'foo', \
                 'fn foo()', 'fn foo() {}', 'hash1', 10, 20, X'deadbeef', \
                 '2026-04-10', '2026-04-10')",
            )
            .await;
            migrate(&pool, 17, 18).await.unwrap();
            let (embedding_existing, embedding_base): (Vec<u8>, Option<Vec<u8>>) =
                sqlx::query_as("SELECT embedding, embedding_base FROM chunks WHERE id = 'chunk-1'")
                    .fetch_one(&pool)
                    .await
                    .unwrap();
            assert_eq!(
                embedding_existing,
                vec![0xde, 0xad, 0xbe, 0xef],
                "existing embedding must survive migration untouched"
            );
            assert!(
                embedding_base.is_none(),
                "embedding_base must be NULL for pre-existing rows (base pass hasn't run yet)"
            );
            assert_eq!(meta(&pool, "schema_version").await, "18");
            // The new column must be writable and readable after migration.
            exec(
                &pool,
                "UPDATE chunks SET embedding_base = X'cafef00d' WHERE id = 'chunk-1'",
            )
            .await;
            let (base_after,): (Option<Vec<u8>>,) =
                sqlx::query_as("SELECT embedding_base FROM chunks WHERE id = 'chunk-1'")
                    .fetch_one(&pool)
                    .await
                    .unwrap();
            assert_eq!(base_after, Some(vec![0xca, 0xfe, 0xf0, 0x0d]));
        });
    }

    #[test]
    fn test_migrate_v17_to_v18_dispatcher_is_idempotent() {
        with_pool(|pool| async move {
            create_metadata(&pool).await;
            exec(&pool, CHUNKS_V17).await;
            set_meta(&pool, "schema_version", "17").await;
            migrate(&pool, 17, 18).await.unwrap();
            // Second call sees the already-migrated version and is a no-op.
            migrate(&pool, 18, 18).await.unwrap();
        });
    }

    #[test]
    fn test_migrate_v18_to_v19_adds_fk_cascade_and_purges_orphans() {
        with_pool(|pool| async move {
            create_metadata(&pool).await;
            exec(&pool, CHUNKS_V18).await;
            exec(
                &pool,
                "CREATE TABLE sparse_vectors (
                    chunk_id TEXT NOT NULL,
                    token_id INTEGER NOT NULL,
                    weight REAL NOT NULL,
                    PRIMARY KEY (chunk_id, token_id)
                )",
            )
            .await;
            exec(&pool, "CREATE INDEX idx_sparse_token ON sparse_vectors(token_id)").await;
            set_meta(&pool, "schema_version", "18").await;
            set_meta(&pool, "splade_generation", "5").await;
            for id in ["chunk-live", "chunk-also-live"] {
                insert_chunk(&pool, id).await;
            }
            // 'chunk-orphan' rows reference a chunk that does not exist.
            exec(
                &pool,
                "INSERT INTO sparse_vectors (chunk_id, token_id, weight) VALUES
                 ('chunk-live', 1, 0.5),
                 ('chunk-live', 2, 0.3),
                 ('chunk-also-live', 3, 0.8),
                 ('chunk-orphan', 4, 0.9),
                 ('chunk-orphan', 5, 0.1)",
            )
            .await;
            migrate(&pool, 18, 19).await.unwrap();
            assert_eq!(
                count(&pool, "SELECT COUNT(*) FROM sparse_vectors WHERE chunk_id = 'chunk-orphan'")
                    .await,
                0,
                "orphan rows should have been dropped"
            );
            assert_eq!(
                count(&pool, "SELECT COUNT(*) FROM sparse_vectors WHERE chunk_id = 'chunk-live'")
                    .await,
                2,
                "chunk-live rows should survive"
            );
            assert!(
                master_has(&pool, "index", "idx_sparse_token").await,
                "idx_sparse_token must be recreated"
            );
            let gen_parsed: u64 = meta(&pool, "splade_generation").await.parse().unwrap();
            assert!(gen_parsed > 5, "splade_generation must be bumped past 5");
            assert_eq!(meta(&pool, "schema_version").await, "19");
            // Deleting a chunk must now cascade into sparse_vectors.
            exec(&pool, "DELETE FROM chunks WHERE id = 'chunk-also-live'").await;
            assert_eq!(
                count(
                    &pool,
                    "SELECT COUNT(*) FROM sparse_vectors WHERE chunk_id = 'chunk-also-live'"
                )
                .await,
                0,
                "CASCADE should have removed chunk-also-live's sparse rows"
            );
        });
    }

    #[test]
    fn test_migrate_v19_to_v20_adds_trigger_that_bumps_on_chunks_delete() {
        with_pool(|pool| async move {
            create_metadata(&pool).await;
            exec(&pool, CHUNKS_V18).await;
            exec(
                &pool,
                "CREATE TABLE sparse_vectors (
                    chunk_id TEXT NOT NULL,
                    token_id INTEGER NOT NULL,
                    weight REAL NOT NULL,
                    PRIMARY KEY (chunk_id, token_id),
                    FOREIGN KEY (chunk_id) REFERENCES chunks(id) ON DELETE CASCADE
                )",
            )
            .await;
            exec(&pool, "CREATE INDEX idx_sparse_token ON sparse_vectors(token_id)").await;
            set_meta(&pool, "schema_version", "19").await;
            set_meta(&pool, "splade_generation", "10").await;
            for id in ["c1", "c2", "c3"] {
                insert_chunk(&pool, id).await;
            }
            migrate(&pool, 19, 20).await.unwrap();
            let gen_after_migration: u64 =
                meta(&pool, "splade_generation").await.parse().unwrap();
            assert!(
                gen_after_migration > 10,
                "migration should bump generation past starting value 10"
            );
            assert!(
                master_has(&pool, "trigger", "bump_splade_on_chunks_delete").await,
                "v20 trigger must exist after migration"
            );
            assert_eq!(meta(&pool, "schema_version").await, "20");
            exec(&pool, "DELETE FROM chunks WHERE id = 'c1'").await;
            let gen_after_one_delete: u64 =
                meta(&pool, "splade_generation").await.parse().unwrap();
            assert_eq!(
                gen_after_one_delete,
                gen_after_migration + 1,
                "one chunk delete should bump generation by exactly one"
            );
            exec(&pool, "DELETE FROM chunks WHERE id IN ('c2', 'c3')").await;
            let gen_after_two_delete: u64 =
                meta(&pool, "splade_generation").await.parse().unwrap();
            assert_eq!(
                gen_after_two_delete,
                gen_after_one_delete + 2,
                "two chunk deletes should bump generation by exactly two"
            );
            insert_chunk(&pool, "c4").await;
            let gen_after_insert: u64 =
                meta(&pool, "splade_generation").await.parse().unwrap();
            assert_eq!(
                gen_after_insert, gen_after_two_delete,
                "INSERT should NOT bump the generation (no DELETE happened)"
            );
        });
    }
}