Skip to main content

sqlite_graphrag/commands/
embedding.rs

1//! GAP-005 (v1.0.82): `embedding` subcommand — health and retry of the
2//! pending-embeddings queue that buffers memories whose embedding step failed.
3//!
4//! ## Subcommands
5//! - `embedding status` — counts by status
6//! - `embedding list [--status <STATUS>]` — list pending entries
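//! - `embedding retry <pending_id>` — re-run embedding for one entry (not wired
//!   up as a subcommand in this module; use
//!   `enrich --operation re-embed --pending-only` to retry pending entries)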
8//! - `embedding abandon <pending_id>` — mark as abandoned
9//!
10//! The pending_embeddings table captures every `embed_with_fallback` failure
11//! with `exit_code`, `stderr_tail`, and `backend_chain` for diagnostics. This
12//! subcommand makes that state observable and recoverable.
13
14use clap::{Args, Subcommand};
15use serde::Serialize;
16
17use crate::errors::AppError;
18use crate::output::emit_json_compact;
19use crate::paths::AppPaths;
20use crate::storage::connection::open_rw;
21use crate::storage::pending_embeddings::{self, PendingEmbedding, PendingEmbeddingStatus};
22
23#[derive(Debug, Args)]
24#[command(after_long_help = "EXAMPLES:\n  \
25    # Show queue health and counts per status\n  \
26    sqlite-graphrag embedding status --json\n\n  \
27    # List all pending embeddings waiting for retry\n  \
28    sqlite-graphrag embedding list --status pending --json\n\n  \
29    # Mark pending_id 7 as abandoned (will not be retried automatically)\n  \
30    sqlite-graphrag embedding abandon 7 --yes\n\n  \
31    # Note: `embedding retry` requires re-running an LLM subprocess; for full\n  \
32    # retry of every pending entry use `enrich --operation re-embed --pending-only`")]
33pub struct EmbeddingArgs {
34    #[command(subcommand)]
35    pub cmd: EmbeddingCmd,
36}
37
38#[derive(Debug, Subcommand)]
39pub enum EmbeddingCmd {
40    /// Show queue health (counts by status).
41    Status(EmbeddingStatusArgs),
42    /// List pending embeddings filtered by status.
43    List(EmbeddingListArgs),
44    /// Mark one entry as abandoned.
45    Abandon(EmbeddingAbandonArgs),
46}
47
48#[derive(Debug, Args)]
49pub struct EmbeddingStatusArgs {}
50
51#[derive(Debug, Args)]
52pub struct EmbeddingListArgs {
53    /// Filter by status: pending | in_progress | done | abandoned. Default: pending.
54    #[arg(long, value_enum, default_value_t = EmbeddingStatusFilter::Pending)]
55    pub status: EmbeddingStatusFilter,
56    /// Maximum number of entries to return. Default: 100.
57    #[arg(long, default_value_t = 100)]
58    pub limit: usize,
59}
60
61#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
62#[value(rename_all = "snake_case")]
63pub enum EmbeddingStatusFilter {
64    Pending,
65    InProgress,
66    Done,
67    Abandoned,
68}
69
70impl From<EmbeddingStatusFilter> for PendingEmbeddingStatus {
71    fn from(value: EmbeddingStatusFilter) -> Self {
72        match value {
73            EmbeddingStatusFilter::Pending => Self::Pending,
74            EmbeddingStatusFilter::InProgress => Self::InProgress,
75            EmbeddingStatusFilter::Done => Self::Done,
76            EmbeddingStatusFilter::Abandoned => Self::Abandoned,
77        }
78    }
79}
80
81#[derive(Debug, Args)]
82pub struct EmbeddingAbandonArgs {
83    /// Pending id to abandon.
84    pub pending_id: i64,
85    /// Skip the interactive confirmation prompt.
86    #[arg(long)]
87    pub yes: bool,
88}
89
90#[derive(Serialize)]
91struct EmbeddingStatusOutput {
92    action: &'static str,
93    counts: EmbeddingStatusCounts,
94    elapsed_ms: u64,
95}
96
97#[derive(Serialize, Default)]
98struct EmbeddingStatusCounts {
99    pending: usize,
100    in_progress: usize,
101    done: usize,
102    abandoned: usize,
103}
104
105#[derive(Serialize)]
106struct EmbeddingListEntry {
107    pending_id: i64,
108    memory_id: i64,
109    name: String,
110    namespace: String,
111    backend_chain: String,
112    last_error: Option<String>,
113    last_exit_code: Option<i32>,
114    last_stderr_tail: Option<String>,
115    attempt_count: i32,
116    status: String,
117    updated_at: i64,
118}
119
120impl From<&PendingEmbedding> for EmbeddingListEntry {
121    fn from(p: &PendingEmbedding) -> Self {
122        Self {
123            pending_id: p.pending_id,
124            memory_id: p.memory_id,
125            name: p.name.clone(),
126            namespace: p.namespace.clone(),
127            backend_chain: p.backend_chain.clone(),
128            last_error: p.last_error.clone(),
129            last_exit_code: p.last_exit_code,
130            last_stderr_tail: p.last_stderr_tail.clone(),
131            attempt_count: p.attempt_count,
132            status: p.status.as_str().to_string(),
133            updated_at: p.updated_at,
134        }
135    }
136}
137
138#[derive(Serialize)]
139struct EmbeddingListOutput {
140    action: &'static str,
141    filter_status: String,
142    count: usize,
143    entries: Vec<EmbeddingListEntry>,
144    elapsed_ms: u64,
145}
146
147#[derive(Serialize)]
148struct EmbeddingAbandonOutput {
149    action: &'static str,
150    pending_id: i64,
151    status: &'static str,
152    elapsed_ms: u64,
153    yes: bool,
154}
155
156pub fn run(args: EmbeddingArgs) -> Result<(), AppError> {
157    match args.cmd {
158        EmbeddingCmd::Status(a) => run_status(a),
159        EmbeddingCmd::List(a) => run_list(a),
160        EmbeddingCmd::Abandon(a) => run_abandon(a),
161    }
162}
163
164fn open_conn() -> Result<(AppPaths, rusqlite::Connection), AppError> {
165    let paths = AppPaths::resolve(None)?;
166    let conn = open_rw(&paths.db)?;
167    Ok((paths, conn))
168}
169
170fn run_status(_args: EmbeddingStatusArgs) -> Result<(), AppError> {
171    let start = std::time::Instant::now();
172    let (_paths, conn) = open_conn()?;
173
174    let counts = EmbeddingStatusCounts {
175        pending: pending_embeddings::list_by_status(
176            &conn,
177            PendingEmbeddingStatus::Pending,
178            100_000,
179        )?
180        .len(),
181        in_progress: pending_embeddings::list_by_status(
182            &conn,
183            PendingEmbeddingStatus::InProgress,
184            100_000,
185        )?
186        .len(),
187        done: pending_embeddings::list_by_status(&conn, PendingEmbeddingStatus::Done, 100_000)?
188            .len(),
189        abandoned: pending_embeddings::list_by_status(
190            &conn,
191            PendingEmbeddingStatus::Abandoned,
192            100_000,
193        )?
194        .len(),
195    };
196
197    let output = EmbeddingStatusOutput {
198        action: "embedding_status",
199        counts,
200        elapsed_ms: start.elapsed().as_millis() as u64,
201    };
202    emit_json_compact(&output)
203}
204
205fn run_list(args: EmbeddingListArgs) -> Result<(), AppError> {
206    let start = std::time::Instant::now();
207    let (_paths, conn) = open_conn()?;
208    let status: PendingEmbeddingStatus = args.status.into();
209    let rows = pending_embeddings::list_by_status(&conn, status, args.limit)?;
210    let count = rows.len();
211    let entries: Vec<EmbeddingListEntry> = rows.iter().map(EmbeddingListEntry::from).collect();
212    let output = EmbeddingListOutput {
213        action: "embedding_list",
214        filter_status: status.as_str().to_string(),
215        count,
216        entries,
217        elapsed_ms: start.elapsed().as_millis() as u64,
218    };
219    emit_json_compact(&output)
220}
221
222fn run_abandon(args: EmbeddingAbandonArgs) -> Result<(), AppError> {
223    let start = std::time::Instant::now();
224    let (_paths, conn) = open_conn()?;
225    pending_embeddings::abandon(&conn, args.pending_id)?;
226    let output = EmbeddingAbandonOutput {
227        action: "embedding_abandon",
228        pending_id: args.pending_id,
229        status: PendingEmbeddingStatus::Abandoned.as_str(),
230        elapsed_ms: start.elapsed().as_millis() as u64,
231        yes: args.yes,
232    };
233    emit_json_compact(&output)
234}
235
#[cfg(test)]
mod tests {
    use super::*;

    // Every CLI filter variant must convert to the storage status whose
    // string form matches the snake_case spelling used on the command line.
    #[test]
    fn status_filter_round_trip() {
        let cases = [
            (EmbeddingStatusFilter::Pending, "pending"),
            (EmbeddingStatusFilter::InProgress, "in_progress"),
            (EmbeddingStatusFilter::Done, "done"),
            (EmbeddingStatusFilter::Abandoned, "abandoned"),
        ];

        for (filter, expected) in cases {
            let converted = PendingEmbeddingStatus::from(filter);
            assert_eq!(converted.as_str(), expected);
        }
    }
}