Skip to main content

sqlite_graphrag/commands/
embedding.rs

//! GAP-005 (v1.0.82): `embedding` subcommand — health and retry of the
//! pending-embeddings queue that buffers memories whose embedding step failed.
//!
//! ## Subcommands
//! - `embedding status` — counts by status
//! - `embedding list [--status <STATUS>]` — list pending entries
//! - `embedding retry <pending_id>` — re-run embedding for one entry (no
//!   `Retry` variant exists in `EmbeddingCmd` in this file; the CLI help points
//!   to `enrich --operation re-embed --pending-only` for retries)
//! - `embedding abandon <pending_id>` — mark as abandoned
//!
//! The `pending_embeddings` table captures every `embed_with_fallback` failure
//! with `exit_code`, `stderr_tail`, and `backend_chain` for diagnostics. This
//! subcommand makes that state observable and recoverable.
13
14use clap::{Args, Subcommand};
15use serde::Serialize;
16
17use crate::cli::LlmBackendChoice;
18use crate::errors::AppError;
19use crate::output::emit_json_compact;
20use crate::paths::AppPaths;
21use crate::storage::connection::open_rw;
22use crate::storage::pending_embeddings::{self, PendingEmbedding, PendingEmbeddingStatus};
23
24#[derive(Debug, Args)]
25#[command(after_long_help = "EXAMPLES:\n  \
26    # Show queue health and counts per status\n  \
27    sqlite-graphrag embedding status --json\n\n  \
28    # List all pending embeddings waiting for retry\n  \
29    sqlite-graphrag embedding list --status pending --json\n\n  \
30    # Mark pending_id 7 as abandoned (will not be retried automatically)\n  \
31    sqlite-graphrag embedding abandon 7 --yes\n\n  \
32    # Note: `embedding retry` requires re-running an LLM subprocess; for full\n  \
33    # retry of every pending entry use `enrich --operation re-embed --pending-only`")]
34pub struct EmbeddingArgs {
35    #[command(subcommand)]
36    pub cmd: EmbeddingCmd,
37}
38
39#[derive(Debug, Subcommand)]
40pub enum EmbeddingCmd {
41    /// Show queue health (counts by status).
42    Status(EmbeddingStatusArgs),
43    /// List pending embeddings filtered by status.
44    List(EmbeddingListArgs),
45    /// Mark one entry as abandoned.
46    Abandon(EmbeddingAbandonArgs),
47}
48
49#[derive(Debug, Args)]
50pub struct EmbeddingStatusArgs {
51    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
52    pub db: Option<String>,
53}
54
55#[derive(Debug, Args)]
56pub struct EmbeddingListArgs {
57    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
58    pub db: Option<String>,
59    /// Filter by status: pending | in_progress | done | abandoned. Default: pending.
60    #[arg(long, value_enum, default_value_t = EmbeddingStatusFilter::Pending)]
61    pub status: EmbeddingStatusFilter,
62    /// Maximum number of entries to return. Default: 100.
63    #[arg(long, default_value_t = 100)]
64    pub limit: usize,
65}
66
67#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
68#[value(rename_all = "snake_case")]
69pub enum EmbeddingStatusFilter {
70    Pending,
71    InProgress,
72    Done,
73    Abandoned,
74}
75
76impl From<EmbeddingStatusFilter> for PendingEmbeddingStatus {
77    fn from(value: EmbeddingStatusFilter) -> Self {
78        match value {
79            EmbeddingStatusFilter::Pending => Self::Pending,
80            EmbeddingStatusFilter::InProgress => Self::InProgress,
81            EmbeddingStatusFilter::Done => Self::Done,
82            EmbeddingStatusFilter::Abandoned => Self::Abandoned,
83        }
84    }
85}
86
87#[derive(Debug, Args)]
88pub struct EmbeddingAbandonArgs {
89    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
90    pub db: Option<String>,
91    /// Pending id to abandon.
92    pub pending_id: i64,
93    /// Skip the interactive confirmation prompt.
94    #[arg(long)]
95    pub yes: bool,
96}
97
98#[derive(Serialize)]
99struct EmbeddingStatusOutput {
100    action: &'static str,
101    /// v1.0.84 (ADR-0042): discriminador do backend LLM que seria
102    /// invocado para processar embeddings live. `"claude" | "codex"
103    /// | "none" | "auto"`. `"auto"` indica que o caller pediu Auto e
104    /// a chain codex→claude→none seria iterada em runtime.
105    backend_invoked: &'static str,
106    counts: EmbeddingStatusCounts,
107    elapsed_ms: u64,
108}
109
110#[derive(Serialize, Default)]
111struct EmbeddingStatusCounts {
112    pending: usize,
113    in_progress: usize,
114    done: usize,
115    abandoned: usize,
116}
117
118#[derive(Serialize)]
119struct EmbeddingListEntry {
120    pending_id: i64,
121    memory_id: i64,
122    name: String,
123    namespace: String,
124    backend_chain: String,
125    last_error: Option<String>,
126    last_exit_code: Option<i32>,
127    last_stderr_tail: Option<String>,
128    attempt_count: i32,
129    status: String,
130    updated_at: i64,
131}
132
133impl From<&PendingEmbedding> for EmbeddingListEntry {
134    fn from(p: &PendingEmbedding) -> Self {
135        Self {
136            pending_id: p.pending_id,
137            memory_id: p.memory_id,
138            name: p.name.clone(),
139            namespace: p.namespace.clone(),
140            backend_chain: p.backend_chain.clone(),
141            last_error: p.last_error.clone(),
142            last_exit_code: p.last_exit_code,
143            last_stderr_tail: p.last_stderr_tail.clone(),
144            attempt_count: p.attempt_count,
145            status: p.status.as_str().to_string(),
146            updated_at: p.updated_at,
147        }
148    }
149}
150
151#[derive(Serialize)]
152struct EmbeddingListOutput {
153    action: &'static str,
154    filter_status: String,
155    count: usize,
156    entries: Vec<EmbeddingListEntry>,
157    elapsed_ms: u64,
158}
159
160#[derive(Serialize)]
161struct EmbeddingAbandonOutput {
162    action: &'static str,
163    pending_id: i64,
164    status: &'static str,
165    elapsed_ms: u64,
166    yes: bool,
167}
168
169pub fn run(args: EmbeddingArgs, llm_backend: LlmBackendChoice) -> Result<(), AppError> {
170    match args.cmd {
171        EmbeddingCmd::Status(a) => run_status(a, llm_backend),
172        EmbeddingCmd::List(a) => run_list(a),
173        EmbeddingCmd::Abandon(a) => run_abandon(a),
174    }
175}
176
177fn open_conn(db: Option<&str>) -> Result<(AppPaths, rusqlite::Connection), AppError> {
178    let paths = AppPaths::resolve(db)?;
179    let conn = open_rw(&paths.db)?;
180    Ok((paths, conn))
181}
182
183fn run_status(args: EmbeddingStatusArgs, llm_backend: LlmBackendChoice) -> Result<(), AppError> {
184    let start = std::time::Instant::now();
185    let (_paths, conn) = open_conn(args.db.as_deref())?;
186
187    let counts = EmbeddingStatusCounts {
188        pending: pending_embeddings::list_by_status(
189            &conn,
190            PendingEmbeddingStatus::Pending,
191            100_000,
192        )?
193        .len(),
194        in_progress: pending_embeddings::list_by_status(
195            &conn,
196            PendingEmbeddingStatus::InProgress,
197            100_000,
198        )?
199        .len(),
200        done: pending_embeddings::list_by_status(&conn, PendingEmbeddingStatus::Done, 100_000)?
201            .len(),
202        abandoned: pending_embeddings::list_by_status(
203            &conn,
204            PendingEmbeddingStatus::Abandoned,
205            100_000,
206        )?
207        .len(),
208    };
209
210    let backend_invoked: &'static str = match llm_backend {
211        LlmBackendChoice::Claude => "claude",
212        LlmBackendChoice::Codex => "codex",
213        LlmBackendChoice::None => "none",
214        LlmBackendChoice::Auto => "auto",
215    };
216
217    let output = EmbeddingStatusOutput {
218        action: "embedding_status",
219        backend_invoked,
220        counts,
221        elapsed_ms: start.elapsed().as_millis() as u64,
222    };
223    emit_json_compact(&output)
224}
225
226fn run_list(args: EmbeddingListArgs) -> Result<(), AppError> {
227    let start = std::time::Instant::now();
228    let (_paths, conn) = open_conn(args.db.as_deref())?;
229    let status: PendingEmbeddingStatus = args.status.into();
230    let rows = pending_embeddings::list_by_status(&conn, status, args.limit)?;
231    let count = rows.len();
232    let entries: Vec<EmbeddingListEntry> = rows.iter().map(EmbeddingListEntry::from).collect();
233    let output = EmbeddingListOutput {
234        action: "embedding_list",
235        filter_status: status.as_str().to_string(),
236        count,
237        entries,
238        elapsed_ms: start.elapsed().as_millis() as u64,
239    };
240    emit_json_compact(&output)
241}
242
243fn run_abandon(args: EmbeddingAbandonArgs) -> Result<(), AppError> {
244    let start = std::time::Instant::now();
245    let (_paths, conn) = open_conn(args.db.as_deref())?;
246    pending_embeddings::abandon(&conn, args.pending_id)?;
247    let output = EmbeddingAbandonOutput {
248        action: "embedding_abandon",
249        pending_id: args.pending_id,
250        status: PendingEmbeddingStatus::Abandoned.as_str(),
251        elapsed_ms: start.elapsed().as_millis() as u64,
252        yes: args.yes,
253    };
254    emit_json_compact(&output)
255}
256
#[cfg(test)]
mod tests {
    use super::*;

    /// Every CLI filter variant converts to the storage status whose string
    /// form is the expected snake_case name.
    #[test]
    fn status_filter_round_trip() {
        let expectations = [
            (EmbeddingStatusFilter::Pending, "pending"),
            (EmbeddingStatusFilter::InProgress, "in_progress"),
            (EmbeddingStatusFilter::Done, "done"),
            (EmbeddingStatusFilter::Abandoned, "abandoned"),
        ];

        for (filter, expected) in expectations {
            let converted = PendingEmbeddingStatus::from(filter);
            assert_eq!(converted.as_str(), expected);
        }
    }
}