1use clap::{Args, Subcommand};
15use serde::Serialize;
16
17use crate::cli::LlmBackendChoice;
18use crate::errors::AppError;
19use crate::output::emit_json_compact;
20use crate::paths::AppPaths;
21use crate::storage::connection::open_rw;
22use crate::storage::pending_embeddings::{self, PendingEmbedding, PendingEmbeddingStatus};
23
// Arguments for the `embedding` command group, which inspects and manages the
// pending-embeddings queue in the SQLite database opened via `open_rw`.
//
// Plain `//` comments are used on clap-derived items on purpose: `///` doc
// comments would be read by clap and alter the rendered `--help` text.
//
// `after_long_help` appends the EXAMPLES section to the long help output. The
// literal relies on Rust's `\`-newline continuation, which drops the newline
// and the leading whitespace of the following source line.
// NOTE(review): the text mentions `embedding retry`, which is not a variant of
// `EmbeddingCmd` — confirm the wording is intended.
#[derive(Debug, Args)]
#[command(after_long_help = "EXAMPLES:\n \
    # Show queue health and counts per status\n \
    sqlite-graphrag embedding status --json\n\n \
    # List all pending embeddings waiting for retry\n \
    sqlite-graphrag embedding list --status pending --json\n\n \
    # Mark pending_id 7 as abandoned (will not be retried automatically)\n \
    sqlite-graphrag embedding abandon 7 --yes\n\n \
    # Note: `embedding retry` requires re-running an LLM subprocess; for full\n \
    # retry of every pending entry use `enrich --operation re-embed --pending-only`")]
