sqlite_graphrag/commands/
pending_embeddings.rs1use clap::{Args, Subcommand};
16use serde::Serialize;
17
18use crate::errors::AppError;
19use crate::output::emit_json_compact;
20use crate::paths::AppPaths;
21use crate::storage::connection::open_rw;
22use crate::storage::pending_embeddings::{self, PendingEmbedding, PendingEmbeddingStatus};
23
// Top-level arguments of the `pending-embeddings` command group; the work to
// perform is chosen by the nested `cmd` subcommand.
// Plain `//` comments are used on clap-derived items on purpose: `///` doc
// comments are read by clap derive and would change the generated help text.
#[derive(Debug, Args)]
#[command(after_long_help = "EXAMPLES:\n \
    # List every pending embedding (alias of `embedding list`)\n \
    sqlite-graphrag pending-embeddings list --json\n\n \
    # Bulk mark every entry in `pending` status as abandoned\n \
    sqlite-graphrag pending-embeddings abandon --status pending --yes\n\n \
    # Mark every abandoned entry as abandoned (no-op safe retry)\n \
    sqlite-graphrag pending-embeddings abandon --status abandoned --yes")]
pub struct PendingEmbeddingsArgs {
    // Selected subcommand (`list` or `abandon`), dispatched by `run`.
    #[command(subcommand)]
    pub cmd: PendingEmbeddingsCmd,
}
36
// Subcommands of `pending-embeddings`; `run` dispatches each variant to its handler.
// (`//` rather than `///` so clap's generated subcommand help is unchanged.)
#[derive(Debug, Subcommand)]
pub enum PendingEmbeddingsCmd {
    // List queue entries filtered by status; handled by `run_list`.
    List(PendingEmbeddingsListArgs),
    // Mark entries with the given status as abandoned; handled by `run_abandon`.
    Abandon(PendingEmbeddingsAbandonArgs),
}
44
// Arguments for `pending-embeddings list`.
#[derive(Debug, Args)]
pub struct PendingEmbeddingsListArgs {
    // Status to filter by; validated by `parse_status`
    // (one of pending | in_progress | done | abandoned).
    #[arg(long, default_value = "pending")]
    pub status: String,
    // Maximum number of rows requested from `list_by_status`.
    #[arg(long, default_value_t = 1000)]
    pub limit: usize,
    // Hidden flag that `run_list` never reads: output is always compact JSON.
    // NOTE(review): presumably accepted so `--json` works uniformly across
    // commands — confirm.
    #[arg(long, hide = true)]
    pub json: bool,
    // Database path override; clap falls back to the SQLITE_GRAPHRAG_DB_PATH
    // environment variable. Resolved via `AppPaths::resolve` in `open_conn`.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,
}
58
// Arguments for `pending-embeddings abandon`.
#[derive(Debug, Args)]
pub struct PendingEmbeddingsAbandonArgs {
    // Status whose entries are selected for abandoning; validated by
    // `parse_status` (one of pending | in_progress | done | abandoned).
    #[arg(long, default_value = "pending")]
    pub status: String,
    // Confirmation flag (`--yes`); echoed back in the abandon output.
    #[arg(long)]
    pub yes: bool,
    // When set, matching entries are counted but none are abandoned.
    #[arg(long)]
    pub dry_run: bool,
    // Hidden flag that `run_abandon` never reads: output is always compact JSON.
    // NOTE(review): presumably accepted so `--json` works uniformly across
    // commands — confirm.
    #[arg(long, hide = true)]
    pub json: bool,
    // Database path override; clap falls back to the SQLITE_GRAPHRAG_DB_PATH
    // environment variable. Resolved via `AppPaths::resolve` in `open_conn`.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,
}
75
/// One queue row as reported by `pending-embeddings list`.
///
/// Fields are serialized in declaration order under their Rust names (no
/// serde renames), so this struct is the JSON shape consumers see. Every value
/// is copied from a [`PendingEmbedding`] by the `From<&PendingEmbedding>` impl;
/// only `status` is transformed (rendered via `PendingEmbeddingStatus::as_str`).
#[derive(Serialize)]
struct PendingEmbeddingsListEntry {
    pending_id: i64,
    memory_id: i64,
    name: String,
    namespace: String,
    backend_chain: String,
    last_error: Option<String>,
    last_exit_code: Option<i32>,
    last_stderr_tail: Option<String>,
    attempt_count: i32,
    /// String form of the entry's `PendingEmbeddingStatus`.
    status: String,
    /// Copied as-is from the source row.
    /// NOTE(review): presumably a Unix timestamp — confirm the unit.
    updated_at: i64,
}
90
91impl From<&PendingEmbedding> for PendingEmbeddingsListEntry {
92 fn from(p: &PendingEmbedding) -> Self {
93 Self {
94 pending_id: p.pending_id,
95 memory_id: p.memory_id,
96 name: p.name.clone(),
97 namespace: p.namespace.clone(),
98 backend_chain: p.backend_chain.clone(),
99 last_error: p.last_error.clone(),
100 last_exit_code: p.last_exit_code,
101 last_stderr_tail: p.last_stderr_tail.clone(),
102 attempt_count: p.attempt_count,
103 status: p.status.as_str().to_string(),
104 updated_at: p.updated_at,
105 }
106 }
107}
108
/// JSON document emitted by `pending-embeddings list`.
#[derive(Serialize)]
struct PendingEmbeddingsListOutput {
    /// Fixed tag identifying the command; `run_list` sets "pending_embeddings_list".
    action: &'static str,
    /// Status filter that was applied, as its canonical string.
    filter_status: String,
    /// Number of entries returned (equals `entries.len()`).
    count: usize,
    /// The matching queue rows.
    entries: Vec<PendingEmbeddingsListEntry>,
    /// Wall-clock time spent in `run_list`, in milliseconds.
    elapsed_ms: u64,
}
117
/// JSON document emitted by `pending-embeddings abandon`.
#[derive(Serialize)]
struct PendingEmbeddingsAbandonOutput {
    /// "pending_embeddings_abandon", or "pending_embeddings_abandon_dry_run"
    /// when `--dry-run` was given.
    action: &'static str,
    /// Whether this was a dry run (nothing modified).
    dry_run: bool,
    /// Status filter that selected the candidates, as its canonical string.
    status: String,
    /// Number of rows matching the status filter that were fetched.
    candidates: usize,
    /// Number of rows passed to `pending_embeddings::abandon` (0 on a dry run).
    abandoned: usize,
    /// Wall-clock time spent in `run_abandon`, in milliseconds.
    elapsed_ms: u64,
    /// Echo of the `--yes` flag.
    yes: bool,
}
128
129pub fn run(args: PendingEmbeddingsArgs) -> Result<(), AppError> {
130 match args.cmd {
131 PendingEmbeddingsCmd::List(a) => run_list(a),
132 PendingEmbeddingsCmd::Abandon(a) => run_abandon(a),
133 }
134}
135
136fn parse_status(s: &str) -> Result<PendingEmbeddingStatus, AppError> {
137 match s {
138 "pending" => Ok(PendingEmbeddingStatus::Pending),
139 "in_progress" => Ok(PendingEmbeddingStatus::InProgress),
140 "done" => Ok(PendingEmbeddingStatus::Done),
141 "abandoned" => Ok(PendingEmbeddingStatus::Abandoned),
142 other => Err(AppError::Validation(format!(
143 "invalid status filter: {other} (expected pending|in_progress|done|abandoned)"
144 ))),
145 }
146}
147
148fn open_conn(db: Option<&str>) -> Result<(AppPaths, rusqlite::Connection), AppError> {
149 let paths = AppPaths::resolve(db)?;
150 let conn = open_rw(&paths.db)?;
151 Ok((paths, conn))
152}
153
154fn run_list(args: PendingEmbeddingsListArgs) -> Result<(), AppError> {
155 let start = std::time::Instant::now();
156 let (_paths, conn) = open_conn(args.db.as_deref())?;
157 let status = parse_status(&args.status)?;
158 let rows = pending_embeddings::list_by_status(&conn, status, args.limit)?;
159 let count = rows.len();
160 let entries: Vec<PendingEmbeddingsListEntry> =
161 rows.iter().map(PendingEmbeddingsListEntry::from).collect();
162 let output = PendingEmbeddingsListOutput {
163 action: "pending_embeddings_list",
164 filter_status: status.as_str().to_string(),
165 count,
166 entries,
167 elapsed_ms: start.elapsed().as_millis() as u64,
168 };
169 emit_json_compact(&output)
170}
171
172fn run_abandon(args: PendingEmbeddingsAbandonArgs) -> Result<(), AppError> {
173 let start = std::time::Instant::now();
174 let (_paths, conn) = open_conn(args.db.as_deref())?;
175 let status = parse_status(&args.status)?;
176 let rows = pending_embeddings::list_by_status(&conn, status, 100_000)?;
177 let candidates = rows.len();
178 let mut abandoned = 0usize;
179 if !args.dry_run {
180 for row in &rows {
181 pending_embeddings::abandon(&conn, row.pending_id)?;
182 abandoned += 1;
183 }
184 }
185 let output = PendingEmbeddingsAbandonOutput {
186 action: if args.dry_run {
187 "pending_embeddings_abandon_dry_run"
188 } else {
189 "pending_embeddings_abandon"
190 },
191 dry_run: args.dry_run,
192 status: status.as_str().to_string(),
193 candidates,
194 abandoned,
195 elapsed_ms: start.elapsed().as_millis() as u64,
196 yes: args.yes,
197 };
198 emit_json_compact(&output)
199}