Skip to main content

sqlite_graphrag/commands/
embedding.rs

//! GAP-005 (v1.0.82): `embedding` subcommand — health inspection and manual
//! triage of the pending-embeddings queue, which buffers memories whose
//! embedding step failed.
//!
//! ## Subcommands
//! - `embedding status` — counts by status
//! - `embedding list [--status <STATUS>] [--limit <N>]` — list queue entries
//! - `embedding abandon <pending_id>` — mark an entry as abandoned
//!
//! Retry is not exposed as a subcommand here: re-running an embedding requires
//! an LLM subprocess, so use `enrich --operation re-embed --pending-only` instead.
//!
//! The pending_embeddings table captures every `embed_with_fallback` failure
//! with `exit_code`, `stderr_tail`, and `backend_chain` for diagnostics. This
//! subcommand makes that state observable and recoverable.
13
14use clap::{Args, Subcommand};
15use serde::Serialize;
16
17use crate::cli::LlmBackendChoice;
18use crate::errors::AppError;
19use crate::output::emit_json_compact;
20use crate::paths::AppPaths;
21use crate::storage::connection::open_rw;
22use crate::storage::pending_embeddings::{self, PendingEmbedding, PendingEmbeddingStatus};
23
// Top-level arguments for `sqlite-graphrag embedding`. The concrete action is
// chosen by the nested `EmbeddingCmd` subcommand. The `after_long_help` text
// is shown in the long help (`--help`) and is user-facing output, so edit it
// with care.
#[derive(Debug, Args)]
#[command(after_long_help = "EXAMPLES:\n  \
    # Show queue health and counts per status\n  \
    sqlite-graphrag embedding status --json\n\n  \
    # List all pending embeddings waiting for retry\n  \
    sqlite-graphrag embedding list --status pending --json\n\n  \
    # Mark pending_id 7 as abandoned (will not be retried automatically)\n  \
    sqlite-graphrag embedding abandon 7 --yes\n\n  \
    # Note: `embedding retry` requires re-running an LLM subprocess; for full\n  \
    # retry of every pending entry use `enrich --operation re-embed --pending-only`")]