pub struct EmbeddingArgs {
    // The operation to perform; dispatched by `run`.
    #[command(subcommand)]
    pub cmd: EmbeddingCmd,
}
38
// Subcommands of `embedding`. Each variant carries its own argument struct and
// is routed by `run` to the matching `run_*` handler.
#[derive(Debug, Subcommand)]
pub enum EmbeddingCmd {
    // Per-status queue counts plus embedding coverage (handled by `run_status`).
    Status(EmbeddingStatusArgs),
    // Queue entries filtered by one status (handled by `run_list`).
    List(EmbeddingListArgs),
    // Mark a single queue entry as abandoned (handled by `run_abandon`).
    Abandon(EmbeddingAbandonArgs),
}
48
// Arguments for `embedding status`.
#[derive(Debug, Args)]
pub struct EmbeddingStatusArgs {
    // Optional database path. When `--db` is absent clap reads the
    // SQLITE_GRAPHRAG_DB_PATH environment variable. The result is handed to
    // `AppPaths::resolve`, which presumably applies a default when both are
    // unset — confirm.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,
    // Hidden flag accepted for CLI compatibility. `run_status` never reads it
    // and always emits JSON.
    #[arg(long, hide = true)]
    pub json: bool,
}
57
// Arguments for `embedding list`.
#[derive(Debug, Args)]
pub struct EmbeddingListArgs {
    // Optional database path; falls back to the SQLITE_GRAPHRAG_DB_PATH
    // environment variable when `--db` is not given.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,
    // Queue status to list; defaults to `pending`.
    #[arg(long, value_enum, default_value_t = EmbeddingStatusFilter::Pending)]
    pub status: EmbeddingStatusFilter,
    // Row limit forwarded to `pending_embeddings::list_by_status` (default 100).
    #[arg(long, default_value_t = 100)]
    pub limit: usize,
    // Hidden compatibility flag. `run_list` never reads it and always emits JSON.
    #[arg(long, hide = true)]
    pub json: bool,
}
72
// Command-line spelling of a pending-embedding status, used by
// `embedding list --status`. `rename_all = "snake_case"` makes clap accept
// `pending`, `in_progress`, `done` and `abandoned`. It is converted to the
// storage enum through `From<EmbeddingStatusFilter> for PendingEmbeddingStatus`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
#[value(rename_all = "snake_case")]
pub enum EmbeddingStatusFilter {
    Pending,
    InProgress,
    Done,
    Abandoned,
}
81
82impl From<EmbeddingStatusFilter> for PendingEmbeddingStatus {
83 fn from(value: EmbeddingStatusFilter) -> Self {
84 match value {
85 EmbeddingStatusFilter::Pending => Self::Pending,
86 EmbeddingStatusFilter::InProgress => Self::InProgress,
87 EmbeddingStatusFilter::Done => Self::Done,
88 EmbeddingStatusFilter::Abandoned => Self::Abandoned,
89 }
90 }
91}
92
// Arguments for `embedding abandon`.
#[derive(Debug, Args)]
pub struct EmbeddingAbandonArgs {
    // Optional database path; falls back to the SQLITE_GRAPHRAG_DB_PATH
    // environment variable when `--db` is not given.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,
    // Id of the queue row to abandon. It has no `#[arg]` attribute, so clap
    // treats it as a required positional argument.
    pub pending_id: i64,
    // `--yes` flag. NOTE(review): `run_abandon` only echoes this value in its
    // output and performs the abandon whether or not it is set — confirm
    // whether it should act as a confirmation guard.
    #[arg(long)]
    pub yes: bool,
    // Hidden compatibility flag. `run_abandon` never reads it and always emits JSON.
    #[arg(long, hide = true)]
    pub json: bool,
}
106
/// JSON payload emitted by `embedding status`.
///
/// serde's derive serializes fields in declaration order, so reordering the
/// fields changes the key order of the emitted JSON.
#[derive(Serialize)]
struct EmbeddingStatusOutput {
    /// Fixed discriminator; `run_status` sets it to `"embedding_status"`.
    action: &'static str,
    /// Lower-case label of the `LlmBackendChoice` passed into `run`.
    /// `run_status` only reports it; no backend is called by `status`.
    backend_invoked: &'static str,
    /// Queue row counts per status.
    counts: EmbeddingStatusCounts,
    /// Base-table vs. embedding-table row counts.
    coverage: EmbeddingCoverage,
    /// Milliseconds from handler start until this payload was built.
    elapsed_ms: u64,
}
123
/// Per-status row counts of the pending-embeddings queue.
///
/// `run_status` fills each field with the length of a `list_by_status` result
/// capped at 100_000 rows. A value of exactly 100_000 may therefore mean
/// "at least 100_000".
#[derive(Serialize, Default)]
struct EmbeddingStatusCounts {
    pending: usize,
    in_progress: usize,
    done: usize,
    abandoned: usize,
}
131
/// Raw row counts used to gauge how much of the corpus has embeddings.
///
/// Each `*_total` / `*_with_vec` pair is a plain `COUNT(*)` over a base table
/// and its embeddings table, taken via `count_table`. A missing table or a
/// failed query is reported as `0`.
///
/// NOTE(review): `memories_total` excludes soft-deleted rows
/// (`deleted_at IS NULL`), but `memories_with_vec` counts every row of
/// `memory_embeddings`. The pair can disagree if embeddings of deleted
/// memories are retained — confirm.
#[derive(Serialize, Default)]
struct EmbeddingCoverage {
    memories_total: i64,
    memories_with_vec: i64,
    entities_total: i64,
    entities_with_vec: i64,
    chunks_total: i64,
    chunks_with_vec: i64,
}
145
146fn count_table(conn: &rusqlite::Connection, sql: &str) -> i64 {
149 match conn.query_row(sql, [], |r| r.get::<_, i64>(0)) {
150 Ok(n) => n,
151 Err(rusqlite::Error::SqliteFailure(_, Some(msg))) if msg.contains("no such table") => 0,
152 Err(e) => {
153 tracing::warn!(target: "embedding", error = %e, sql, "coverage count failed");
154 0
155 }
156 }
157}
158
/// One queue row as rendered in `embedding list` output.
///
/// Built field-by-field from a `PendingEmbedding`, with `status` converted to
/// its string form. serde emits keys in declaration order.
#[derive(Serialize)]
struct EmbeddingListEntry {
    /// Id of the queue row (the value `embedding abandon` takes).
    pending_id: i64,
    /// Id of the associated memory.
    memory_id: i64,
    name: String,
    namespace: String,
    /// Backend chain as stored; its format is not visible here.
    backend_chain: String,
    last_error: Option<String>,
    last_exit_code: Option<i32>,
    /// Presumably the tail of the failing subprocess's stderr — confirm.
    last_stderr_tail: Option<String>,
    attempt_count: i32,
    /// String form of the row status (`PendingEmbeddingStatus::as_str`).
    status: String,
    /// Timestamp as stored; unit and epoch are not visible here — confirm.
    updated_at: i64,
}
173
174impl From<&PendingEmbedding> for EmbeddingListEntry {
175 fn from(p: &PendingEmbedding) -> Self {
176 Self {
177 pending_id: p.pending_id,
178 memory_id: p.memory_id,
179 name: p.name.clone(),
180 namespace: p.namespace.clone(),
181 backend_chain: p.backend_chain.clone(),
182 last_error: p.last_error.clone(),
183 last_exit_code: p.last_exit_code,
184 last_stderr_tail: p.last_stderr_tail.clone(),
185 attempt_count: p.attempt_count,
186 status: p.status.as_str().to_string(),
187 updated_at: p.updated_at,
188 }
189 }
190}
191
/// JSON payload emitted by `embedding list`.
#[derive(Serialize)]
struct EmbeddingListOutput {
    /// Fixed discriminator; `run_list` sets it to `"embedding_list"`.
    action: &'static str,
    /// String form of the status that was filtered on.
    filter_status: String,
    /// Number of entries returned; `run_list` sets it to `entries.len()`.
    count: usize,
    entries: Vec<EmbeddingListEntry>,
    /// Milliseconds from handler start until this payload was built.
    elapsed_ms: u64,
}
200
/// JSON payload emitted by `embedding abandon`.
#[derive(Serialize)]
struct EmbeddingAbandonOutput {
    /// Fixed discriminator; `run_abandon` sets it to `"embedding_abandon"`.
    action: &'static str,
    /// Id of the queue row that was passed to `pending_embeddings::abandon`.
    pending_id: i64,
    /// Always the string form of `PendingEmbeddingStatus::Abandoned`.
    status: &'static str,
    /// Milliseconds from handler start until this payload was built.
    elapsed_ms: u64,
    /// Echo of the `--yes` flag. It is not used as a guard by `run_abandon`.
    yes: bool,
}
209
210pub fn run(args: EmbeddingArgs, llm_backend: LlmBackendChoice) -> Result<(), AppError> {
211 match args.cmd {
212 EmbeddingCmd::Status(a) => run_status(a, llm_backend),
213 EmbeddingCmd::List(a) => run_list(a),
214 EmbeddingCmd::Abandon(a) => run_abandon(a),
215 }
216}
217
218fn open_conn(db: Option<&str>) -> Result<(AppPaths, rusqlite::Connection), AppError> {
219 let paths = AppPaths::resolve(db)?;
220 let conn = open_rw(&paths.db)?;
221 Ok((paths, conn))
222}
223
224fn run_status(args: EmbeddingStatusArgs, llm_backend: LlmBackendChoice) -> Result<(), AppError> {
225 let start = std::time::Instant::now();
226 let (_paths, conn) = open_conn(args.db.as_deref())?;
227
228 let counts = EmbeddingStatusCounts {
229 pending: pending_embeddings::list_by_status(
230 &conn,
231 PendingEmbeddingStatus::Pending,
232 100_000,
233 )?
234 .len(),
235 in_progress: pending_embeddings::list_by_status(
236 &conn,
237 PendingEmbeddingStatus::InProgress,
238 100_000,
239 )?
240 .len(),
241 done: pending_embeddings::list_by_status(&conn, PendingEmbeddingStatus::Done, 100_000)?
242 .len(),
243 abandoned: pending_embeddings::list_by_status(
244 &conn,
245 PendingEmbeddingStatus::Abandoned,
246 100_000,
247 )?
248 .len(),
249 };
250
251 let backend_invoked: &'static str = match llm_backend {
252 LlmBackendChoice::Claude => "claude",
253 LlmBackendChoice::Codex => "codex",
254 LlmBackendChoice::Opencode => "opencode",
255 LlmBackendChoice::None => "none",
256 LlmBackendChoice::OpenRouter => "openrouter",
257 LlmBackendChoice::Auto => "auto",
258 };
259
260 let coverage = EmbeddingCoverage {
264 memories_total: count_table(
265 &conn,
266 "SELECT COUNT(*) FROM memories WHERE deleted_at IS NULL",
267 ),
268 memories_with_vec: count_table(&conn, "SELECT COUNT(*) FROM memory_embeddings"),
269 entities_total: count_table(&conn, "SELECT COUNT(*) FROM entities"),
270 entities_with_vec: count_table(&conn, "SELECT COUNT(*) FROM entity_embeddings"),
271 chunks_total: count_table(&conn, "SELECT COUNT(*) FROM memory_chunks"),
272 chunks_with_vec: count_table(&conn, "SELECT COUNT(*) FROM chunk_embeddings"),
273 };
274
275 let output = EmbeddingStatusOutput {
276 action: "embedding_status",
277 backend_invoked,
278 counts,
279 coverage,
280 elapsed_ms: start.elapsed().as_millis() as u64,
281 };
282 emit_json_compact(&output)
283}
284
285fn run_list(args: EmbeddingListArgs) -> Result<(), AppError> {
286 let start = std::time::Instant::now();
287 let (_paths, conn) = open_conn(args.db.as_deref())?;
288 let status: PendingEmbeddingStatus = args.status.into();
289 let rows = pending_embeddings::list_by_status(&conn, status, args.limit)?;
290 let count = rows.len();
291 let entries: Vec<EmbeddingListEntry> = rows.iter().map(EmbeddingListEntry::from).collect();
292 let output = EmbeddingListOutput {
293 action: "embedding_list",
294 filter_status: status.as_str().to_string(),
295 count,
296 entries,
297 elapsed_ms: start.elapsed().as_millis() as u64,
298 };
299 emit_json_compact(&output)
300}
301
302fn run_abandon(args: EmbeddingAbandonArgs) -> Result<(), AppError> {
303 let start = std::time::Instant::now();
304 let (_paths, conn) = open_conn(args.db.as_deref())?;
305 pending_embeddings::abandon(&conn, args.pending_id)?;
306 let output = EmbeddingAbandonOutput {
307 action: "embedding_abandon",
308 pending_id: args.pending_id,
309 status: PendingEmbeddingStatus::Abandoned.as_str(),
310 elapsed_ms: start.elapsed().as_millis() as u64,
311 yes: args.yes,
312 };
313 emit_json_compact(&output)
314}
315
#[cfg(test)]
mod tests {
    use super::*;

