Skip to main content

sqlite_graphrag/commands/
embedding.rs

//! GAP-005 (v1.0.82): `embedding` subcommand — health and retry of the
//! pending-embeddings queue that buffers memories whose embedding step failed.
//!
//! ## Subcommands
//! - `embedding status` — queue counts by status plus persisted-vector coverage
//! - `embedding list [--status <STATUS>] [--limit <N>]` — list queue entries
//!   (default status: `pending`)
//! - `embedding abandon <pending_id>` — mark one entry as abandoned
//!
//! This module does not implement a per-entry `embedding retry` subcommand; to
//! re-embed pending entries use `enrich --operation re-embed --pending-only`.
//!
//! The pending_embeddings table captures every `embed_with_fallback` failure
//! with `exit_code`, `stderr_tail`, and `backend_chain` for diagnostics. This
//! subcommand makes that state observable and recoverable.
13
14use clap::{Args, Subcommand};
15use serde::Serialize;
16
17use crate::cli::LlmBackendChoice;
18use crate::errors::AppError;
19use crate::output::emit_json_compact;
20use crate::paths::AppPaths;
21use crate::storage::connection::open_rw;
22use crate::storage::pending_embeddings::{self, PendingEmbedding, PendingEmbeddingStatus};
23
24#[derive(Debug, Args)]
25#[command(after_long_help = "EXAMPLES:\n  \
26    # Show queue health and counts per status\n  \
27    sqlite-graphrag embedding status --json\n\n  \
28    # List all pending embeddings waiting for retry\n  \
29    sqlite-graphrag embedding list --status pending --json\n\n  \
30    # Mark pending_id 7 as abandoned (will not be retried automatically)\n  \
31    sqlite-graphrag embedding abandon 7 --yes\n\n  \
32    # Note: `embedding retry` requires re-running an LLM subprocess; for full\n  \
33    # retry of every pending entry use `enrich --operation re-embed --pending-only`")]
34pub struct EmbeddingArgs {
35    #[command(subcommand)]
36    pub cmd: EmbeddingCmd,
37}
38
39#[derive(Debug, Subcommand)]
40pub enum EmbeddingCmd {
41    /// Show queue health (counts by status).
42    Status(EmbeddingStatusArgs),
43    /// List pending embeddings filtered by status.
44    List(EmbeddingListArgs),
45    /// Mark one entry as abandoned.
46    Abandon(EmbeddingAbandonArgs),
47}
48
49#[derive(Debug, Args)]
50pub struct EmbeddingStatusArgs {
51    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
52    pub db: Option<String>,
53    /// JSON output (always on; accepted for CLI consistency).
54    #[arg(long, hide = true)]
55    pub json: bool,
56}
57
58#[derive(Debug, Args)]
59pub struct EmbeddingListArgs {
60    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
61    pub db: Option<String>,
62    /// Filter by status: pending | in_progress | done | abandoned. Default: pending.
63    #[arg(long, value_enum, default_value_t = EmbeddingStatusFilter::Pending)]
64    pub status: EmbeddingStatusFilter,
65    /// Maximum number of entries to return. Default: 100.
66    #[arg(long, default_value_t = 100)]
67    pub limit: usize,
68    /// JSON output (always on; accepted for CLI consistency).
69    #[arg(long, hide = true)]
70    pub json: bool,
71}
72
73#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
74#[value(rename_all = "snake_case")]
75pub enum EmbeddingStatusFilter {
76    Pending,
77    InProgress,
78    Done,
79    Abandoned,
80}
81
82impl From<EmbeddingStatusFilter> for PendingEmbeddingStatus {
83    fn from(value: EmbeddingStatusFilter) -> Self {
84        match value {
85            EmbeddingStatusFilter::Pending => Self::Pending,
86            EmbeddingStatusFilter::InProgress => Self::InProgress,
87            EmbeddingStatusFilter::Done => Self::Done,
88            EmbeddingStatusFilter::Abandoned => Self::Abandoned,
89        }
90    }
91}
92
93#[derive(Debug, Args)]
94pub struct EmbeddingAbandonArgs {
95    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
96    pub db: Option<String>,
97    /// Pending id to abandon.
98    pub pending_id: i64,
99    /// Skip the interactive confirmation prompt.
100    #[arg(long)]
101    pub yes: bool,
102    /// JSON output (always on; accepted for CLI consistency).
103    #[arg(long, hide = true)]
104    pub json: bool,
105}
106
107#[derive(Serialize)]
108struct EmbeddingStatusOutput {
109    action: &'static str,
110    /// v1.0.84 (ADR-0042): discriminator of the LLM backend that would be
111    /// invoked to process live embeddings. `"claude" | "codex"
112    /// | "none" | "auto"`. `"auto"` indicates the caller requested Auto and
113    /// the codex→claude→none chain would be iterated at runtime.
114    backend_invoked: &'static str,
115    counts: EmbeddingStatusCounts,
116    /// GAP-SG-41: real vector coverage in the persisted tables. The `counts`
117    /// above only reflect the async retry queue (empty on the synchronous REST
118    /// path), so `coverage` reports the actual rows in `memory_embeddings`,
119    /// `entity_embeddings` and `chunk_embeddings` versus their source rows.
120    coverage: EmbeddingCoverage,
121    elapsed_ms: u64,
122}
123
124#[derive(Serialize, Default)]
125struct EmbeddingStatusCounts {
126    pending: usize,
127    in_progress: usize,
128    done: usize,
129    abandoned: usize,
130}
131
132/// GAP-SG-41: actual persisted-vector coverage. Each `*_with_vec` field counts
133/// the rows that have an embedding; the `*_total` field counts the source rows
134/// (active memories / entities / chunks). When totals are non-zero the operator
135/// can audit coverage directly instead of inferring it from `hybrid-search`.
136#[derive(Serialize, Default)]
137struct EmbeddingCoverage {
138    memories_total: i64,
139    memories_with_vec: i64,
140    /// v1.1.1 (P6b): active memories WITHOUT a row in `memory_embeddings`
141    /// (LEFT JOIN, so orphaned vectors never mask a gap). Additive field —
142    /// the pre-existing totals keep their meaning.
143    memories_missing: i64,
144    entities_total: i64,
145    entities_with_vec: i64,
146    /// v1.1.1 (P6b): entities without a row in `entity_embeddings`.
147    entities_missing: i64,
148    chunks_total: i64,
149    chunks_with_vec: i64,
150    /// v1.1.1 (P6b): memory_chunks rows without a row in `chunk_embeddings`.
151    chunks_missing: i64,
152}
153
154/// Counts a table, returning 0 when the table is absent (legacy DB) instead of
155/// failing the whole status report.
156fn count_table(conn: &rusqlite::Connection, sql: &str) -> i64 {
157    match conn.query_row(sql, [], |r| r.get::<_, i64>(0)) {
158        Ok(n) => n,
159        Err(rusqlite::Error::SqliteFailure(_, Some(msg))) if msg.contains("no such table") => 0,
160        Err(e) => {
161            tracing::warn!(target: "embedding", error = %e, sql, "coverage count failed");
162            0
163        }
164    }
165}
166
167/// v1.1.1 (P6b): counts source rows without a vector via LEFT JOIN. When the
168/// embedding table does not exist (legacy DB) EVERY source row is missing, so
169/// the fallback is `total_when_absent` — never a silent 0 that would report
170/// full coverage on a table that is not there.
171fn count_missing(conn: &rusqlite::Connection, sql: &str, total_when_absent: i64) -> i64 {
172    match conn.query_row(sql, [], |r| r.get::<_, i64>(0)) {
173        Ok(n) => n,
174        Err(rusqlite::Error::SqliteFailure(_, Some(msg))) if msg.contains("no such table") => {
175            total_when_absent
176        }
177        Err(e) => {
178            tracing::warn!(target: "embedding", error = %e, sql, "coverage missing-count failed");
179            0
180        }
181    }
182}
183
184#[derive(Serialize)]
185struct EmbeddingListEntry {
186    pending_id: i64,
187    memory_id: i64,
188    name: String,
189    namespace: String,
190    backend_chain: String,
191    last_error: Option<String>,
192    last_exit_code: Option<i32>,
193    last_stderr_tail: Option<String>,
194    attempt_count: i32,
195    status: String,
196    updated_at: i64,
197}
198
199impl From<&PendingEmbedding> for EmbeddingListEntry {
200    fn from(p: &PendingEmbedding) -> Self {
201        Self {
202            pending_id: p.pending_id,
203            memory_id: p.memory_id,
204            name: p.name.clone(),
205            namespace: p.namespace.clone(),
206            backend_chain: p.backend_chain.clone(),
207            last_error: p.last_error.clone(),
208            last_exit_code: p.last_exit_code,
209            last_stderr_tail: p.last_stderr_tail.clone(),
210            attempt_count: p.attempt_count,
211            status: p.status.as_str().to_string(),
212            updated_at: p.updated_at,
213        }
214    }
215}
216
217#[derive(Serialize)]
218struct EmbeddingListOutput {
219    action: &'static str,
220    filter_status: String,
221    count: usize,
222    entries: Vec<EmbeddingListEntry>,
223    elapsed_ms: u64,
224}
225
226#[derive(Serialize)]
227struct EmbeddingAbandonOutput {
228    action: &'static str,
229    pending_id: i64,
230    status: &'static str,
231    elapsed_ms: u64,
232    yes: bool,
233}
234
235pub fn run(args: EmbeddingArgs, llm_backend: LlmBackendChoice) -> Result<(), AppError> {
236    match args.cmd {
237        EmbeddingCmd::Status(a) => run_status(a, llm_backend),
238        EmbeddingCmd::List(a) => run_list(a),
239        EmbeddingCmd::Abandon(a) => run_abandon(a),
240    }
241}
242
243fn open_conn(db: Option<&str>) -> Result<(AppPaths, rusqlite::Connection), AppError> {
244    let paths = AppPaths::resolve(db)?;
245    let conn = open_rw(&paths.db)?;
246    Ok((paths, conn))
247}
248
249fn run_status(args: EmbeddingStatusArgs, llm_backend: LlmBackendChoice) -> Result<(), AppError> {
250    let start = std::time::Instant::now();
251    let (_paths, conn) = open_conn(args.db.as_deref())?;
252
253    let counts = EmbeddingStatusCounts {
254        pending: pending_embeddings::list_by_status(
255            &conn,
256            PendingEmbeddingStatus::Pending,
257            100_000,
258        )?
259        .len(),
260        in_progress: pending_embeddings::list_by_status(
261            &conn,
262            PendingEmbeddingStatus::InProgress,
263            100_000,
264        )?
265        .len(),
266        done: pending_embeddings::list_by_status(&conn, PendingEmbeddingStatus::Done, 100_000)?
267            .len(),
268        abandoned: pending_embeddings::list_by_status(
269            &conn,
270            PendingEmbeddingStatus::Abandoned,
271            100_000,
272        )?
273        .len(),
274    };
275
276    let backend_invoked: &'static str = match llm_backend {
277        LlmBackendChoice::Claude => "claude",
278        LlmBackendChoice::Codex => "codex",
279        LlmBackendChoice::Opencode => "opencode",
280        LlmBackendChoice::None => "none",
281        LlmBackendChoice::OpenRouter => "openrouter",
282        LlmBackendChoice::Auto => "auto",
283    };
284
285    // GAP-SG-41: query the actual vector tables so coverage is observable even
286    // when the async queue is empty (the synchronous OpenRouter REST path never
287    // populates `pending_embeddings`).
288    let memories_total = count_table(
289        &conn,
290        "SELECT COUNT(*) FROM memories WHERE deleted_at IS NULL",
291    );
292    let entities_total = count_table(&conn, "SELECT COUNT(*) FROM entities");
293    let chunks_total = count_table(&conn, "SELECT COUNT(*) FROM memory_chunks");
294    let coverage = EmbeddingCoverage {
295        memories_total,
296        memories_with_vec: count_table(&conn, "SELECT COUNT(*) FROM memory_embeddings"),
297        // v1.1.1 (P6b): missing counts via LEFT JOIN so orphaned vector rows
298        // never inflate coverage; absent embedding table means ALL missing.
299        memories_missing: count_missing(
300            &conn,
301            "SELECT COUNT(*) FROM memories m \
302             LEFT JOIN memory_embeddings me ON me.memory_id = m.id \
303             WHERE me.memory_id IS NULL AND m.deleted_at IS NULL",
304            memories_total,
305        ),
306        entities_total,
307        entities_with_vec: count_table(&conn, "SELECT COUNT(*) FROM entity_embeddings"),
308        entities_missing: count_missing(
309            &conn,
310            "SELECT COUNT(*) FROM entities e \
311             LEFT JOIN entity_embeddings ee ON ee.entity_id = e.id \
312             WHERE ee.entity_id IS NULL",
313            entities_total,
314        ),
315        chunks_total,
316        chunks_with_vec: count_table(&conn, "SELECT COUNT(*) FROM chunk_embeddings"),
317        chunks_missing: count_missing(
318            &conn,
319            "SELECT COUNT(*) FROM memory_chunks c \
320             LEFT JOIN chunk_embeddings ce ON ce.chunk_id = c.id \
321             WHERE ce.chunk_id IS NULL",
322            chunks_total,
323        ),
324    };
325
326    let output = EmbeddingStatusOutput {
327        action: "embedding_status",
328        backend_invoked,
329        counts,
330        coverage,
331        elapsed_ms: start.elapsed().as_millis() as u64,
332    };
333    emit_json_compact(&output)
334}
335
336fn run_list(args: EmbeddingListArgs) -> Result<(), AppError> {
337    let start = std::time::Instant::now();
338    let (_paths, conn) = open_conn(args.db.as_deref())?;
339    let status: PendingEmbeddingStatus = args.status.into();
340    let rows = pending_embeddings::list_by_status(&conn, status, args.limit)?;
341    let count = rows.len();
342    let entries: Vec<EmbeddingListEntry> = rows.iter().map(EmbeddingListEntry::from).collect();
343    let output = EmbeddingListOutput {
344        action: "embedding_list",
345        filter_status: status.as_str().to_string(),
346        count,
347        entries,
348        elapsed_ms: start.elapsed().as_millis() as u64,
349    };
350    emit_json_compact(&output)
351}
352
353fn run_abandon(args: EmbeddingAbandonArgs) -> Result<(), AppError> {
354    let start = std::time::Instant::now();
355    let (_paths, conn) = open_conn(args.db.as_deref())?;
356    pending_embeddings::abandon(&conn, args.pending_id)?;
357    let output = EmbeddingAbandonOutput {
358        action: "embedding_abandon",
359        pending_id: args.pending_id,
360        status: PendingEmbeddingStatus::Abandoned.as_str(),
361        elapsed_ms: start.elapsed().as_millis() as u64,
362        yes: args.yes,
363    };
364    emit_json_compact(&output)
365}
366
#[cfg(test)]
mod tests {
    use super::*;

