1use crate::errors::AppError;
16use crate::output;
17use crate::paths::AppPaths;
18use crate::storage::connection::{open_ro, open_rw};
19use serde::Serialize;
20
/// Candidate tables that hold per-memory embeddings, in lookup priority order.
///
/// Callers probe these names in order and work on the first one that exists
/// in the database (see `first_existing_vec_table`).
const MEMORY_VEC_TABLES: &[&str] = &["memory_embeddings", "vec_memories"];
22
// Top-level arguments of the `vec` command: a wrapper around exactly one subcommand.
// Plain `//` comments are used here on purpose: clap derive turns `///` doc
// comments into help text, which would change the CLI output.
#[derive(clap::Args)]
#[command(
    about = "Vector index maintenance (orphan detection, purge, stats)",
    after_long_help = "EXAMPLES:\n \
        # List orphan memory embedding rows whose memory_id is gone\n \
        sqlite-graphrag vec orphan-list\n\n \
        # Dry-run the purge (does not delete)\n \
        sqlite-graphrag vec purge-orphan --dry-run\n\n \
        # Actually purge orphans\n \
        sqlite-graphrag vec purge-orphan --yes\n\n \
        # Show stats for all vec0 tables\n \
        sqlite-graphrag vec stats --json"
)]
pub struct VecArgs {
    // The maintenance operation to run; dispatched by `run`.
    #[command(subcommand)]
    pub command: VecSubcommand,
}
41
// Operations available under `vec`. `//` (not `///`) is used so that clap's
// generated help text stays exactly as it is.
#[derive(clap::Subcommand)]
pub enum VecSubcommand {
    // `vec orphan-list`: handled by `run_orphan_list`.
    OrphanList(VecOrphanListArgs),
    // `vec purge-orphan`: handled by `run_purge_orphan`.
    PurgeOrphan(VecPurgeOrphanArgs),
    // `vec stats`: handled by `run_stats`.
    Stats(VecStatsArgs),
}
52
// Arguments for `vec orphan-list`.
// (`//` rather than `///` so clap's generated help text is not altered.)
#[derive(clap::Args)]
pub struct VecOrphanListArgs {
    // `--json`, hidden from help. `run_orphan_list` never reads this field;
    // it always emits JSON through `output::emit_json`.
    #[arg(long, hide = true)]
    pub json: bool,
    // `--db`, with the `SQLITE_GRAPHRAG_DB_PATH` environment variable as
    // fallback; passed to `AppPaths::resolve`.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,
}
63
// Attribute-free twin of `VecOrphanListArgs` (same two fields).
// Nothing visible in this file constructs or consumes it.
// NOTE(review): the fields carry no `#[arg(...)]`, so clap derive would
// presumably treat them as positional arguments — confirm this is intended,
// or whether this struct is still needed at all.
#[derive(clap::Args)]
pub struct VecOrphanListInner {
    pub json: bool,
    pub db: Option<String>,
}
70
// Arguments for `vec purge-orphan`.
// (`//` rather than `///` so clap's generated help text is not altered.)
#[derive(clap::Args)]
pub struct VecPurgeOrphanArgs {
    // `--json`, hidden from help. `run_purge_orphan` never reads this field;
    // it always emits JSON through `output::emit_json`.
    #[arg(long, hide = true)]
    pub json: bool,
    // `--db`, with the `SQLITE_GRAPHRAG_DB_PATH` environment variable as
    // fallback; passed to `AppPaths::resolve`.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,
    // `--yes`: explicit confirmation. Without it (and without `--dry-run`)
    // `run_purge_orphan` refuses to delete and returns a validation error.
    #[arg(long, default_value_t = false)]
    pub yes: bool,
    // `--dry-run`: count orphans and log them, but delete nothing.
    // Checked before `--yes`, so it wins when both are given.
    #[arg(long, default_value_t = false)]
    pub dry_run: bool,
}
87
// Arguments for `vec stats`.
// (`//` rather than `///` so clap's generated help text is not altered.)
#[derive(clap::Args)]
pub struct VecStatsArgs {
    // `--json`, hidden from help. `run_stats` never reads this field;
    // it always emits JSON through `output::emit_json`.
    #[arg(long, hide = true)]
    pub json: bool,
    // `--db`, with the `SQLITE_GRAPHRAG_DB_PATH` environment variable as
    // fallback; passed to `AppPaths::resolve`.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,
}
98
/// One orphaned embedding row as reported by `vec orphan-list`.
#[derive(Serialize)]
struct VecOrphanListItem {
    /// `memory_id` of the embedding row that has no live memory.
    memory_id: i64,
    /// Hex-encoded BLAKE3 digest of the raw `embedding` blob.
    vector_hash: String,
    /// The row's `created_at` column, cast to an integer by the query.
    created_at: i64,
}
108
/// JSON payload emitted by `vec orphan-list`.
#[derive(Serialize)]
struct VecOrphanListResponse {
    /// Always `"orphan_list"` in `run_orphan_list`.
    action: String,
    /// Number of entries in `items`.
    count: i64,
    /// Orphaned rows, ordered by `memory_id`; empty when no memory vec table exists.
    items: Vec<VecOrphanListItem>,
    /// Wall-clock time spent handling the command, in milliseconds.
    elapsed_ms: u64,
}
116
/// JSON payload emitted by `vec purge-orphan`.
#[derive(Serialize)]
struct VecPurgeOrphanResponse {
    /// `"purge_orphan"` when no memory vec table exists, `"purge_orphan_dry_run"`
    /// for a dry run, `"purged_orphan"` after an actual purge.
    action: String,
    /// Rows deleted from the memory embedding table (0 on dry run).
    deleted: i64,
    /// Rows deleted from `vec_entities` (0 on dry run or when the table is absent).
    deleted_entities: i64,
    /// Rows deleted from `vec_chunks` (0 on dry run or when the table is absent).
    deleted_chunks: i64,
    /// Whether this response describes a dry run.
    dry_run: bool,
    /// Wall-clock time spent handling the command, in milliseconds.
    elapsed_ms: u64,
}
128
/// JSON payload emitted by `vec stats`.
#[derive(Serialize)]
struct VecStatsResponse {
    /// Row count of the first existing memory embedding table.
    total_rows: i64,
    /// How many of those rows have no live memory behind them.
    orphaned: i64,
    /// `(total_rows - orphaned) / total_rows * 100`; `run_stats` reports 100.0
    /// when `total_rows` is 0.
    coverage_percent: f64,
    /// Row count of the first existing entity embedding table; left out of the
    /// JSON entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    vec_entities_rows: Option<i64>,
    /// Row count of the first existing chunk embedding table; left out of the
    /// JSON entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    vec_chunks_rows: Option<i64>,
    /// Row count of `fts_memories` (`run_stats` reports 0 if the count query fails).
    fts_memories_rows: i64,
    /// Wall-clock time spent handling the command, in milliseconds.
    elapsed_ms: u64,
}
141
142pub fn run(args: VecArgs) -> Result<(), AppError> {
147 match args.command {
148 VecSubcommand::OrphanList(a) => run_orphan_list(a),
149 VecSubcommand::PurgeOrphan(a) => run_purge_orphan(a),
150 VecSubcommand::Stats(a) => run_stats(a),
151 }
152}
153
154fn live_memory_embedding_stats(conn: &rusqlite::Connection) -> (i64, i64) {
155 if let Some(table_name) = first_existing_vec_table(conn, MEMORY_VEC_TABLES) {
156 let total = conn
157 .query_row(&format!("SELECT COUNT(*) FROM {table_name}"), [], |r| {
158 r.get(0)
159 })
160 .unwrap_or(0);
161 let orphaned = conn
162 .query_row(
163 &format!(
164 "SELECT COUNT(*)
165 FROM {table_name} v
166 LEFT JOIN memories m ON m.id = v.memory_id
167 WHERE m.id IS NULL OR m.deleted_at IS NOT NULL"
168 ),
169 [],
170 |r| r.get(0),
171 )
172 .unwrap_or(0);
173 return (total, orphaned);
174 }
175
176 (0, 0)
177}
178
179fn first_existing_vec_table<'a>(
180 conn: &rusqlite::Connection,
181 candidates: &'a [&'a str],
182) -> Option<&'a str> {
183 candidates
184 .iter()
185 .copied()
186 .find(|table_name| vec_table_exists(conn, table_name))
187}
188
189fn count_rows_first_existing(conn: &rusqlite::Connection, candidates: &[&str]) -> Option<i64> {
190 for table in candidates {
191 if vec_table_exists(conn, table) {
192 return conn
193 .query_row(&format!("SELECT COUNT(*) FROM {table}"), [], |r| r.get(0))
194 .ok();
195 }
196 }
197 None
198}
199
200fn run_orphan_list(args: VecOrphanListArgs) -> Result<(), AppError> {
201 let start = std::time::Instant::now();
202 let paths = AppPaths::resolve(args.db.as_deref())?;
203 crate::storage::connection::ensure_db_ready(&paths)?;
204 let conn = open_ro(&paths.db)?;
205
206 let Some(memory_table) = first_existing_vec_table(&conn, MEMORY_VEC_TABLES) else {
207 return output::emit_json(&VecOrphanListResponse {
208 action: "orphan_list".to_string(),
209 count: 0,
210 items: Vec::new(),
211 elapsed_ms: start.elapsed().as_millis() as u64,
212 });
213 };
214
215 let mut stmt = conn.prepare(&format!(
220 "SELECT v.memory_id, v.embedding, CAST(v.created_at AS INTEGER)
221 FROM {memory_table} v
222 LEFT JOIN memories m ON m.id = v.memory_id
223 WHERE m.id IS NULL OR m.deleted_at IS NOT NULL
224 ORDER BY v.memory_id"
225 ))?;
226 let rows: Vec<VecOrphanListItem> = stmt
227 .query_map([], |r| {
228 let memory_id: i64 = r.get(0)?;
229 let blob: Vec<u8> = r.get(1)?;
230 let created_at: i64 = r.get(2)?;
231 let vector_hash = blake3::hash(&blob).to_hex().to_string();
232 Ok(VecOrphanListItem {
233 memory_id,
234 vector_hash,
235 created_at,
236 })
237 })?
238 .collect::<Result<Vec<_>, _>>()?;
239 let count = rows.len() as i64;
240
241 output::emit_json(&VecOrphanListResponse {
242 action: "orphan_list".to_string(),
243 count,
244 items: rows,
245 elapsed_ms: start.elapsed().as_millis() as u64,
246 })?;
247 Ok(())
248}
249
250fn run_purge_orphan(args: VecPurgeOrphanArgs) -> Result<(), AppError> {
251 let start = std::time::Instant::now();
252 let paths = AppPaths::resolve(args.db.as_deref())?;
253 crate::storage::connection::ensure_db_ready(&paths)?;
254 let conn = open_rw(&paths.db)?;
255
256 let Some(memory_table) = first_existing_vec_table(&conn, MEMORY_VEC_TABLES) else {
257 return output::emit_json(&VecPurgeOrphanResponse {
258 action: "purge_orphan".to_string(),
259 deleted: 0,
260 deleted_entities: 0,
261 deleted_chunks: 0,
262 dry_run: args.dry_run,
263 elapsed_ms: start.elapsed().as_millis() as u64,
264 });
265 };
266
267 let orphan_count: i64 = conn
268 .query_row(
269 &format!(
270 "SELECT COUNT(*) FROM {memory_table} v
271 LEFT JOIN memories m ON m.id = v.memory_id
272 WHERE m.id IS NULL OR m.deleted_at IS NOT NULL"
273 ),
274 [],
275 |r| r.get(0),
276 )
277 .unwrap_or(0);
278
279 let orphan_entities_count: i64 = if vec_table_exists(&conn, "vec_entities") {
283 conn.query_row(
284 "SELECT COUNT(*) FROM vec_entities v
285 LEFT JOIN memories m ON m.id = v.memory_id
286 WHERE m.id IS NULL OR m.deleted_at IS NOT NULL",
287 [],
288 |r| r.get(0),
289 )
290 .unwrap_or(0)
291 } else {
292 0
293 };
294 let orphan_chunks_count: i64 = if vec_table_exists(&conn, "vec_chunks") {
295 conn.query_row(
296 "SELECT COUNT(*) FROM vec_chunks v
297 LEFT JOIN memories m ON m.id = v.memory_id
298 WHERE m.id IS NULL OR m.deleted_at IS NOT NULL",
299 [],
300 |r| r.get(0),
301 )
302 .unwrap_or(0)
303 } else {
304 0
305 };
306
307 if args.dry_run {
308 tracing::info!(target: "vec", orphan_count, orphan_entities_count, orphan_chunks_count, "dry-run: would delete orphans");
309 return output::emit_json(&VecPurgeOrphanResponse {
310 action: "purge_orphan_dry_run".to_string(),
311 deleted: 0,
312 deleted_entities: 0,
313 deleted_chunks: 0,
314 dry_run: true,
315 elapsed_ms: start.elapsed().as_millis() as u64,
316 });
317 }
318
319 if !args.yes {
320 return Err(AppError::Validation(format!(
321 "refusing to delete {orphan_count} memory embedding + {orphan_entities_count} vec_entities + {orphan_chunks_count} vec_chunks orphan rows without --yes (use --dry-run to preview)"
322 )));
323 }
324
325 let deleted: i64 = conn.execute(
326 &format!(
327 "DELETE FROM {memory_table}
328 WHERE NOT EXISTS (
329 SELECT 1 FROM memories m
330 WHERE m.id = {memory_table}.memory_id
331 AND m.deleted_at IS NULL
332 )"
333 ),
334 [],
335 )? as i64;
336
337 let deleted_entities: i64 = if vec_table_exists(&conn, "vec_entities") {
338 conn.execute(
339 "DELETE FROM vec_entities
340 WHERE NOT EXISTS (
341 SELECT 1 FROM memories m
342 WHERE m.id = vec_entities.memory_id
343 AND m.deleted_at IS NULL
344 )",
345 [],
346 )
347 .unwrap_or(0) as i64
348 } else {
349 0
350 };
351 let deleted_chunks: i64 = if vec_table_exists(&conn, "vec_chunks") {
352 conn.execute(
353 "DELETE FROM vec_chunks
354 WHERE NOT EXISTS (
355 SELECT 1 FROM memories m
356 WHERE m.id = vec_chunks.memory_id
357 AND m.deleted_at IS NULL
358 )",
359 [],
360 )
361 .unwrap_or(0) as i64
362 } else {
363 0
364 };
365
366 tracing::info!(target: "vec", deleted, deleted_entities, deleted_chunks, "purged orphan vec rows");
367
368 output::emit_json(&VecPurgeOrphanResponse {
369 action: "purged_orphan".to_string(),
370 deleted,
371 deleted_entities,
372 deleted_chunks,
373 dry_run: false,
374 elapsed_ms: start.elapsed().as_millis() as u64,
375 })?;
376 Ok(())
377}
378
379fn run_stats(args: VecStatsArgs) -> Result<(), AppError> {
380 let start = std::time::Instant::now();
381 let paths = AppPaths::resolve(args.db.as_deref())?;
382 crate::storage::connection::ensure_db_ready(&paths)?;
383 let conn = open_ro(&paths.db)?;
384
385 let (total_rows, orphaned) = live_memory_embedding_stats(&conn);
386 let coverage_percent = if total_rows > 0 {
387 ((total_rows - orphaned) as f64 / total_rows as f64) * 100.0
388 } else {
389 100.0
390 };
391
392 let vec_entities_rows =
393 count_rows_first_existing(&conn, &["entity_embeddings", "vec_entities"]);
394 let vec_chunks_rows = count_rows_first_existing(&conn, &["chunk_embeddings", "vec_chunks"]);
395 let fts_memories_rows = conn
396 .query_row("SELECT COUNT(*) FROM fts_memories", [], |r| r.get(0))
397 .unwrap_or(0);
398
399 output::emit_json(&VecStatsResponse {
400 total_rows,
401 orphaned,
402 coverage_percent,
403 vec_entities_rows,
404 vec_chunks_rows,
405 fts_memories_rows,
406 elapsed_ms: start.elapsed().as_millis() as u64,
407 })?;
408 Ok(())
409}
410
411fn vec_table_exists(conn: &rusqlite::Connection, name: &str) -> bool {
412 conn.query_row(
413 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1",
414 rusqlite::params![name],
415 |r| r.get::<_, i64>(0).map(|v| v > 0),
416 )
417 .unwrap_or(false)
418}
419
// Unit tests: JSON shape of the response structs, plus the table-priority
// logic of the counting helpers against an in-memory SQLite database.
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::Connection;

    /// Builds an in-memory database containing `memories` plus both the
    /// `*_embeddings` tables and the `vec_*` tables, all as plain tables, so
    /// the "first existing candidate wins" logic can be exercised.
    fn open_vec_test_db() -> Connection {
        let conn = Connection::open_in_memory().unwrap();
        conn.execute_batch(
            "CREATE TABLE memories (
                id INTEGER PRIMARY KEY,
                deleted_at INTEGER
            );
            CREATE TABLE memory_embeddings (
                memory_id INTEGER PRIMARY KEY,
                namespace TEXT NOT NULL,
                embedding BLOB NOT NULL,
                source TEXT NOT NULL,
                model TEXT NOT NULL,
                dim INTEGER NOT NULL DEFAULT 384
            );
            CREATE TABLE vec_memories (
                memory_id INTEGER PRIMARY KEY,
                embedding BLOB NOT NULL,
                created_at INTEGER NOT NULL DEFAULT 0
            );
            CREATE TABLE entity_embeddings (
                entity_id INTEGER PRIMARY KEY,
                namespace TEXT NOT NULL,
                embedding BLOB NOT NULL,
                source TEXT NOT NULL,
                model TEXT NOT NULL,
                dim INTEGER NOT NULL DEFAULT 384
            );
            CREATE TABLE vec_entities (
                memory_id INTEGER PRIMARY KEY
            );
            CREATE TABLE chunk_embeddings (
                chunk_id INTEGER PRIMARY KEY,
                memory_id INTEGER NOT NULL,
                embedding BLOB NOT NULL,
                source TEXT NOT NULL,
                model TEXT NOT NULL,
                dim INTEGER NOT NULL DEFAULT 384
            );
            CREATE TABLE vec_chunks (
                memory_id INTEGER PRIMARY KEY
            );",
        )
        .unwrap();
        conn
    }

    // An empty orphan list still serializes every field, with `items` as an array.
    #[test]
    fn vec_orphan_list_response_serializes_all_fields() {
        let resp = VecOrphanListResponse {
            action: "orphan_list".into(),
            count: 0,
            items: Vec::new(),
            elapsed_ms: 5,
        };
        let v = serde_json::to_value(&resp).unwrap();
        assert_eq!(v["action"], "orphan_list");
        assert_eq!(v["count"], 0i64);
        assert_eq!(v["elapsed_ms"], 5u64);
        assert!(v["items"].is_array());
    }

    // The dry-run flag and the zero delete count survive serialization.
    #[test]
    fn vec_purge_orphan_response_serializes_dry_run_flag() {
        let resp = VecPurgeOrphanResponse {
            action: "purge_orphan_dry_run".into(),
            deleted: 0,
            deleted_entities: 0,
            deleted_chunks: 0,
            dry_run: true,
            elapsed_ms: 1,
        };
        let v = serde_json::to_value(&resp).unwrap();
        assert_eq!(v["dry_run"], true);
        assert_eq!(v["deleted"], 0i64);
    }

    // Despite the name, `coverage_percent` is set by hand here; the test
    // checks serialization only, including that a `None` row count is
    // omitted from the JSON object.
    #[test]
    fn vec_stats_response_computes_coverage() {
        let resp = VecStatsResponse {
            total_rows: 100,
            orphaned: 25,
            coverage_percent: 75.0,
            vec_entities_rows: Some(50),
            vec_chunks_rows: None,
            fts_memories_rows: 100,
            elapsed_ms: 10,
        };
        let v = serde_json::to_value(&resp).unwrap();
        assert_eq!(v["coverage_percent"], 75.0);
        assert_eq!(v["vec_entities_rows"], 50i64);
        assert!(v.get("vec_chunks_rows").is_none());
    }

    // Memory 1 is live, memory 2 is soft-deleted, memory 3 does not exist.
    // `memory_embeddings` holds rows for all three, so total = 3 and
    // orphaned = 2. The lone `vec_memories` row (id 99) must be ignored
    // because `memory_embeddings` comes first in `MEMORY_VEC_TABLES`.
    #[test]
    fn live_memory_embedding_stats_prefers_memory_embeddings() {
        let conn = open_vec_test_db();
        conn.execute("INSERT INTO memories (id, deleted_at) VALUES (1, NULL)", [])
            .unwrap();
        conn.execute("INSERT INTO memories (id, deleted_at) VALUES (2, 123)", [])
            .unwrap();
        conn.execute(
            "INSERT INTO memory_embeddings(memory_id, namespace, embedding, source, model, dim)
             VALUES (1, 'global', X'00', 'llm', 'm', 384)",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memory_embeddings(memory_id, namespace, embedding, source, model, dim)
             VALUES (2, 'global', X'00', 'llm', 'm', 384)",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memory_embeddings(memory_id, namespace, embedding, source, model, dim)
             VALUES (3, 'global', X'00', 'llm', 'm', 384)",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO vec_memories(memory_id, embedding, created_at) VALUES (99, X'00', 0)",
            [],
        )
        .unwrap();

        let (total, orphaned) = live_memory_embedding_stats(&conn);
        assert_eq!(total, 3);
        assert_eq!(orphaned, 2);
    }

    // Every table in each candidate pair exists and holds one row; the helper
    // returns a count of 1 for both pairs.
    #[test]
    fn count_rows_first_existing_prefers_new_embedding_tables() {
        let conn = open_vec_test_db();
        conn.execute(
            "INSERT INTO entity_embeddings(entity_id, namespace, embedding, source, model, dim)
             VALUES (1, 'global', X'00', 'llm', 'm', 384)",
            [],
        )
        .unwrap();
        conn.execute("INSERT INTO vec_entities(memory_id) VALUES (1)", [])
            .unwrap();
        conn.execute(
            "INSERT INTO chunk_embeddings(chunk_id, memory_id, embedding, source, model, dim)
             VALUES (1, 1, X'00', 'llm', 'm', 384)",
            [],
        )
        .unwrap();
        conn.execute("INSERT INTO vec_chunks(memory_id) VALUES (1)", [])
            .unwrap();

        assert_eq!(
            count_rows_first_existing(&conn, &["entity_embeddings", "vec_entities"]),
            Some(1)
        );
        assert_eq!(
            count_rows_first_existing(&conn, &["chunk_embeddings", "vec_chunks"]),
            Some(1)
        );
    }
}