Skip to main content

sqlite_graphrag/commands/
vec.rs

//! Handler for the `vec` CLI subcommand family.
//!
//! Provides maintenance operations for the memory embedding store,
//! preferring `memory_embeddings` and falling back to legacy `vec_memories`:
//!
//! - `orphan-list`: lists embedding rows whose `memory_id` no longer
//!   references a live (non-soft-deleted) memory.
//! - `purge-orphan`: deletes those orphan rows in a single transaction.
//! - `stats`: surfaces total rows, orphan count, and coverage percentage.
//!
//! G39 (v1.0.69): before v1.0.69, the only way to detect a vec-orphan was
//! `health --json` which reported `vec_memories_orphaned > 0` with no
//! remediation path. This module closes the loop.
14
15use crate::errors::AppError;
16use crate::output;
17use crate::paths::AppPaths;
18use crate::storage::connection::{open_ro, open_rw};
19use serde::Serialize;
20
/// Candidate tables that may hold memory embeddings, in lookup order.
///
/// `first_existing_vec_table` returns the first entry that exists, so the
/// newer `memory_embeddings` table wins over the legacy `vec_memories`.
const MEMORY_VEC_TABLES: &[&str] = &["memory_embeddings", "vec_memories"];
22
/// Arguments for the `vec` subcommand family.
// Thin wrapper: its single field selects which subcommand `run` dispatches to.
// The `after_long_help` text below is the examples section of the long help.
#[derive(clap::Args)]
#[command(
    about = "Vector index maintenance (orphan detection, purge, stats)",
    after_long_help = "EXAMPLES:\n  \
        # List orphan memory embedding rows whose memory_id is gone\n  \
        sqlite-graphrag vec orphan-list\n\n  \
        # Dry-run the purge (does not delete)\n  \
        sqlite-graphrag vec purge-orphan --dry-run\n\n  \
        # Actually purge orphans\n  \
        sqlite-graphrag vec purge-orphan --yes\n\n  \
        # Show stats for all vec0 tables\n  \
        sqlite-graphrag vec stats --json"
)]
pub struct VecArgs {
    // The concrete `vec` operation requested on the command line.
    #[command(subcommand)]
    pub command: VecSubcommand,
}
41
/// Subcommands nested under `vec`.
// Each variant carries its own argument struct and is dispatched by `run`.
#[derive(clap::Subcommand)]
pub enum VecSubcommand {
    /// List orphan memory embedding rows.
    // Handled by `run_orphan_list`, which opens the database read-only.
    OrphanList(VecOrphanListArgs),
    /// Delete orphan memory embedding rows. Requires `--yes` to confirm.
    // Handled by `run_purge_orphan`; `--dry-run` is accepted without `--yes`.
    PurgeOrphan(VecPurgeOrphanArgs),
    /// Show statistics for vec_memories, vec_entities, vec_chunks.
    // `run_stats` checks the newer `*_embeddings` tables first and only falls
    // back to the legacy `vec_*` names mentioned in the help text above.
    Stats(VecStatsArgs),
}
52
/// Arguments for `vec orphan-list`.
#[derive(clap::Args)]
pub struct VecOrphanListArgs {
    /// No-op; JSON is always emitted on stdout.
    // Hidden from help output; `run_orphan_list` never reads this field.
    #[arg(long, hide = true)]
    pub json: bool,
    /// Path to the SQLite database file.
    // May also come from the `SQLITE_GRAPHRAG_DB_PATH` environment variable;
    // `None` is passed through to `AppPaths::resolve` unchanged.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,
}
63
/// Bare `json` / `db` argument pair mirroring [`VecOrphanListArgs`].
// NOTE(review): nothing in this file references this struct — confirm whether
// it is still needed by other modules before relying on it.
// NOTE(review): the fields carry no `#[arg(long)]` attribute, unlike the other
// argument structs here; presumably clap would treat them as positionals —
// verify before wiring this into a command.
#[derive(clap::Args)]
pub struct VecOrphanListInner {
    pub json: bool,
    pub db: Option<String>,
}
70
/// Arguments for `vec purge-orphan`.
#[derive(clap::Args)]
pub struct VecPurgeOrphanArgs {
    /// No-op; JSON is always emitted on stdout.
    // Hidden from help output; `run_purge_orphan` never reads this field.
    #[arg(long, hide = true)]
    pub json: bool,
    /// Path to the SQLite database file.
    // May also come from the `SQLITE_GRAPHRAG_DB_PATH` environment variable.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,
    /// Skip the interactive confirmation; required for automation.
    // No interactive prompt is issued in this file: without `--yes` (and
    // without `--dry-run`) `run_purge_orphan` returns a validation error.
    #[arg(long, default_value_t = false)]
    pub yes: bool,
    /// Report what would be purged without writing.
    // Checked before `yes`, so a dry run never requires confirmation.
    #[arg(long, default_value_t = false)]
    pub dry_run: bool,
}
87
/// Arguments for `vec stats`.
#[derive(clap::Args)]
pub struct VecStatsArgs {
    /// No-op; JSON is always emitted on stdout.
    // Hidden from help output; `run_stats` never reads this field.
    #[arg(long, hide = true)]
    pub json: bool,
    /// Path to the SQLite database file.
    // May also come from the `SQLITE_GRAPHRAG_DB_PATH` environment variable.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,
}
98
/// One orphan embedding row as reported by `vec orphan-list`.
#[derive(Serialize)]
struct VecOrphanListItem {
    /// The orphan `memory_id` value stored in the active memory embedding table.
    memory_id: i64,
    /// Hex-encoded BLAKE3 digest of the raw embedding blob, for fingerprinting.
    vector_hash: String,
    /// The row's `created_at` column cast to an integer; presumably the time
    /// the row was inserted (unit not visible in this file — TODO confirm).
    created_at: i64,
}
108
/// JSON payload emitted by `vec orphan-list`.
#[derive(Serialize)]
struct VecOrphanListResponse {
    /// Operation tag; `run_orphan_list` always sets it to `"orphan_list"`.
    action: String,
    /// Number of entries in `items`.
    count: i64,
    /// The orphan rows found, ordered by `memory_id`; empty when no memory
    /// embedding table exists.
    items: Vec<VecOrphanListItem>,
    /// Wall-clock milliseconds elapsed since the command started.
    elapsed_ms: u64,
}
116
/// JSON payload emitted by `vec purge-orphan`.
#[derive(Serialize)]
struct VecPurgeOrphanResponse {
    /// Operation tag set by `run_purge_orphan`: `"purge_orphan"` when no
    /// memory embedding table exists, `"purge_orphan_dry_run"` for a dry run,
    /// and `"purged_orphan"` after a real purge.
    action: String,
    /// Rows removed from the active memory embedding table; always 0 for a
    /// dry run.
    deleted: i64,
    /// Number of orphan rows in `vec_entities` that were also removed (G39).
    deleted_entities: i64,
    /// Number of orphan rows in `vec_chunks` that were also removed (G39).
    deleted_chunks: i64,
    /// Echo of whether the command ran in dry-run mode.
    dry_run: bool,
    /// Wall-clock milliseconds elapsed since the command started.
    elapsed_ms: u64,
}
128
/// JSON payload emitted by `vec stats`.
#[derive(Serialize)]
struct VecStatsResponse {
    /// Row count of the active memory embedding table (0 when none exists).
    total_rows: i64,
    /// Rows in that table whose memory is missing or soft-deleted.
    orphaned: i64,
    /// Percentage of `total_rows` that are not orphaned; 100.0 when the
    /// table is empty or absent.
    coverage_percent: f64,
    /// Row count of `entity_embeddings`, or of legacy `vec_entities` when the
    /// former is absent. Omitted from the JSON when neither table exists or
    /// the count query fails.
    #[serde(skip_serializing_if = "Option::is_none")]
    vec_entities_rows: Option<i64>,
    /// Row count of `chunk_embeddings`, or of legacy `vec_chunks` when the
    /// former is absent. Omitted from the JSON when neither table exists or
    /// the count query fails.
    #[serde(skip_serializing_if = "Option::is_none")]
    vec_chunks_rows: Option<i64>,
    /// Row count of `fts_memories`; reported as 0 when the query fails.
    fts_memories_rows: i64,
    /// G52: per-dimensionality row counts across the three embedding tables.
    /// Surfaces mixed-dim contamination (G43) without manual SQL.
    dims: Vec<DimBreakdown>,
    /// Wall-clock milliseconds elapsed since the command started.
    elapsed_ms: u64,
}
144
/// One `(table, dim)` bucket produced by `dim_breakdown`.
#[derive(Serialize)]
struct DimBreakdown {
    /// Name of the embedding table the bucket was counted in.
    table: String,
    /// Value of the table's `dim` column for this bucket.
    dim: i64,
    /// Number of rows in `table` carrying that `dim` value.
    rows: i64,
}
151
152/// G52: aggregates `SELECT dim, COUNT(*) ... GROUP BY dim` over each
153/// embedding table that exists. Mixed dimensionalities in the same table
154/// indicate G43-style contamination that blinds cosine similarity.
155fn dim_breakdown(conn: &rusqlite::Connection) -> Vec<DimBreakdown> {
156    let mut out = Vec::new();
157    for table in ["memory_embeddings", "entity_embeddings", "chunk_embeddings"] {
158        if !vec_table_exists(conn, table) {
159            continue;
160        }
161        let sql = format!("SELECT dim, COUNT(*) FROM {table} GROUP BY dim ORDER BY dim");
162        let Ok(mut stmt) = conn.prepare(&sql) else {
163            continue;
164        };
165        let rows = stmt.query_map([], |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?)));
166        if let Ok(rows) = rows {
167            for (dim, count) in rows.flatten() {
168                out.push(DimBreakdown {
169                    table: table.to_string(),
170                    dim,
171                    rows: count,
172                });
173            }
174        }
175    }
176    out
177}
178
179/// Dispatch entry point called from `main`.
180///
181/// # Errors
182/// Propagates any [`AppError`] raised by the underlying subcommand.
183pub fn run(args: VecArgs) -> Result<(), AppError> {
184    match args.command {
185        VecSubcommand::OrphanList(a) => run_orphan_list(a),
186        VecSubcommand::PurgeOrphan(a) => run_purge_orphan(a),
187        VecSubcommand::Stats(a) => run_stats(a),
188    }
189}
190
191fn live_memory_embedding_stats(conn: &rusqlite::Connection) -> (i64, i64) {
192    if let Some(table_name) = first_existing_vec_table(conn, MEMORY_VEC_TABLES) {
193        let total = conn
194            .query_row(&format!("SELECT COUNT(*) FROM {table_name}"), [], |r| {
195                r.get(0)
196            })
197            .unwrap_or(0);
198        let orphaned = conn
199            .query_row(
200                &format!(
201                    "SELECT COUNT(*)
202                     FROM {table_name} v
203                     LEFT JOIN memories m ON m.id = v.memory_id
204                     WHERE m.id IS NULL OR m.deleted_at IS NOT NULL"
205                ),
206                [],
207                |r| r.get(0),
208            )
209            .unwrap_or(0);
210        return (total, orphaned);
211    }
212
213    (0, 0)
214}
215
216fn first_existing_vec_table<'a>(
217    conn: &rusqlite::Connection,
218    candidates: &'a [&'a str],
219) -> Option<&'a str> {
220    candidates
221        .iter()
222        .copied()
223        .find(|table_name| vec_table_exists(conn, table_name))
224}
225
226fn count_rows_first_existing(conn: &rusqlite::Connection, candidates: &[&str]) -> Option<i64> {
227    for table in candidates {
228        if vec_table_exists(conn, table) {
229            return conn
230                .query_row(&format!("SELECT COUNT(*) FROM {table}"), [], |r| r.get(0))
231                .ok();
232        }
233    }
234    None
235}
236
237fn run_orphan_list(args: VecOrphanListArgs) -> Result<(), AppError> {
238    let start = std::time::Instant::now();
239    let paths = AppPaths::resolve(args.db.as_deref())?;
240    crate::storage::connection::ensure_db_ready(&paths)?;
241    let conn = open_ro(&paths.db)?;
242
243    let Some(memory_table) = first_existing_vec_table(&conn, MEMORY_VEC_TABLES) else {
244        return output::emit_json(&VecOrphanListResponse {
245            action: "orphan_list".to_string(),
246            count: 0,
247            items: Vec::new(),
248            elapsed_ms: start.elapsed().as_millis() as u64,
249        });
250    };
251
252    // List embedding rows that have no corresponding live memory row.
253    // We use a hash of the float[] blob (BLAKE3) as a fingerprint so the
254    // operator can detect duplicate embeddings even after the parent
255    // memory has been re-embedded with new content.
256    let mut stmt = conn.prepare(&format!(
257        "SELECT v.memory_id, v.embedding, CAST(v.created_at AS INTEGER)
258         FROM {memory_table} v
259         LEFT JOIN memories m ON m.id = v.memory_id
260         WHERE m.id IS NULL OR m.deleted_at IS NOT NULL
261         ORDER BY v.memory_id"
262    ))?;
263    let rows: Vec<VecOrphanListItem> = stmt
264        .query_map([], |r| {
265            let memory_id: i64 = r.get(0)?;
266            let blob: Vec<u8> = r.get(1)?;
267            let created_at: i64 = r.get(2)?;
268            let vector_hash = blake3::hash(&blob).to_hex().to_string();
269            Ok(VecOrphanListItem {
270                memory_id,
271                vector_hash,
272                created_at,
273            })
274        })?
275        .collect::<Result<Vec<_>, _>>()?;
276    let count = rows.len() as i64;
277
278    output::emit_json(&VecOrphanListResponse {
279        action: "orphan_list".to_string(),
280        count,
281        items: rows,
282        elapsed_ms: start.elapsed().as_millis() as u64,
283    })?;
284    Ok(())
285}
286
287fn run_purge_orphan(args: VecPurgeOrphanArgs) -> Result<(), AppError> {
288    let start = std::time::Instant::now();
289    let paths = AppPaths::resolve(args.db.as_deref())?;
290    crate::storage::connection::ensure_db_ready(&paths)?;
291    let conn = open_rw(&paths.db)?;
292
293    let Some(memory_table) = first_existing_vec_table(&conn, MEMORY_VEC_TABLES) else {
294        return output::emit_json(&VecPurgeOrphanResponse {
295            action: "purge_orphan".to_string(),
296            deleted: 0,
297            deleted_entities: 0,
298            deleted_chunks: 0,
299            dry_run: args.dry_run,
300            elapsed_ms: start.elapsed().as_millis() as u64,
301        });
302    };
303
304    let orphan_count: i64 = conn
305        .query_row(
306            &format!(
307                "SELECT COUNT(*) FROM {memory_table} v
308                 LEFT JOIN memories m ON m.id = v.memory_id
309                 WHERE m.id IS NULL OR m.deleted_at IS NOT NULL"
310            ),
311            [],
312            |r| r.get(0),
313        )
314        .unwrap_or(0);
315
316    // G39: also count orphans in vec_entities and vec_chunks. These
317    // tables follow the same `memory_id` foreign key convention and
318    // accumulate orphans on the same paths as vec_memories.
319    let orphan_entities_count: i64 = if vec_table_exists(&conn, "vec_entities") {
320        conn.query_row(
321            "SELECT COUNT(*) FROM vec_entities v
322             LEFT JOIN memories m ON m.id = v.memory_id
323             WHERE m.id IS NULL OR m.deleted_at IS NOT NULL",
324            [],
325            |r| r.get(0),
326        )
327        .unwrap_or(0)
328    } else {
329        0
330    };
331    let orphan_chunks_count: i64 = if vec_table_exists(&conn, "vec_chunks") {
332        conn.query_row(
333            "SELECT COUNT(*) FROM vec_chunks v
334             LEFT JOIN memories m ON m.id = v.memory_id
335             WHERE m.id IS NULL OR m.deleted_at IS NOT NULL",
336            [],
337            |r| r.get(0),
338        )
339        .unwrap_or(0)
340    } else {
341        0
342    };
343
344    if args.dry_run {
345        tracing::info!(target: "vec", orphan_count, orphan_entities_count, orphan_chunks_count, "dry-run: would delete orphans");
346        return output::emit_json(&VecPurgeOrphanResponse {
347            action: "purge_orphan_dry_run".to_string(),
348            deleted: 0,
349            deleted_entities: 0,
350            deleted_chunks: 0,
351            dry_run: true,
352            elapsed_ms: start.elapsed().as_millis() as u64,
353        });
354    }
355
356    if !args.yes {
357        return Err(AppError::Validation(format!(
358            "refusing to delete {orphan_count} memory embedding + {orphan_entities_count} vec_entities + {orphan_chunks_count} vec_chunks orphan rows without --yes (use --dry-run to preview)"
359        )));
360    }
361
362    let deleted: i64 = conn.execute(
363        &format!(
364            "DELETE FROM {memory_table}
365             WHERE NOT EXISTS (
366                 SELECT 1 FROM memories m
367                 WHERE m.id = {memory_table}.memory_id
368                   AND m.deleted_at IS NULL
369             )"
370        ),
371        [],
372    )? as i64;
373
374    let deleted_entities: i64 = if vec_table_exists(&conn, "vec_entities") {
375        conn.execute(
376            "DELETE FROM vec_entities
377             WHERE NOT EXISTS (
378                 SELECT 1 FROM memories m
379                 WHERE m.id = vec_entities.memory_id
380                   AND m.deleted_at IS NULL
381             )",
382            [],
383        )
384        .unwrap_or(0) as i64
385    } else {
386        0
387    };
388    let deleted_chunks: i64 = if vec_table_exists(&conn, "vec_chunks") {
389        conn.execute(
390            "DELETE FROM vec_chunks
391             WHERE NOT EXISTS (
392                 SELECT 1 FROM memories m
393                 WHERE m.id = vec_chunks.memory_id
394                   AND m.deleted_at IS NULL
395             )",
396            [],
397        )
398        .unwrap_or(0) as i64
399    } else {
400        0
401    };
402
403    tracing::info!(target: "vec", deleted, deleted_entities, deleted_chunks, "purged orphan vec rows");
404
405    output::emit_json(&VecPurgeOrphanResponse {
406        action: "purged_orphan".to_string(),
407        deleted,
408        deleted_entities,
409        deleted_chunks,
410        dry_run: false,
411        elapsed_ms: start.elapsed().as_millis() as u64,
412    })?;
413    Ok(())
414}
415
416fn run_stats(args: VecStatsArgs) -> Result<(), AppError> {
417    let start = std::time::Instant::now();
418    let paths = AppPaths::resolve(args.db.as_deref())?;
419    crate::storage::connection::ensure_db_ready(&paths)?;
420    let conn = open_ro(&paths.db)?;
421
422    let (total_rows, orphaned) = live_memory_embedding_stats(&conn);
423    let coverage_percent = if total_rows > 0 {
424        ((total_rows - orphaned) as f64 / total_rows as f64) * 100.0
425    } else {
426        100.0
427    };
428
429    let vec_entities_rows =
430        count_rows_first_existing(&conn, &["entity_embeddings", "vec_entities"]);
431    let vec_chunks_rows = count_rows_first_existing(&conn, &["chunk_embeddings", "vec_chunks"]);
432    let fts_memories_rows = conn
433        .query_row("SELECT COUNT(*) FROM fts_memories", [], |r| r.get(0))
434        .unwrap_or(0);
435
436    output::emit_json(&VecStatsResponse {
437        total_rows,
438        orphaned,
439        coverage_percent,
440        vec_entities_rows,
441        vec_chunks_rows,
442        fts_memories_rows,
443        dims: dim_breakdown(&conn),
444        elapsed_ms: start.elapsed().as_millis() as u64,
445    })?;
446    Ok(())
447}
448
449fn vec_table_exists(conn: &rusqlite::Connection, name: &str) -> bool {
450    conn.query_row(
451        "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1",
452        rusqlite::params![name],
453        |r| r.get::<_, i64>(0).map(|v| v > 0),
454    )
455    .unwrap_or(false)
456}
457
// Unit tests: JSON shape of the response structs plus the SQL helpers,
// exercised against an in-memory SQLite database.
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::Connection;

    /// Builds an in-memory database containing `memories` and both the new
    /// (`*_embeddings`) and legacy (`vec_*`) tables as plain SQLite tables.
    fn open_vec_test_db() -> Connection {
        let conn = Connection::open_in_memory().unwrap();
        conn.execute_batch(
            "CREATE TABLE memories (
                id INTEGER PRIMARY KEY,
                deleted_at INTEGER
            );
            CREATE TABLE memory_embeddings (
                memory_id INTEGER PRIMARY KEY,
                namespace TEXT NOT NULL,
                embedding BLOB NOT NULL,
                source TEXT NOT NULL,
                model TEXT NOT NULL,
                dim INTEGER NOT NULL DEFAULT 384
            );
            CREATE TABLE vec_memories (
                memory_id INTEGER PRIMARY KEY,
                embedding BLOB NOT NULL,
                created_at INTEGER NOT NULL DEFAULT 0
            );
            CREATE TABLE entity_embeddings (
                entity_id INTEGER PRIMARY KEY,
                namespace TEXT NOT NULL,
                embedding BLOB NOT NULL,
                source TEXT NOT NULL,
                model TEXT NOT NULL,
                dim INTEGER NOT NULL DEFAULT 384
            );
            CREATE TABLE vec_entities (
                memory_id INTEGER PRIMARY KEY
            );
            CREATE TABLE chunk_embeddings (
                chunk_id INTEGER PRIMARY KEY,
                memory_id INTEGER NOT NULL,
                embedding BLOB NOT NULL,
                source TEXT NOT NULL,
                model TEXT NOT NULL,
                dim INTEGER NOT NULL DEFAULT 384
            );
            CREATE TABLE vec_chunks (
                memory_id INTEGER PRIMARY KEY
            );",
        )
        .unwrap();
        conn
    }

    // The orphan-list response must expose all four top-level keys.
    #[test]
    fn vec_orphan_list_response_serializes_all_fields() {
        let resp = VecOrphanListResponse {
            action: "orphan_list".into(),
            count: 0,
            items: Vec::new(),
            elapsed_ms: 5,
        };
        let v = serde_json::to_value(&resp).unwrap();
        assert_eq!(v["action"], "orphan_list");
        assert_eq!(v["count"], 0i64);
        assert_eq!(v["elapsed_ms"], 5u64);
        assert!(v["items"].is_array());
    }

    // The dry-run flag and the zero delete count survive serialization.
    #[test]
    fn vec_purge_orphan_response_serializes_dry_run_flag() {
        let resp = VecPurgeOrphanResponse {
            action: "purge_orphan_dry_run".into(),
            deleted: 0,
            deleted_entities: 0,
            deleted_chunks: 0,
            dry_run: true,
            elapsed_ms: 1,
        };
        let v = serde_json::to_value(&resp).unwrap();
        assert_eq!(v["dry_run"], true);
        assert_eq!(v["deleted"], 0i64);
    }

    // Checks serialization only: `None` row counts are omitted from the JSON.
    // The coverage value is supplied by hand here, not computed.
    #[test]
    fn vec_stats_response_computes_coverage() {
        let resp = VecStatsResponse {
            total_rows: 100,
            orphaned: 25,
            coverage_percent: 75.0,
            vec_entities_rows: Some(50),
            vec_chunks_rows: None,
            fts_memories_rows: 100,
            dims: vec![],
            elapsed_ms: 10,
        };
        let v = serde_json::to_value(&resp).unwrap();
        assert_eq!(v["coverage_percent"], 75.0);
        assert_eq!(v["vec_entities_rows"], 50i64);
        assert!(v.get("vec_chunks_rows").is_none());
        assert!(v["dims"].as_array().unwrap().is_empty());
    }

    #[test]
    fn dim_breakdown_groups_rows_per_dim_and_table() {
        // G52: mixed dims in the same table must surface as separate rows.
        let conn = open_vec_test_db();
        conn.execute_batch(
            "INSERT INTO memories (id, deleted_at) VALUES (1, NULL), (2, NULL), (3, NULL);
             INSERT INTO memory_embeddings (memory_id, namespace, embedding, source, model, dim)
             VALUES (1, 'g', x'00', 'test', 'test', 64),
                    (2, 'g', x'00', 'test', 'test', 64),
                    (3, 'g', x'00', 'test', 'test', 384);",
        )
        .unwrap();
        let dims = dim_breakdown(&conn);
        let mem: Vec<_> = dims
            .iter()
            .filter(|d| d.table == "memory_embeddings")
            .collect();
        // Buckets come back ordered by `dim` (the query uses ORDER BY dim).
        assert_eq!(mem.len(), 2, "expected one row per distinct dim");
        assert_eq!((mem[0].dim, mem[0].rows), (64, 2));
        assert_eq!((mem[1].dim, mem[1].rows), (384, 1));
    }

    // With both tables present, stats must come from `memory_embeddings`:
    // three rows, of which id 2 (soft-deleted memory) and id 3 (no memory
    // row) are orphans. The row in legacy `vec_memories` must be ignored.
    #[test]
    fn live_memory_embedding_stats_prefers_memory_embeddings() {
        let conn = open_vec_test_db();
        conn.execute("INSERT INTO memories (id, deleted_at) VALUES (1, NULL)", [])
            .unwrap();
        conn.execute("INSERT INTO memories (id, deleted_at) VALUES (2, 123)", [])
            .unwrap();
        conn.execute(
            "INSERT INTO memory_embeddings(memory_id, namespace, embedding, source, model, dim)
             VALUES (1, 'global', X'00', 'llm', 'm', 384)",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memory_embeddings(memory_id, namespace, embedding, source, model, dim)
             VALUES (2, 'global', X'00', 'llm', 'm', 384)",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memory_embeddings(memory_id, namespace, embedding, source, model, dim)
             VALUES (3, 'global', X'00', 'llm', 'm', 384)",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO vec_memories(memory_id, embedding, created_at) VALUES (99, X'00', 0)",
            [],
        )
        .unwrap();

        let (total, orphaned) = live_memory_embedding_stats(&conn);
        assert_eq!(total, 3);
        assert_eq!(orphaned, 2);
    }

    // Both the new and the legacy tables hold one row each here, so this
    // confirms a count is returned but cannot tell which table supplied it.
    #[test]
    fn count_rows_first_existing_prefers_new_embedding_tables() {
        let conn = open_vec_test_db();
        conn.execute(
            "INSERT INTO entity_embeddings(entity_id, namespace, embedding, source, model, dim)
             VALUES (1, 'global', X'00', 'llm', 'm', 384)",
            [],
        )
        .unwrap();
        conn.execute("INSERT INTO vec_entities(memory_id) VALUES (1)", [])
            .unwrap();
        conn.execute(
            "INSERT INTO chunk_embeddings(chunk_id, memory_id, embedding, source, model, dim)
             VALUES (1, 1, X'00', 'llm', 'm', 384)",
            [],
        )
        .unwrap();
        conn.execute("INSERT INTO vec_chunks(memory_id) VALUES (1)", [])
            .unwrap();

        assert_eq!(
            count_rows_first_existing(&conn, &["entity_embeddings", "vec_entities"]),
            Some(1)
        );
        assert_eq!(
            count_rows_first_existing(&conn, &["chunk_embeddings", "vec_chunks"]),
            Some(1)
        );
    }
}