1use crate::errors::AppError;
16use crate::output;
17use crate::paths::AppPaths;
18use crate::storage::connection::{open_ro, open_rw};
19use serde::Serialize;
20
// Candidate tables that may hold per-memory embeddings, in lookup order.
// Helpers in this file operate on the first candidate that exists in the database.
const MEMORY_VEC_TABLES: &[&str] = &["memory_embeddings", "vec_memories"];
22
// Top-level arguments of the `vec` command; all work is delegated to a subcommand.
// Plain `//` comments are used here on purpose: `///` doc comments on clap-derived
// types feed into the generated help text.
#[derive(clap::Args)]
#[command(
    about = "Vector index maintenance (orphan detection, purge, stats)",
    after_long_help = "EXAMPLES:\n \
    # List orphan memory embedding rows whose memory_id is gone\n \
    sqlite-graphrag vec orphan-list\n\n \
    # Dry-run the purge (does not delete)\n \
    sqlite-graphrag vec purge-orphan --dry-run\n\n \
    # Actually purge orphans\n \
    sqlite-graphrag vec purge-orphan --yes\n\n \
    # Show stats for all vec0 tables\n \
    sqlite-graphrag vec stats --json"
)]
pub struct VecArgs {
    // The maintenance operation to run; dispatched by `run`.
    #[command(subcommand)]
    pub command: VecSubcommand,
}
41
// Operations available under the `vec` command. Each variant carries the
// arguments of the handler that `run` dispatches to.
#[derive(clap::Subcommand)]
pub enum VecSubcommand {
    // Lists embedding rows whose memory is missing or soft-deleted (`run_orphan_list`).
    OrphanList(VecOrphanListArgs),
    // Deletes (or, with --dry-run, only counts) those orphan rows (`run_purge_orphan`).
    PurgeOrphan(VecPurgeOrphanArgs),
    // Reports row counts, orphan counts and per-dimension breakdown (`run_stats`).
    Stats(VecStatsArgs),
}
52
// Arguments for `vec orphan-list`.
#[derive(clap::Args)]
pub struct VecOrphanListArgs {
    // Hidden flag. The handler in this file never reads it; output is always
    // emitted through `output::emit_json`.
    #[arg(long, hide = true)]
    pub json: bool,
    // Optional database path; clap also accepts it from SQLITE_GRAPHRAG_DB_PATH.
    // Passed to `AppPaths::resolve`.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,
}
63
// Field-for-field twin of `VecOrphanListArgs` without any `#[arg]` attributes.
// NOTE(review): nothing in this file constructs or reads this type — confirm
// whether it is still used elsewhere before relying on it.
#[derive(clap::Args)]
pub struct VecOrphanListInner {
    pub json: bool,
    pub db: Option<String>,
}
70
// Arguments for `vec purge-orphan`.
#[derive(clap::Args)]
pub struct VecPurgeOrphanArgs {
    // Hidden flag. The handler in this file never reads it; output is always
    // emitted through `output::emit_json`.
    #[arg(long, hide = true)]
    pub json: bool,
    // Optional database path; clap also accepts it from SQLITE_GRAPHRAG_DB_PATH.
    // Passed to `AppPaths::resolve`.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,
    // Confirmation switch: without it (and without --dry-run) the handler
    // refuses to delete and returns a validation error.
    #[arg(long, default_value_t = false)]
    pub yes: bool,
    // When set, the handler counts orphans and returns before deleting anything.
    // It is checked before `yes`, so --dry-run wins if both are given.
    #[arg(long, default_value_t = false)]
    pub dry_run: bool,
}
87
// Arguments for `vec stats`.
#[derive(clap::Args)]
pub struct VecStatsArgs {
    // Hidden flag. The handler in this file never reads it; output is always
    // emitted through `output::emit_json`.
    #[arg(long, hide = true)]
    pub json: bool,
    // Optional database path; clap also accepts it from SQLITE_GRAPHRAG_DB_PATH.
    // Passed to `AppPaths::resolve`.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,
}
98
/// One orphan embedding row as reported by `vec orphan-list`.
#[derive(Serialize)]
struct VecOrphanListItem {
    /// `memory_id` of the embedding row (its memory is missing or soft-deleted).
    memory_id: i64,
    /// Hex-encoded BLAKE3 hash of the raw embedding blob, so the vector itself
    /// is not dumped into the output.
    vector_hash: String,
    /// The row's `created_at` column, cast to an integer by the query.
    created_at: i64,
}
108
/// JSON payload emitted by `vec orphan-list`.
#[derive(Serialize)]
struct VecOrphanListResponse {
    /// Always `"orphan_list"` in this file.
    action: String,
    /// Number of entries in `items`.
    count: i64,
    /// Orphan rows, ordered by `memory_id`.
    items: Vec<VecOrphanListItem>,
    /// Wall-clock time spent in the handler, in milliseconds.
    elapsed_ms: u64,
}
116
/// JSON payload emitted by `vec purge-orphan`.
#[derive(Serialize)]
struct VecPurgeOrphanResponse {
    /// `"purge_orphan"` when no memory embedding table exists,
    /// `"purge_orphan_dry_run"` for a dry run, `"purged_orphan"` after a real purge.
    action: String,
    /// Rows deleted from the memory embedding table (0 for a dry run).
    deleted: i64,
    /// Rows deleted from `vec_entities` (0 when the table is absent or for a dry run).
    deleted_entities: i64,
    /// Rows deleted from `vec_chunks` (0 when the table is absent or for a dry run).
    deleted_chunks: i64,
    /// Whether this response describes a dry run.
    dry_run: bool,
    /// Wall-clock time spent in the handler, in milliseconds.
    elapsed_ms: u64,
}
128
/// JSON payload emitted by `vec stats`.
#[derive(Serialize)]
struct VecStatsResponse {
    /// Total rows in the first existing memory embedding table (0 if none exists).
    total_rows: i64,
    /// Rows of that table whose memory is missing or soft-deleted.
    orphaned: i64,
    /// Share of non-orphan rows in percent; 100.0 when `total_rows` is 0.
    coverage_percent: f64,
    /// Row count of the first existing entity embedding table; omitted from the
    /// JSON when no candidate table exists or the count query fails.
    #[serde(skip_serializing_if = "Option::is_none")]
    vec_entities_rows: Option<i64>,
    /// Row count of the first existing chunk embedding table; omitted from the
    /// JSON when no candidate table exists or the count query fails.
    #[serde(skip_serializing_if = "Option::is_none")]
    vec_chunks_rows: Option<i64>,
    /// Row count of `fts_memories` (0 when the query fails).
    fts_memories_rows: i64,
    /// Per-table, per-dimension row counts produced by `dim_breakdown`.
    dims: Vec<DimBreakdown>,
    /// Wall-clock time spent in the handler, in milliseconds.
    elapsed_ms: u64,
}
144
/// Number of rows in one embedding table that share one value of the `dim` column.
#[derive(Serialize)]
struct DimBreakdown {
    /// Name of the embedding table the rows were counted in.
    table: String,
    /// Value of the `dim` column for this group.
    dim: i64,
    /// Number of rows in `table` with that `dim`.
    rows: i64,
}
151
152fn dim_breakdown(conn: &rusqlite::Connection) -> Vec<DimBreakdown> {
156 let mut out = Vec::new();
157 for table in ["memory_embeddings", "entity_embeddings", "chunk_embeddings"] {
158 if !vec_table_exists(conn, table) {
159 continue;
160 }
161 let sql = format!("SELECT dim, COUNT(*) FROM {table} GROUP BY dim ORDER BY dim");
162 let Ok(mut stmt) = conn.prepare(&sql) else {
163 continue;
164 };
165 let rows = stmt.query_map([], |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?)));
166 if let Ok(rows) = rows {
167 for (dim, count) in rows.flatten() {
168 out.push(DimBreakdown {
169 table: table.to_string(),
170 dim,
171 rows: count,
172 });
173 }
174 }
175 }
176 out
177}
178
179pub fn run(args: VecArgs) -> Result<(), AppError> {
184 match args.command {
185 VecSubcommand::OrphanList(a) => run_orphan_list(a),
186 VecSubcommand::PurgeOrphan(a) => run_purge_orphan(a),
187 VecSubcommand::Stats(a) => run_stats(a),
188 }
189}
190
191fn live_memory_embedding_stats(conn: &rusqlite::Connection) -> (i64, i64) {
192 if let Some(table_name) = first_existing_vec_table(conn, MEMORY_VEC_TABLES) {
193 let total = conn
194 .query_row(&format!("SELECT COUNT(*) FROM {table_name}"), [], |r| {
195 r.get(0)
196 })
197 .unwrap_or(0);
198 let orphaned = conn
199 .query_row(
200 &format!(
201 "SELECT COUNT(*)
202 FROM {table_name} v
203 LEFT JOIN memories m ON m.id = v.memory_id
204 WHERE m.id IS NULL OR m.deleted_at IS NOT NULL"
205 ),
206 [],
207 |r| r.get(0),
208 )
209 .unwrap_or(0);
210 return (total, orphaned);
211 }
212
213 (0, 0)
214}
215
216fn first_existing_vec_table<'a>(
217 conn: &rusqlite::Connection,
218 candidates: &'a [&'a str],
219) -> Option<&'a str> {
220 candidates
221 .iter()
222 .copied()
223 .find(|table_name| vec_table_exists(conn, table_name))
224}
225
226fn count_rows_first_existing(conn: &rusqlite::Connection, candidates: &[&str]) -> Option<i64> {
227 for table in candidates {
228 if vec_table_exists(conn, table) {
229 return conn
230 .query_row(&format!("SELECT COUNT(*) FROM {table}"), [], |r| r.get(0))
231 .ok();
232 }
233 }
234 None
235}
236
237fn run_orphan_list(args: VecOrphanListArgs) -> Result<(), AppError> {
238 let start = std::time::Instant::now();
239 let paths = AppPaths::resolve(args.db.as_deref())?;
240 crate::storage::connection::ensure_db_ready(&paths)?;
241 let conn = open_ro(&paths.db)?;
242
243 let Some(memory_table) = first_existing_vec_table(&conn, MEMORY_VEC_TABLES) else {
244 return output::emit_json(&VecOrphanListResponse {
245 action: "orphan_list".to_string(),
246 count: 0,
247 items: Vec::new(),
248 elapsed_ms: start.elapsed().as_millis() as u64,
249 });
250 };
251
252 let mut stmt = conn.prepare(&format!(
257 "SELECT v.memory_id, v.embedding, CAST(v.created_at AS INTEGER)
258 FROM {memory_table} v
259 LEFT JOIN memories m ON m.id = v.memory_id
260 WHERE m.id IS NULL OR m.deleted_at IS NOT NULL
261 ORDER BY v.memory_id"
262 ))?;
263 let rows: Vec<VecOrphanListItem> = stmt
264 .query_map([], |r| {
265 let memory_id: i64 = r.get(0)?;
266 let blob: Vec<u8> = r.get(1)?;
267 let created_at: i64 = r.get(2)?;
268 let vector_hash = blake3::hash(&blob).to_hex().to_string();
269 Ok(VecOrphanListItem {
270 memory_id,
271 vector_hash,
272 created_at,
273 })
274 })?
275 .collect::<Result<Vec<_>, _>>()?;
276 let count = rows.len() as i64;
277
278 output::emit_json(&VecOrphanListResponse {
279 action: "orphan_list".to_string(),
280 count,
281 items: rows,
282 elapsed_ms: start.elapsed().as_millis() as u64,
283 })?;
284 Ok(())
285}
286
287fn run_purge_orphan(args: VecPurgeOrphanArgs) -> Result<(), AppError> {
288 let start = std::time::Instant::now();
289 let paths = AppPaths::resolve(args.db.as_deref())?;
290 crate::storage::connection::ensure_db_ready(&paths)?;
291 let conn = open_rw(&paths.db)?;
292
293 let Some(memory_table) = first_existing_vec_table(&conn, MEMORY_VEC_TABLES) else {
294 return output::emit_json(&VecPurgeOrphanResponse {
295 action: "purge_orphan".to_string(),
296 deleted: 0,
297 deleted_entities: 0,
298 deleted_chunks: 0,
299 dry_run: args.dry_run,
300 elapsed_ms: start.elapsed().as_millis() as u64,
301 });
302 };
303
304 let orphan_count: i64 = conn
305 .query_row(
306 &format!(
307 "SELECT COUNT(*) FROM {memory_table} v
308 LEFT JOIN memories m ON m.id = v.memory_id
309 WHERE m.id IS NULL OR m.deleted_at IS NOT NULL"
310 ),
311 [],
312 |r| r.get(0),
313 )
314 .unwrap_or(0);
315
316 let orphan_entities_count: i64 = if vec_table_exists(&conn, "vec_entities") {
320 conn.query_row(
321 "SELECT COUNT(*) FROM vec_entities v
322 LEFT JOIN memories m ON m.id = v.memory_id
323 WHERE m.id IS NULL OR m.deleted_at IS NOT NULL",
324 [],
325 |r| r.get(0),
326 )
327 .unwrap_or(0)
328 } else {
329 0
330 };
331 let orphan_chunks_count: i64 = if vec_table_exists(&conn, "vec_chunks") {
332 conn.query_row(
333 "SELECT COUNT(*) FROM vec_chunks v
334 LEFT JOIN memories m ON m.id = v.memory_id
335 WHERE m.id IS NULL OR m.deleted_at IS NOT NULL",
336 [],
337 |r| r.get(0),
338 )
339 .unwrap_or(0)
340 } else {
341 0
342 };
343
344 if args.dry_run {
345 tracing::info!(target: "vec", orphan_count, orphan_entities_count, orphan_chunks_count, "dry-run: would delete orphans");
346 return output::emit_json(&VecPurgeOrphanResponse {
347 action: "purge_orphan_dry_run".to_string(),
348 deleted: 0,
349 deleted_entities: 0,
350 deleted_chunks: 0,
351 dry_run: true,
352 elapsed_ms: start.elapsed().as_millis() as u64,
353 });
354 }
355
356 if !args.yes {
357 return Err(AppError::Validation(format!(
358 "refusing to delete {orphan_count} memory embedding + {orphan_entities_count} vec_entities + {orphan_chunks_count} vec_chunks orphan rows without --yes (use --dry-run to preview)"
359 )));
360 }
361
362 let deleted: i64 = conn.execute(
363 &format!(
364 "DELETE FROM {memory_table}
365 WHERE NOT EXISTS (
366 SELECT 1 FROM memories m
367 WHERE m.id = {memory_table}.memory_id
368 AND m.deleted_at IS NULL
369 )"
370 ),
371 [],
372 )? as i64;
373
374 let deleted_entities: i64 = if vec_table_exists(&conn, "vec_entities") {
375 conn.execute(
376 "DELETE FROM vec_entities
377 WHERE NOT EXISTS (
378 SELECT 1 FROM memories m
379 WHERE m.id = vec_entities.memory_id
380 AND m.deleted_at IS NULL
381 )",
382 [],
383 )
384 .unwrap_or(0) as i64
385 } else {
386 0
387 };
388 let deleted_chunks: i64 = if vec_table_exists(&conn, "vec_chunks") {
389 conn.execute(
390 "DELETE FROM vec_chunks
391 WHERE NOT EXISTS (
392 SELECT 1 FROM memories m
393 WHERE m.id = vec_chunks.memory_id
394 AND m.deleted_at IS NULL
395 )",
396 [],
397 )
398 .unwrap_or(0) as i64
399 } else {
400 0
401 };
402
403 tracing::info!(target: "vec", deleted, deleted_entities, deleted_chunks, "purged orphan vec rows");
404
405 output::emit_json(&VecPurgeOrphanResponse {
406 action: "purged_orphan".to_string(),
407 deleted,
408 deleted_entities,
409 deleted_chunks,
410 dry_run: false,
411 elapsed_ms: start.elapsed().as_millis() as u64,
412 })?;
413 Ok(())
414}
415
416fn run_stats(args: VecStatsArgs) -> Result<(), AppError> {
417 let start = std::time::Instant::now();
418 let paths = AppPaths::resolve(args.db.as_deref())?;
419 crate::storage::connection::ensure_db_ready(&paths)?;
420 let conn = open_ro(&paths.db)?;
421
422 let (total_rows, orphaned) = live_memory_embedding_stats(&conn);
423 let coverage_percent = if total_rows > 0 {
424 ((total_rows - orphaned) as f64 / total_rows as f64) * 100.0
425 } else {
426 100.0
427 };
428
429 let vec_entities_rows =
430 count_rows_first_existing(&conn, &["entity_embeddings", "vec_entities"]);
431 let vec_chunks_rows = count_rows_first_existing(&conn, &["chunk_embeddings", "vec_chunks"]);
432 let fts_memories_rows = conn
433 .query_row("SELECT COUNT(*) FROM fts_memories", [], |r| r.get(0))
434 .unwrap_or(0);
435
436 output::emit_json(&VecStatsResponse {
437 total_rows,
438 orphaned,
439 coverage_percent,
440 vec_entities_rows,
441 vec_chunks_rows,
442 fts_memories_rows,
443 dims: dim_breakdown(&conn),
444 elapsed_ms: start.elapsed().as_millis() as u64,
445 })?;
446 Ok(())
447}
448
449fn vec_table_exists(conn: &rusqlite::Connection, name: &str) -> bool {
450 conn.query_row(
451 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1",
452 rusqlite::params![name],
453 |r| r.get::<_, i64>(0).map(|v| v > 0),
454 )
455 .unwrap_or(false)
456}
457
// Unit tests: JSON shape of the response structs plus the SQL helpers,
// exercised against an in-memory SQLite database.
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::Connection;

    // Creates an in-memory database with a minimal `memories` table and both
    // generations of embedding tables (`*_embeddings` and `vec_*`), all as
    // plain tables, so the helpers can be tested without any extension.
    fn open_vec_test_db() -> Connection {
        let conn = Connection::open_in_memory().unwrap();
        conn.execute_batch(
            "CREATE TABLE memories (
                id INTEGER PRIMARY KEY,
                deleted_at INTEGER
            );
            CREATE TABLE memory_embeddings (
                memory_id INTEGER PRIMARY KEY,
                namespace TEXT NOT NULL,
                embedding BLOB NOT NULL,
                source TEXT NOT NULL,
                model TEXT NOT NULL,
                dim INTEGER NOT NULL DEFAULT 384
            );
            CREATE TABLE vec_memories (
                memory_id INTEGER PRIMARY KEY,
                embedding BLOB NOT NULL,
                created_at INTEGER NOT NULL DEFAULT 0
            );
            CREATE TABLE entity_embeddings (
                entity_id INTEGER PRIMARY KEY,
                namespace TEXT NOT NULL,
                embedding BLOB NOT NULL,
                source TEXT NOT NULL,
                model TEXT NOT NULL,
                dim INTEGER NOT NULL DEFAULT 384
            );
            CREATE TABLE vec_entities (
                memory_id INTEGER PRIMARY KEY
            );
            CREATE TABLE chunk_embeddings (
                chunk_id INTEGER PRIMARY KEY,
                memory_id INTEGER NOT NULL,
                embedding BLOB NOT NULL,
                source TEXT NOT NULL,
                model TEXT NOT NULL,
                dim INTEGER NOT NULL DEFAULT 384
            );
            CREATE TABLE vec_chunks (
                memory_id INTEGER PRIMARY KEY
            );",
        )
        .unwrap();
        conn
    }

    // The orphan-list payload exposes action, count, elapsed_ms and an items array.
    #[test]
    fn vec_orphan_list_response_serializes_all_fields() {
        let resp = VecOrphanListResponse {
            action: "orphan_list".into(),
            count: 0,
            items: Vec::new(),
            elapsed_ms: 5,
        };
        let v = serde_json::to_value(&resp).unwrap();
        assert_eq!(v["action"], "orphan_list");
        assert_eq!(v["count"], 0i64);
        assert_eq!(v["elapsed_ms"], 5u64);
        assert!(v["items"].is_array());
    }

    // The purge payload carries the dry_run flag and the deleted counter.
    #[test]
    fn vec_purge_orphan_response_serializes_dry_run_flag() {
        let resp = VecPurgeOrphanResponse {
            action: "purge_orphan_dry_run".into(),
            deleted: 0,
            deleted_entities: 0,
            deleted_chunks: 0,
            dry_run: true,
            elapsed_ms: 1,
        };
        let v = serde_json::to_value(&resp).unwrap();
        assert_eq!(v["dry_run"], true);
        assert_eq!(v["deleted"], 0i64);
    }

    // Serialization only: `None` row counts are omitted from the JSON and the
    // supplied coverage value is passed through unchanged.
    #[test]
    fn vec_stats_response_computes_coverage() {
        let resp = VecStatsResponse {
            total_rows: 100,
            orphaned: 25,
            coverage_percent: 75.0,
            vec_entities_rows: Some(50),
            vec_chunks_rows: None,
            fts_memories_rows: 100,
            dims: vec![],
            elapsed_ms: 10,
        };
        let v = serde_json::to_value(&resp).unwrap();
        assert_eq!(v["coverage_percent"], 75.0);
        assert_eq!(v["vec_entities_rows"], 50i64);
        assert!(v.get("vec_chunks_rows").is_none());
        assert!(v["dims"].as_array().unwrap().is_empty());
    }

    // Rows are grouped per distinct `dim` and returned in ascending `dim` order.
    #[test]
    fn dim_breakdown_groups_rows_per_dim_and_table() {
        let conn = open_vec_test_db();
        // Two embeddings with dim 64 and one with dim 384.
        conn.execute_batch(
            "INSERT INTO memories (id, deleted_at) VALUES (1, NULL), (2, NULL), (3, NULL);
             INSERT INTO memory_embeddings (memory_id, namespace, embedding, source, model, dim)
             VALUES (1, 'g', x'00', 'test', 'test', 64),
                    (2, 'g', x'00', 'test', 'test', 64),
                    (3, 'g', x'00', 'test', 'test', 384);",
        )
        .unwrap();
        let dims = dim_breakdown(&conn);
        let mem: Vec<_> = dims
            .iter()
            .filter(|d| d.table == "memory_embeddings")
            .collect();
        assert_eq!(mem.len(), 2, "expected one row per distinct dim");
        assert_eq!((mem[0].dim, mem[0].rows), (64, 2));
        assert_eq!((mem[1].dim, mem[1].rows), (384, 1));
    }

    // When both tables exist, stats come from `memory_embeddings`; the
    // `vec_memories` row (id 99) must not be counted.
    #[test]
    fn live_memory_embedding_stats_prefers_memory_embeddings() {
        let conn = open_vec_test_db();
        // Memory 1 is live, memory 2 is soft-deleted, memory 3 does not exist.
        conn.execute("INSERT INTO memories (id, deleted_at) VALUES (1, NULL)", [])
            .unwrap();
        conn.execute("INSERT INTO memories (id, deleted_at) VALUES (2, 123)", [])
            .unwrap();
        conn.execute(
            "INSERT INTO memory_embeddings(memory_id, namespace, embedding, source, model, dim)
             VALUES (1, 'global', X'00', 'llm', 'm', 384)",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memory_embeddings(memory_id, namespace, embedding, source, model, dim)
             VALUES (2, 'global', X'00', 'llm', 'm', 384)",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memory_embeddings(memory_id, namespace, embedding, source, model, dim)
             VALUES (3, 'global', X'00', 'llm', 'm', 384)",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO vec_memories(memory_id, embedding, created_at) VALUES (99, X'00', 0)",
            [],
        )
        .unwrap();

        let (total, orphaned) = live_memory_embedding_stats(&conn);
        assert_eq!(total, 3);
        // Embeddings 2 (soft-deleted memory) and 3 (missing memory) are orphans.
        assert_eq!(orphaned, 2);
    }

    // With one row in every table, the count must come from the first
    // candidate (`*_embeddings`), not from the legacy `vec_*` table.
    #[test]
    fn count_rows_first_existing_prefers_new_embedding_tables() {
        let conn = open_vec_test_db();
        conn.execute(
            "INSERT INTO entity_embeddings(entity_id, namespace, embedding, source, model, dim)
             VALUES (1, 'global', X'00', 'llm', 'm', 384)",
            [],
        )
        .unwrap();
        conn.execute("INSERT INTO vec_entities(memory_id) VALUES (1)", [])
            .unwrap();
        conn.execute(
            "INSERT INTO chunk_embeddings(chunk_id, memory_id, embedding, source, model, dim)
             VALUES (1, 1, X'00', 'llm', 'm', 384)",
            [],
        )
        .unwrap();
        conn.execute("INSERT INTO vec_chunks(memory_id) VALUES (1)", [])
            .unwrap();

        assert_eq!(
            count_rows_first_existing(&conn, &["entity_embeddings", "vec_entities"]),
            Some(1)
        );
        assert_eq!(
            count_rows_first_existing(&conn, &["chunk_embeddings", "vec_chunks"]),
            Some(1)
        );
    }
}