Skip to main content

sqlite_graphrag/commands/
embedding.rs

//! GAP-005 (v1.0.82): `embedding` subcommand — health and recovery of the
//! pending-embeddings queue that buffers memories whose embedding step failed.
//!
//! ## Subcommands
//! - `embedding status` — counts by status, plus persisted-vector coverage
//! - `embedding list [--status <STATUS>] [--limit <N>]` — list queue entries
//!   (default status: `pending`, default limit: 100)
//! - `embedding abandon <pending_id>` — mark one entry as abandoned
//!
//! There is no `embedding retry` subcommand in this module; to retry every
//! pending entry, use `enrich --operation re-embed --pending-only`.
//!
//! The `pending_embeddings` table captures every `embed_with_fallback` failure
//! with `exit_code`, `stderr_tail`, and `backend_chain` for diagnostics. This
//! subcommand makes that state observable and recoverable.
13
14use clap::{Args, Subcommand};
15use serde::Serialize;
16
17use crate::cli::LlmBackendChoice;
18use crate::errors::AppError;
19use crate::output::emit_json_compact;
20use crate::paths::AppPaths;
21use crate::storage::connection::open_rw;
22use crate::storage::pending_embeddings::{self, PendingEmbedding, PendingEmbeddingStatus};
23
24#[derive(Debug, Args)]
25#[command(after_long_help = "EXAMPLES:\n  \
26    # Show queue health and counts per status\n  \
27    sqlite-graphrag embedding status --json\n\n  \
28    # List all pending embeddings waiting for retry\n  \
29    sqlite-graphrag embedding list --status pending --json\n\n  \
30    # Mark pending_id 7 as abandoned (will not be retried automatically)\n  \
31    sqlite-graphrag embedding abandon 7 --yes\n\n  \
32    # Note: `embedding retry` requires re-running an LLM subprocess; for full\n  \
33    # retry of every pending entry use `enrich --operation re-embed --pending-only`")]
34pub struct EmbeddingArgs {
35    #[command(subcommand)]
36    pub cmd: EmbeddingCmd,
37}
38
39#[derive(Debug, Subcommand)]
40pub enum EmbeddingCmd {
41    /// Show queue health (counts by status).
42    Status(EmbeddingStatusArgs),
43    /// List pending embeddings filtered by status.
44    List(EmbeddingListArgs),
45    /// Mark one entry as abandoned.
46    Abandon(EmbeddingAbandonArgs),
47}
48
49#[derive(Debug, Args)]
50pub struct EmbeddingStatusArgs {
51    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
52    pub db: Option<String>,
53    /// JSON output (always on; accepted for CLI consistency).
54    #[arg(long, hide = true)]
55    pub json: bool,
56}
57
58#[derive(Debug, Args)]
59pub struct EmbeddingListArgs {
60    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
61    pub db: Option<String>,
62    /// Filter by status: pending | in_progress | done | abandoned. Default: pending.
63    #[arg(long, value_enum, default_value_t = EmbeddingStatusFilter::Pending)]
64    pub status: EmbeddingStatusFilter,
65    /// Maximum number of entries to return. Default: 100.
66    #[arg(long, default_value_t = 100)]
67    pub limit: usize,
68    /// JSON output (always on; accepted for CLI consistency).
69    #[arg(long, hide = true)]
70    pub json: bool,
71}
72
73#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
74#[value(rename_all = "snake_case")]
75pub enum EmbeddingStatusFilter {
76    Pending,
77    InProgress,
78    Done,
79    Abandoned,
80}
81
82impl From<EmbeddingStatusFilter> for PendingEmbeddingStatus {
83    fn from(value: EmbeddingStatusFilter) -> Self {
84        match value {
85            EmbeddingStatusFilter::Pending => Self::Pending,
86            EmbeddingStatusFilter::InProgress => Self::InProgress,
87            EmbeddingStatusFilter::Done => Self::Done,
88            EmbeddingStatusFilter::Abandoned => Self::Abandoned,
89        }
90    }
91}
92
93#[derive(Debug, Args)]
94pub struct EmbeddingAbandonArgs {
95    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
96    pub db: Option<String>,
97    /// Pending id to abandon.
98    pub pending_id: i64,
99    /// Skip the interactive confirmation prompt.
100    #[arg(long)]
101    pub yes: bool,
102    /// JSON output (always on; accepted for CLI consistency).
103    #[arg(long, hide = true)]
104    pub json: bool,
105}
106
107#[derive(Serialize)]
108struct EmbeddingStatusOutput {
109    action: &'static str,
110    /// v1.0.84 (ADR-0042): discriminator of the LLM backend that would be
111    /// invoked to process live embeddings. `"claude" | "codex"
112    /// | "none" | "auto"`. `"auto"` indicates the caller requested Auto and
113    /// the codex→claude→none chain would be iterated at runtime.
114    backend_invoked: &'static str,
115    counts: EmbeddingStatusCounts,
116    /// GAP-SG-41: real vector coverage in the persisted tables. The `counts`
117    /// above only reflect the async retry queue (empty on the synchronous REST
118    /// path), so `coverage` reports the actual rows in `memory_embeddings`,
119    /// `entity_embeddings` and `chunk_embeddings` versus their source rows.
120    coverage: EmbeddingCoverage,
121    elapsed_ms: u64,
122}
123
124#[derive(Serialize, Default)]
125struct EmbeddingStatusCounts {
126    pending: usize,
127    in_progress: usize,
128    done: usize,
129    abandoned: usize,
130}
131
132/// GAP-SG-41: actual persisted-vector coverage. Each `*_with_vec` field counts
133/// the rows that have an embedding; the `*_total` field counts the source rows
134/// (active memories / entities / chunks). When totals are non-zero the operator
135/// can audit coverage directly instead of inferring it from `hybrid-search`.
136#[derive(Serialize, Default)]
137struct EmbeddingCoverage {
138    memories_total: i64,
139    memories_with_vec: i64,
140    entities_total: i64,
141    entities_with_vec: i64,
142    chunks_total: i64,
143    chunks_with_vec: i64,
144}
145
146/// Counts a table, returning 0 when the table is absent (legacy DB) instead of
147/// failing the whole status report.
148fn count_table(conn: &rusqlite::Connection, sql: &str) -> i64 {
149    match conn.query_row(sql, [], |r| r.get::<_, i64>(0)) {
150        Ok(n) => n,
151        Err(rusqlite::Error::SqliteFailure(_, Some(msg))) if msg.contains("no such table") => 0,
152        Err(e) => {
153            tracing::warn!(target: "embedding", error = %e, sql, "coverage count failed");
154            0
155        }
156    }
157}
158
159#[derive(Serialize)]
160struct EmbeddingListEntry {
161    pending_id: i64,
162    memory_id: i64,
163    name: String,
164    namespace: String,
165    backend_chain: String,
166    last_error: Option<String>,
167    last_exit_code: Option<i32>,
168    last_stderr_tail: Option<String>,
169    attempt_count: i32,
170    status: String,
171    updated_at: i64,
172}
173
174impl From<&PendingEmbedding> for EmbeddingListEntry {
175    fn from(p: &PendingEmbedding) -> Self {
176        Self {
177            pending_id: p.pending_id,
178            memory_id: p.memory_id,
179            name: p.name.clone(),
180            namespace: p.namespace.clone(),
181            backend_chain: p.backend_chain.clone(),
182            last_error: p.last_error.clone(),
183            last_exit_code: p.last_exit_code,
184            last_stderr_tail: p.last_stderr_tail.clone(),
185            attempt_count: p.attempt_count,
186            status: p.status.as_str().to_string(),
187            updated_at: p.updated_at,
188        }
189    }
190}
191
192#[derive(Serialize)]
193struct EmbeddingListOutput {
194    action: &'static str,
195    filter_status: String,
196    count: usize,
197    entries: Vec<EmbeddingListEntry>,
198    elapsed_ms: u64,
199}
200
201#[derive(Serialize)]
202struct EmbeddingAbandonOutput {
203    action: &'static str,
204    pending_id: i64,
205    status: &'static str,
206    elapsed_ms: u64,
207    yes: bool,
208}
209
210pub fn run(args: EmbeddingArgs, llm_backend: LlmBackendChoice) -> Result<(), AppError> {
211    match args.cmd {
212        EmbeddingCmd::Status(a) => run_status(a, llm_backend),
213        EmbeddingCmd::List(a) => run_list(a),
214        EmbeddingCmd::Abandon(a) => run_abandon(a),
215    }
216}
217
218fn open_conn(db: Option<&str>) -> Result<(AppPaths, rusqlite::Connection), AppError> {
219    let paths = AppPaths::resolve(db)?;
220    let conn = open_rw(&paths.db)?;
221    Ok((paths, conn))
222}
223
224fn run_status(args: EmbeddingStatusArgs, llm_backend: LlmBackendChoice) -> Result<(), AppError> {
225    let start = std::time::Instant::now();
226    let (_paths, conn) = open_conn(args.db.as_deref())?;
227
228    let counts = EmbeddingStatusCounts {
229        pending: pending_embeddings::list_by_status(
230            &conn,
231            PendingEmbeddingStatus::Pending,
232            100_000,
233        )?
234        .len(),
235        in_progress: pending_embeddings::list_by_status(
236            &conn,
237            PendingEmbeddingStatus::InProgress,
238            100_000,
239        )?
240        .len(),
241        done: pending_embeddings::list_by_status(&conn, PendingEmbeddingStatus::Done, 100_000)?
242            .len(),
243        abandoned: pending_embeddings::list_by_status(
244            &conn,
245            PendingEmbeddingStatus::Abandoned,
246            100_000,
247        )?
248        .len(),
249    };
250
251    let backend_invoked: &'static str = match llm_backend {
252        LlmBackendChoice::Claude => "claude",
253        LlmBackendChoice::Codex => "codex",
254        LlmBackendChoice::Opencode => "opencode",
255        LlmBackendChoice::None => "none",
256        LlmBackendChoice::OpenRouter => "openrouter",
257        LlmBackendChoice::Auto => "auto",
258    };
259
260    // GAP-SG-41: query the actual vector tables so coverage is observable even
261    // when the async queue is empty (the synchronous OpenRouter REST path never
262    // populates `pending_embeddings`).
263    let coverage = EmbeddingCoverage {
264        memories_total: count_table(
265            &conn,
266            "SELECT COUNT(*) FROM memories WHERE deleted_at IS NULL",
267        ),
268        memories_with_vec: count_table(&conn, "SELECT COUNT(*) FROM memory_embeddings"),
269        entities_total: count_table(&conn, "SELECT COUNT(*) FROM entities"),
270        entities_with_vec: count_table(&conn, "SELECT COUNT(*) FROM entity_embeddings"),
271        chunks_total: count_table(&conn, "SELECT COUNT(*) FROM memory_chunks"),
272        chunks_with_vec: count_table(&conn, "SELECT COUNT(*) FROM chunk_embeddings"),
273    };
274
275    let output = EmbeddingStatusOutput {
276        action: "embedding_status",
277        backend_invoked,
278        counts,
279        coverage,
280        elapsed_ms: start.elapsed().as_millis() as u64,
281    };
282    emit_json_compact(&output)
283}
284
285fn run_list(args: EmbeddingListArgs) -> Result<(), AppError> {
286    let start = std::time::Instant::now();
287    let (_paths, conn) = open_conn(args.db.as_deref())?;
288    let status: PendingEmbeddingStatus = args.status.into();
289    let rows = pending_embeddings::list_by_status(&conn, status, args.limit)?;
290    let count = rows.len();
291    let entries: Vec<EmbeddingListEntry> = rows.iter().map(EmbeddingListEntry::from).collect();
292    let output = EmbeddingListOutput {
293        action: "embedding_list",
294        filter_status: status.as_str().to_string(),
295        count,
296        entries,
297        elapsed_ms: start.elapsed().as_millis() as u64,
298    };
299    emit_json_compact(&output)
300}
301
302fn run_abandon(args: EmbeddingAbandonArgs) -> Result<(), AppError> {
303    let start = std::time::Instant::now();
304    let (_paths, conn) = open_conn(args.db.as_deref())?;
305    pending_embeddings::abandon(&conn, args.pending_id)?;
306    let output = EmbeddingAbandonOutput {
307        action: "embedding_abandon",
308        pending_id: args.pending_id,
309        status: PendingEmbeddingStatus::Abandoned.as_str(),
310        elapsed_ms: start.elapsed().as_millis() as u64,
311        yes: args.yes,
312    };
313    emit_json_compact(&output)
314}
315
#[cfg(test)]
mod tests {
    use super::*;