pub struct EmbeddingArgs {
    // The selected `embedding` subcommand; dispatched by `run`.
    #[command(subcommand)]
    pub cmd: EmbeddingCmd,
}
38
// Subcommands of `embedding`. The `///` line on each variant doubles as the
// clap help text for that subcommand, so it is part of the CLI surface.
// There is intentionally no `Retry` variant; retries go through `enrich`.
#[derive(Debug, Subcommand)]
pub enum EmbeddingCmd {
    /// Show queue health (counts by status).
    Status(EmbeddingStatusArgs),
    /// List pending embeddings filtered by status.
    List(EmbeddingListArgs),
    /// Mark one entry as abandoned.
    Abandon(EmbeddingAbandonArgs),
}
48
// `embedding status` takes no options of its own. The empty struct gives the
// subcommand an `Args` type like its siblings, and leaves room for future flags.
#[derive(Debug, Args)]
pub struct EmbeddingStatusArgs {}
51
// Options for `embedding list`.
#[derive(Debug, Args)]
pub struct EmbeddingListArgs {
    /// Filter by status: pending | in_progress | done | abandoned. Default: pending.
    #[arg(long, value_enum, default_value_t = EmbeddingStatusFilter::Pending)]
    pub status: EmbeddingStatusFilter,
    /// Maximum number of entries to return. Default: 100.
    // Passed unchanged to `pending_embeddings::list_by_status` as its row limit.
    #[arg(long, default_value_t = 100)]
    pub limit: usize,
}
61
// CLI-facing mirror of `PendingEmbeddingStatus`, used so clap can parse
// `--status`. `rename_all = "snake_case"` overrides clap's default kebab-case,
// so the accepted spellings are `pending`, `in_progress`, `done` and
// `abandoned`. These match `PendingEmbeddingStatus::as_str`, which the
// `status_filter_round_trip` test in this file checks.
// Plain `//` comments are used because clap turns `///` variant docs into
// possible-value help text.
#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
#[value(rename_all = "snake_case")]
pub enum EmbeddingStatusFilter {
    Pending,
    InProgress,
    Done,
    Abandoned,
}
70
71impl From<EmbeddingStatusFilter> for PendingEmbeddingStatus {
72    fn from(value: EmbeddingStatusFilter) -> Self {
73        match value {
74            EmbeddingStatusFilter::Pending => Self::Pending,
75            EmbeddingStatusFilter::InProgress => Self::InProgress,
76            EmbeddingStatusFilter::Done => Self::Done,
77            EmbeddingStatusFilter::Abandoned => Self::Abandoned,
78        }
79    }
80}
81
// Arguments for `embedding abandon <pending_id>`.
#[derive(Debug, Args)]
pub struct EmbeddingAbandonArgs {
    /// Pending id to abandon.
    pub pending_id: i64,
    /// Skip the interactive confirmation prompt.
    // NOTE(review): `run_abandon` in this file never shows a prompt. It
    // abandons the entry whether or not this flag is set, and only echoes the
    // value as `yes` in the JSON output. Either implement the prompt or reword
    // the help text above.
    #[arg(long)]
    pub yes: bool,
}
90
/// JSON payload emitted by `embedding status`.
///
/// Field order is the serialized key order.
#[derive(Serialize)]
struct EmbeddingStatusOutput {
    /// Set to `"embedding_status"` by `run_status`.
    action: &'static str,
    /// v1.0.84 (ADR-0042): identifies the LLM backend that would be invoked
    /// to process live embeddings: `"claude" | "codex" | "none" | "auto"`.
    /// `"auto"` means the caller requested Auto, and the codex→claude→none
    /// chain would be iterated at runtime. `run_status` only maps the
    /// configured choice to this string; it does not invoke any backend.
    backend_invoked: &'static str,
    /// Per-status entry counts (computed in `run_status`).
    counts: EmbeddingStatusCounts,
    /// Time spent handling the command, in milliseconds.
    elapsed_ms: u64,
}
102
/// Number of queue entries in each status, as reported by `embedding status`.
#[derive(Serialize, Default)]
struct EmbeddingStatusCounts {
    /// Entries with status `pending`.
    pending: usize,
    /// Entries with status `in_progress`.
    in_progress: usize,
    /// Entries with status `done`.
    done: usize,
    /// Entries with status `abandoned`.
    abandoned: usize,
}
110
/// One entry of the `embedding list` JSON output, projected from a
/// `PendingEmbedding` storage row.
///
/// Field order is the serialized key order.
#[derive(Serialize)]
struct EmbeddingListEntry {
    /// Queue entry id; this is the `<pending_id>` accepted by `embedding abandon`.
    pending_id: i64,
    /// Id of the memory this queue entry refers to.
    memory_id: i64,
    /// Memory name, copied from the storage row.
    name: String,
    /// Memory namespace, copied from the storage row.
    namespace: String,
    /// Backend chain recorded by the storage layer (format defined there).
    backend_chain: String,
    /// Last recorded error message, if any.
    last_error: Option<String>,
    /// Last recorded process exit code, if any.
    last_exit_code: Option<i32>,
    /// Last recorded stderr tail, if any.
    last_stderr_tail: Option<String>,
    /// Attempt counter copied from the storage row.
    attempt_count: i32,
    /// Status string as produced by `PendingEmbeddingStatus::as_str`.
    status: String,
    /// Last-update timestamp copied from the storage row. Units are not
    /// visible here (presumably Unix epoch; TODO confirm).
    updated_at: i64,
}
125
126impl From<&PendingEmbedding> for EmbeddingListEntry {
127    fn from(p: &PendingEmbedding) -> Self {
128        Self {
129            pending_id: p.pending_id,
130            memory_id: p.memory_id,
131            name: p.name.clone(),
132            namespace: p.namespace.clone(),
133            backend_chain: p.backend_chain.clone(),
134            last_error: p.last_error.clone(),
135            last_exit_code: p.last_exit_code,
136            last_stderr_tail: p.last_stderr_tail.clone(),
137            attempt_count: p.attempt_count,
138            status: p.status.as_str().to_string(),
139            updated_at: p.updated_at,
140        }
141    }
142}
143
/// JSON payload emitted by `embedding list`.
///
/// Field order is the serialized key order.
#[derive(Serialize)]
struct EmbeddingListOutput {
    /// Set to `"embedding_list"` by `run_list`.
    action: &'static str,
    /// The status filter that was applied, in its `as_str()` spelling.
    filter_status: String,
    /// Equals `entries.len()`. This is the number of entries returned, not
    /// necessarily the total number matching the filter (see `--limit`).
    count: usize,
    /// The returned entries.
    entries: Vec<EmbeddingListEntry>,
    /// Time spent handling the command, in milliseconds.
    elapsed_ms: u64,
}
152
/// JSON payload emitted by `embedding abandon`.
///
/// Field order is the serialized key order.
#[derive(Serialize)]
struct EmbeddingAbandonOutput {
    /// Set to `"embedding_abandon"` by `run_abandon`.
    action: &'static str,
    /// The queue entry that was abandoned.
    pending_id: i64,
    /// The entry's new status: the `as_str()` spelling of `Abandoned`.
    status: &'static str,
    /// Time spent handling the command, in milliseconds.
    elapsed_ms: u64,
    /// Echo of the `--yes` flag as passed on the command line.
    yes: bool,
}
161
162pub fn run(args: EmbeddingArgs, llm_backend: LlmBackendChoice) -> Result<(), AppError> {
163    match args.cmd {
164        EmbeddingCmd::Status(a) => run_status(a, llm_backend),
165        EmbeddingCmd::List(a) => run_list(a),
166        EmbeddingCmd::Abandon(a) => run_abandon(a),
167    }
168}
169
170fn open_conn() -> Result<(AppPaths, rusqlite::Connection), AppError> {
171    let paths = AppPaths::resolve(None)?;
172    let conn = open_rw(&paths.db)?;
173    Ok((paths, conn))
174}
175
176fn run_status(_args: EmbeddingStatusArgs, llm_backend: LlmBackendChoice) -> Result<(), AppError> {
177    let start = std::time::Instant::now();
178    let (_paths, conn) = open_conn()?;
179
180    let counts = EmbeddingStatusCounts {
181        pending: pending_embeddings::list_by_status(
182            &conn,
183            PendingEmbeddingStatus::Pending,
184            100_000,
185        )?
186        .len(),
187        in_progress: pending_embeddings::list_by_status(
188            &conn,
189            PendingEmbeddingStatus::InProgress,
190            100_000,
191        )?
192        .len(),
193        done: pending_embeddings::list_by_status(&conn, PendingEmbeddingStatus::Done, 100_000)?
194            .len(),
195        abandoned: pending_embeddings::list_by_status(
196            &conn,
197            PendingEmbeddingStatus::Abandoned,
198            100_000,
199        )?
200        .len(),
201    };
202
203    let backend_invoked: &'static str = match llm_backend {
204        LlmBackendChoice::Claude => "claude",
205        LlmBackendChoice::Codex => "codex",
206        LlmBackendChoice::None => "none",
207        LlmBackendChoice::Auto => "auto",
208    };
209
210    let output = EmbeddingStatusOutput {
211        action: "embedding_status",
212        backend_invoked,
213        counts,
214        elapsed_ms: start.elapsed().as_millis() as u64,
215    };
216    emit_json_compact(&output)
217}
218
219fn run_list(args: EmbeddingListArgs) -> Result<(), AppError> {
220    let start = std::time::Instant::now();
221    let (_paths, conn) = open_conn()?;
222    let status: PendingEmbeddingStatus = args.status.into();
223    let rows = pending_embeddings::list_by_status(&conn, status, args.limit)?;
224    let count = rows.len();
225    let entries: Vec<EmbeddingListEntry> = rows.iter().map(EmbeddingListEntry::from).collect();
226    let output = EmbeddingListOutput {
227        action: "embedding_list",
228        filter_status: status.as_str().to_string(),
229        count,
230        entries,
231        elapsed_ms: start.elapsed().as_millis() as u64,
232    };
233    emit_json_compact(&output)
234}
235
236fn run_abandon(args: EmbeddingAbandonArgs) -> Result<(), AppError> {
237    let start = std::time::Instant::now();
238    let (_paths, conn) = open_conn()?;
239    pending_embeddings::abandon(&conn, args.pending_id)?;
240    let output = EmbeddingAbandonOutput {
241        action: "embedding_abandon",
242        pending_id: args.pending_id,
243        status: PendingEmbeddingStatus::Abandoned.as_str(),
244        elapsed_ms: start.elapsed().as_millis() as u64,
245        yes: args.yes,
246    };
247    emit_json_compact(&output)
248}
249
#[cfg(test)]
mod tests {
    use super::*;

    /// Every CLI filter must convert to the storage status whose `as_str()`
    /// equals the snake_case spelling accepted by `--status`.
    #[test]
    fn status_filter_round_trip() {
        let cases = [
            (EmbeddingStatusFilter::Pending, "pending"),
            (EmbeddingStatusFilter::InProgress, "in_progress"),
            (EmbeddingStatusFilter::Done, "done"),
            (EmbeddingStatusFilter::Abandoned, "abandoned"),
        ];

        for (filter, expected) in cases {
            let converted = PendingEmbeddingStatus::from(filter);
            assert_eq!(converted.as_str(), expected);
        }
    }
}