Skip to main content

sqlite_graphrag/commands/
pending_embeddings.rs

1//! GAP-005 (v1.0.82): `pending-embeddings` subcommand — high-level batch
2//! operations over the `pending_embeddings` queue.
3//!
4//! ## Subcommands
5//! - `pending-embeddings list` — alias of `embedding list`
//! - `pending-embeddings retry-all` — bulk re-queue for retry (not yet implemented: `PendingEmbeddingsCmd` has no such variant)
7//! - `pending-embeddings abandon` — bulk mark abandoned
8//!
9//! The split between `embedding` and `pending-embeddings` mirrors the GAP-005
10//! plan: `embedding` carries per-entry inspection (`status` / `abandon <id>`)
11//! while `pending-embeddings` carries batch operations over the queue as a
12//! whole. The two share the same `pending_embeddings` table and storage
13//! layer.
14
15use clap::{Args, Subcommand};
16use serde::Serialize;
17
18use crate::errors::AppError;
19use crate::output::emit_json_compact;
20use crate::paths::AppPaths;
21use crate::storage::connection::open_rw;
22use crate::storage::pending_embeddings::{self, PendingEmbedding, PendingEmbeddingStatus};
23
// Top-level arguments for the `pending-embeddings` command. It only carries
// the selected subcommand, which `run` dispatches on.
//
// Explanatory notes on this clap-derived type use plain `//` comments:
// clap's derive turns `///` doc comments into help text, so adding them
// would change the CLI's output.
//
// NOTE(review): the last EXAMPLES entry re-abandons entries that are already
// `abandoned`; confirm this is the intended illustration.
#[derive(Debug, Args)]
#[command(after_long_help = "EXAMPLES:\n  \
    # List every pending embedding (alias of `embedding list`)\n  \
    sqlite-graphrag pending-embeddings list --json\n\n  \
    # Bulk mark every entry in `pending` status as abandoned\n  \
    sqlite-graphrag pending-embeddings abandon --status pending --yes\n\n  \
    # Mark every abandoned entry as abandoned (no-op safe retry)\n  \
    sqlite-graphrag pending-embeddings abandon --status abandoned --yes")]