    /// Entities without a row in `entity_embeddings` (a table the fixture creates).
    const ENTITY_GAPS_SQL: &str = "SELECT COUNT(*) FROM entities e \
         LEFT JOIN entity_embeddings ee ON ee.entity_id = e.id \
         WHERE ee.entity_id IS NULL";

    /// Same shape, but joined against `chunk_embeddings`, which the fixture
    /// never creates.
    const ABSENT_TABLE_SQL: &str = "SELECT COUNT(*) FROM entities e \
         LEFT JOIN chunk_embeddings ce ON ce.chunk_id = e.id \
         WHERE ce.chunk_id IS NULL";

    // GAP-SG-41 / v1.1.1 (P6b): the status JSON carries the real vector
    // coverage, including the missing counters, next to the async queue counts.
    #[test]
    fn embedding_status_output_includes_coverage() {
        let coverage = EmbeddingCoverage {
            memories_total: 10,
            memories_with_vec: 9,
            memories_missing: 1,
            entities_total: 4,
            entities_with_vec: 4,
            chunks_total: 7,
            chunks_with_vec: 7,
            ..EmbeddingCoverage::default()
        };
        let output = EmbeddingStatusOutput {
            action: "embedding_status",
            backend_invoked: "openrouter",
            counts: EmbeddingStatusCounts::default(),
            coverage,
            elapsed_ms: 1,
        };
        let json = serde_json::to_value(&output).expect("serialize");

        let expected = [
            ("memories_total", 10),
            ("memories_with_vec", 9),
            ("entities_with_vec", 4),
            ("chunks_with_vec", 7),
            ("memories_missing", 1),
            ("entities_missing", 0),
            ("chunks_missing", 0),
        ];
        for (field, value) in expected {
            assert_eq!(json["coverage"][field], value, "coverage.{}", field);
        }
    }