    // GAP-SG-41: the status output exposes real vector coverage, not only the
    // async queue counts. Every coverage key is asserted, so a renamed or
    // dropped field is caught.
    #[test]
    fn embedding_status_output_includes_coverage() {
        let output = EmbeddingStatusOutput {
            action: "embedding_status",
            backend_invoked: "openrouter",
            counts: EmbeddingStatusCounts::default(),
            coverage: EmbeddingCoverage {
                memories_total: 10,
                memories_with_vec: 9,
                entities_total: 4,
                entities_with_vec: 4,
                chunks_total: 7,
                chunks_with_vec: 7,
            },
            elapsed_ms: 1,
        };
        let json = serde_json::to_value(&output).expect("serialize");
        assert_eq!(json["action"], "embedding_status");
        assert_eq!(json["backend_invoked"], "openrouter");
        assert_eq!(json["counts"]["pending"], 0);
        assert_eq!(json["counts"]["in_progress"], 0);
        assert_eq!(json["counts"]["done"], 0);
        assert_eq!(json["counts"]["abandoned"], 0);
        assert_eq!(json["coverage"]["memories_total"], 10);
        assert_eq!(json["coverage"]["memories_with_vec"], 9);
        assert_eq!(json["coverage"]["entities_total"], 4);
        assert_eq!(json["coverage"]["entities_with_vec"], 4);
        assert_eq!(json["coverage"]["chunks_total"], 7);
        assert_eq!(json["coverage"]["chunks_with_vec"], 7);
    }

    // `count_table` is best-effort: a missing table (legacy DB) reports 0
    // instead of failing the status report, and an existing table reports its
    // real row count.
    #[test]
    fn count_table_tolerates_missing_table() {
        let conn = rusqlite::Connection::open_in_memory().expect("open in-memory db");
        assert_eq!(count_table(&conn, "SELECT COUNT(*) FROM no_such_table"), 0);

        conn.execute_batch("CREATE TABLE t (x INTEGER); INSERT INTO t (x) VALUES (1), (2);")
            .expect("seed table");
        assert_eq!(count_table(&conn, "SELECT COUNT(*) FROM t"), 2);
    }

    // Each CLI filter maps to the storage status with the same snake_case name.
    #[test]
    fn status_filter_round_trip() {
        let cases = [
            (EmbeddingStatusFilter::Pending, "pending"),
            (EmbeddingStatusFilter::InProgress, "in_progress"),
            (EmbeddingStatusFilter::Done, "done"),
            (EmbeddingStatusFilter::Abandoned, "abandoned"),
        ];
        for (filter, expected) in cases {
            let converted: PendingEmbeddingStatus = filter.into();
            assert_eq!(converted.as_str(), expected);
        }
    }
}