pub struct PendingEmbeddingsArgs {
    // The chosen `pending-embeddings` subcommand.
    #[command(subcommand)]
    pub cmd: PendingEmbeddingsCmd,
}
36
// Subcommands of `pending-embeddings`. The `///` line on each variant is the
// clap help text for that subcommand (runtime output), so it is kept verbatim.
//
// NOTE(review): the module docs also mention a `retry-all` subcommand, which
// has no variant here.
#[derive(Debug, Subcommand)]
pub enum PendingEmbeddingsCmd {
    // Handled by `run_list`.
    /// List all pending embeddings (alias of `embedding list`).
    List(PendingEmbeddingsListArgs),
    // Handled by `run_abandon`.
    /// Mark every entry in a given status as abandoned.
    Abandon(PendingEmbeddingsAbandonArgs),
}
44
// Arguments for `pending-embeddings list`. The `///` lines are clap help text
// and are kept verbatim; explanatory notes use `//`.
#[derive(Debug, Args)]
pub struct PendingEmbeddingsListArgs {
    // Parsed by `parse_status`; anything other than the four listed spellings
    // is rejected with a validation error.
    /// Filter by status: pending | in_progress | done | abandoned. Default: pending.
    #[arg(long, default_value = "pending")]
    pub status: String,
    // Passed straight through to `pending_embeddings::list_by_status` as the row cap.
    /// Maximum number of entries to return. Default: 1000.
    #[arg(long, default_value_t = 1000)]
    pub limit: usize,
    // Hidden flag that `run_list` never reads; output is always JSON via
    // `emit_json_compact`. Presumably accepted for CLI consistency — confirm.
    #[arg(long, hide = true)]
    pub json: bool,
    // Optional database override (also settable via SQLITE_GRAPHRAG_DB_PATH),
    // forwarded to `AppPaths::resolve` through `open_conn`.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,
}
58
// Arguments for `pending-embeddings abandon`. The `///` lines are clap help
// text and are kept verbatim; explanatory notes use `//`.
#[derive(Debug, Args)]
pub struct PendingEmbeddingsAbandonArgs {
    // Parsed by `parse_status`; every entry currently in this status is a
    // candidate for abandonment.
    /// Status to filter: pending | in_progress | done | abandoned. Default: pending.
    #[arg(long, default_value = "pending")]
    pub status: String,
    // NOTE(review): the help below refers to an interactive confirmation
    // prompt; check it against how `run_abandon` actually uses this flag.
    /// Skip the interactive confirmation prompt.
    #[arg(long)]
    pub yes: bool,
    // When set, `run_abandon` only counts candidates and never calls
    // `pending_embeddings::abandon`.
    /// Dry-run: count candidates without modifying.
    #[arg(long)]
    pub dry_run: bool,
    // Hidden flag that `run_abandon` never reads; output is always JSON via
    // `emit_json_compact`.
    #[arg(long, hide = true)]
    pub json: bool,
    // Optional database override (also settable via SQLITE_GRAPHRAG_DB_PATH),
    // forwarded to `AppPaths::resolve` through `open_conn`.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,
}
75
/// One queue row as rendered in `pending-embeddings list` JSON output.
///
/// Built from a storage-layer [`PendingEmbedding`] by the `From` impl that
/// follows. Every field except `status` is copied verbatim from that row.
/// Serde emits the fields in declaration order, so reordering them changes
/// the JSON key order.
#[derive(Serialize)]
struct PendingEmbeddingsListEntry {
    pending_id: i64,
    memory_id: i64,
    name: String,
    namespace: String,
    backend_chain: String,
    last_error: Option<String>,
    last_exit_code: Option<i32>,
    last_stderr_tail: Option<String>,
    attempt_count: i32,
    /// Status rendered through `PendingEmbeddingStatus::as_str`.
    status: String,
    // NOTE(review): the unit of `updated_at` (seconds vs. milliseconds) is not
    // visible from this file — confirm against the storage layer.
    updated_at: i64,
}
90
91impl From<&PendingEmbedding> for PendingEmbeddingsListEntry {
92    fn from(p: &PendingEmbedding) -> Self {
93        Self {
94            pending_id: p.pending_id,
95            memory_id: p.memory_id,
96            name: p.name.clone(),
97            namespace: p.namespace.clone(),
98            backend_chain: p.backend_chain.clone(),
99            last_error: p.last_error.clone(),
100            last_exit_code: p.last_exit_code,
101            last_stderr_tail: p.last_stderr_tail.clone(),
102            attempt_count: p.attempt_count,
103            status: p.status.as_str().to_string(),
104            updated_at: p.updated_at,
105        }
106    }
107}
108
/// JSON payload emitted by `pending-embeddings list`.
#[derive(Serialize)]
struct PendingEmbeddingsListOutput {
    /// Fixed tag naming the command that produced this payload.
    action: &'static str,
    /// The status filter that was applied, rendered via `as_str`.
    filter_status: String,
    /// Number of items in `entries`.
    count: usize,
    /// The matching queue rows, capped by `--limit`.
    entries: Vec<PendingEmbeddingsListEntry>,
    /// Time spent handling the command, measured with `Instant`, in milliseconds.
    elapsed_ms: u64,
}
117
/// JSON payload emitted by `pending-embeddings abandon`.
#[derive(Serialize)]
struct PendingEmbeddingsAbandonOutput {
    /// `pending_embeddings_abandon`, or `pending_embeddings_abandon_dry_run`
    /// when `--dry-run` was given.
    action: &'static str,
    /// Echo of `--dry-run`.
    dry_run: bool,
    /// The status filter that selected the candidates, rendered via `as_str`.
    status: String,
    /// Number of entries matching the filter, bounded by the scan cap used in
    /// `run_abandon`.
    candidates: usize,
    /// Number of entries actually passed to `pending_embeddings::abandon`;
    /// always 0 on a dry run.
    abandoned: usize,
    /// Time spent handling the command, measured with `Instant`, in milliseconds.
    elapsed_ms: u64,
    /// Echo of `--yes`.
    yes: bool,
}
128
129pub fn run(args: PendingEmbeddingsArgs) -> Result<(), AppError> {
130    match args.cmd {
131        PendingEmbeddingsCmd::List(a) => run_list(a),
132        PendingEmbeddingsCmd::Abandon(a) => run_abandon(a),
133    }
134}
135
136fn parse_status(s: &str) -> Result<PendingEmbeddingStatus, AppError> {
137    match s {
138        "pending" => Ok(PendingEmbeddingStatus::Pending),
139        "in_progress" => Ok(PendingEmbeddingStatus::InProgress),
140        "done" => Ok(PendingEmbeddingStatus::Done),
141        "abandoned" => Ok(PendingEmbeddingStatus::Abandoned),
142        other => Err(AppError::Validation(format!(
143            "invalid status filter: {other} (expected pending|in_progress|done|abandoned)"
144        ))),
145    }
146}
147
148fn open_conn(db: Option<&str>) -> Result<(AppPaths, rusqlite::Connection), AppError> {
149    let paths = AppPaths::resolve(db)?;
150    let conn = open_rw(&paths.db)?;
151    Ok((paths, conn))
152}
153
154fn run_list(args: PendingEmbeddingsListArgs) -> Result<(), AppError> {
155    let start = std::time::Instant::now();
156    let (_paths, conn) = open_conn(args.db.as_deref())?;
157    let status = parse_status(&args.status)?;
158    let rows = pending_embeddings::list_by_status(&conn, status, args.limit)?;
159    let count = rows.len();
160    let entries: Vec<PendingEmbeddingsListEntry> =
161        rows.iter().map(PendingEmbeddingsListEntry::from).collect();
162    let output = PendingEmbeddingsListOutput {
163        action: "pending_embeddings_list",
164        filter_status: status.as_str().to_string(),
165        count,
166        entries,
167        elapsed_ms: start.elapsed().as_millis() as u64,
168    };
169    emit_json_compact(&output)
170}
171
172fn run_abandon(args: PendingEmbeddingsAbandonArgs) -> Result<(), AppError> {
173    let start = std::time::Instant::now();
174    let (_paths, conn) = open_conn(args.db.as_deref())?;
175    let status = parse_status(&args.status)?;
176    let rows = pending_embeddings::list_by_status(&conn, status, 100_000)?;
177    let candidates = rows.len();
178    let mut abandoned = 0usize;
179    if !args.dry_run {
180        for row in &rows {
181            pending_embeddings::abandon(&conn, row.pending_id)?;
182            abandoned += 1;
183        }
184    }
185    let output = PendingEmbeddingsAbandonOutput {
186        action: if args.dry_run {
187            "pending_embeddings_abandon_dry_run"
188        } else {
189            "pending_embeddings_abandon"
190        },
191        dry_run: args.dry_run,
192        status: status.as_str().to_string(),
193        candidates,
194        abandoned,
195        elapsed_ms: start.elapsed().as_millis() as u64,
196        yes: args.yes,
197    };
198    emit_json_compact(&output)
199}