    // v1.1.1 (P6b): the LEFT JOIN counts real gaps, and a missing embedding
    // table reports EVERYTHING missing instead of a silent 0.
    #[test]
    fn count_missing_counts_gaps_and_falls_back_when_table_absent() {
        let conn = rusqlite::Connection::open_in_memory().unwrap();
        conn.execute_batch(
            "CREATE TABLE entities (id INTEGER PRIMARY KEY, name TEXT);
            CREATE TABLE entity_embeddings (
                entity_id INTEGER PRIMARY KEY,
                embedding BLOB NOT NULL
            );",
        )
        .unwrap();
        for statement in [
            "INSERT INTO entities (id, name) VALUES (1, 'a'), (2, 'b'), (3, 'c')",
            "INSERT INTO entity_embeddings (entity_id, embedding) VALUES (1, X'00')",
        ] {
            conn.execute(statement, []).unwrap();
        }

        assert_eq!(
            count_missing(&conn, ENTITY_GAPS_SQL, 3),
            2,
            "2 of 3 entities lack a vector row"
        );
        assert_eq!(
            count_missing(&conn, ABSENT_TABLE_SQL, 3),
            3,
            "absent table must report all missing"
        );
    }

    // Every CLI filter maps onto the storage status with the same wire name.
    #[test]
    fn status_filter_round_trip() {
        let cases = [
            (EmbeddingStatusFilter::Pending, "pending"),
            (EmbeddingStatusFilter::InProgress, "in_progress"),
            (EmbeddingStatusFilter::Done, "done"),
            (EmbeddingStatusFilter::Abandoned, "abandoned"),
        ];
        for (filter, wire_name) in cases {
            assert_eq!(PendingEmbeddingStatus::from(filter).as_str(), wire_name);
        }
    }
}