    /// The status payload must nest the coverage counts under `coverage`.
    #[test]
    fn embedding_status_output_includes_coverage() {
        let coverage = EmbeddingCoverage {
            memories_total: 10,
            memories_with_vec: 9,
            entities_total: 4,
            entities_with_vec: 4,
            chunks_total: 7,
            chunks_with_vec: 7,
        };
        let output = EmbeddingStatusOutput {
            action: "embedding_status",
            backend_invoked: "openrouter",
            counts: EmbeddingStatusCounts::default(),
            coverage,
            elapsed_ms: 1,
        };

        let value = serde_json::to_value(&output).expect("serialize");
        let coverage_json = &value["coverage"];
        for (key, expected) in [
            ("memories_total", 10),
            ("memories_with_vec", 9),
            ("entities_with_vec", 4),
            ("chunks_with_vec", 7),
        ] {
            assert_eq!(coverage_json[key], expected, "coverage.{}", key);
        }
    }

    /// Every CLI filter maps to the storage status with the expected name.
    #[test]
    fn status_filter_round_trip() {
        let cases = [
            (EmbeddingStatusFilter::Pending, "pending"),
            (EmbeddingStatusFilter::InProgress, "in_progress"),
            (EmbeddingStatusFilter::Done, "done"),
            (EmbeddingStatusFilter::Abandoned, "abandoned"),
        ];
        for (filter, expected) in cases {
            let status = PendingEmbeddingStatus::from(filter);
            assert_eq!(status.as_str(), expected);
        }
    }
}