1use clap::{Args, Subcommand};
15use serde::Serialize;
16
17use crate::cli::LlmBackendChoice;
18use crate::errors::AppError;
19use crate::output::emit_json_compact;
20use crate::paths::AppPaths;
21use crate::storage::connection::open_rw;
22use crate::storage::pending_embeddings::{self, PendingEmbedding, PendingEmbeddingStatus};
23
24#[derive(Debug, Args)]
25#[command(after_long_help = "EXAMPLES:\n \
26 # Show queue health and counts per status\n \
27 sqlite-graphrag embedding status --json\n\n \
28 # List all pending embeddings waiting for retry\n \
29 sqlite-graphrag embedding list --status pending --json\n\n \
30 # Mark pending_id 7 as abandoned (will not be retried automatically)\n \
31 sqlite-graphrag embedding abandon 7 --yes\n\n \
32 # Note: `embedding retry` requires re-running an LLM subprocess; for full\n \
33 # retry of every pending entry use `enrich --operation re-embed --pending-only`")]
34pub struct EmbeddingArgs {
35 #[command(subcommand)]
36 pub cmd: EmbeddingCmd,
37}
38
39#[derive(Debug, Subcommand)]
40pub enum EmbeddingCmd {
41 Status(EmbeddingStatusArgs),
43 List(EmbeddingListArgs),
45 Abandon(EmbeddingAbandonArgs),
47}
48
49#[derive(Debug, Args)]
50pub struct EmbeddingStatusArgs {
51 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
52 pub db: Option<String>,
53 #[arg(long, hide = true)]
55 pub json: bool,
56}
57
58#[derive(Debug, Args)]
59pub struct EmbeddingListArgs {
60 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
61 pub db: Option<String>,
62 #[arg(long, value_enum, default_value_t = EmbeddingStatusFilter::Pending)]
64 pub status: EmbeddingStatusFilter,
65 #[arg(long, default_value_t = 100)]
67 pub limit: usize,
68 #[arg(long, hide = true)]
70 pub json: bool,
71}
72
73#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
74#[value(rename_all = "snake_case")]
75pub enum EmbeddingStatusFilter {
76 Pending,
77 InProgress,
78 Done,
79 Abandoned,
80}
81
82impl From<EmbeddingStatusFilter> for PendingEmbeddingStatus {
83 fn from(value: EmbeddingStatusFilter) -> Self {
84 match value {
85 EmbeddingStatusFilter::Pending => Self::Pending,
86 EmbeddingStatusFilter::InProgress => Self::InProgress,
87 EmbeddingStatusFilter::Done => Self::Done,
88 EmbeddingStatusFilter::Abandoned => Self::Abandoned,
89 }
90 }
91}
92
93#[derive(Debug, Args)]
94pub struct EmbeddingAbandonArgs {
95 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
96 pub db: Option<String>,
97 pub pending_id: i64,
99 #[arg(long)]
101 pub yes: bool,
102 #[arg(long, hide = true)]
104 pub json: bool,
105}
106
107#[derive(Serialize)]
108struct EmbeddingStatusOutput {
109 action: &'static str,
110 backend_invoked: &'static str,
115 counts: EmbeddingStatusCounts,
116 coverage: EmbeddingCoverage,
121 elapsed_ms: u64,
122}
123
124#[derive(Serialize, Default)]
125struct EmbeddingStatusCounts {
126 pending: usize,
127 in_progress: usize,
128 done: usize,
129 abandoned: usize,
130}
131
132#[derive(Serialize, Default)]
137struct EmbeddingCoverage {
138 memories_total: i64,
139 memories_with_vec: i64,
140 memories_missing: i64,
144 entities_total: i64,
145 entities_with_vec: i64,
146 entities_missing: i64,
148 chunks_total: i64,
149 chunks_with_vec: i64,
150 chunks_missing: i64,
152}
153
154fn count_table(conn: &rusqlite::Connection, sql: &str) -> i64 {
157 match conn.query_row(sql, [], |r| r.get::<_, i64>(0)) {
158 Ok(n) => n,
159 Err(rusqlite::Error::SqliteFailure(_, Some(msg))) if msg.contains("no such table") => 0,
160 Err(e) => {
161 tracing::warn!(target: "embedding", error = %e, sql, "coverage count failed");
162 0
163 }
164 }
165}
166
167fn count_missing(conn: &rusqlite::Connection, sql: &str, total_when_absent: i64) -> i64 {
172 match conn.query_row(sql, [], |r| r.get::<_, i64>(0)) {
173 Ok(n) => n,
174 Err(rusqlite::Error::SqliteFailure(_, Some(msg))) if msg.contains("no such table") => {
175 total_when_absent
176 }
177 Err(e) => {
178 tracing::warn!(target: "embedding", error = %e, sql, "coverage missing-count failed");
179 0
180 }
181 }
182}
183
184#[derive(Serialize)]
185struct EmbeddingListEntry {
186 pending_id: i64,
187 memory_id: i64,
188 name: String,
189 namespace: String,
190 backend_chain: String,
191 last_error: Option<String>,
192 last_exit_code: Option<i32>,
193 last_stderr_tail: Option<String>,
194 attempt_count: i32,
195 status: String,
196 updated_at: i64,
197}
198
199impl From<&PendingEmbedding> for EmbeddingListEntry {
200 fn from(p: &PendingEmbedding) -> Self {
201 Self {
202 pending_id: p.pending_id,
203 memory_id: p.memory_id,
204 name: p.name.clone(),
205 namespace: p.namespace.clone(),
206 backend_chain: p.backend_chain.clone(),
207 last_error: p.last_error.clone(),
208 last_exit_code: p.last_exit_code,
209 last_stderr_tail: p.last_stderr_tail.clone(),
210 attempt_count: p.attempt_count,
211 status: p.status.as_str().to_string(),
212 updated_at: p.updated_at,
213 }
214 }
215}
216
217#[derive(Serialize)]
218struct EmbeddingListOutput {
219 action: &'static str,
220 filter_status: String,
221 count: usize,
222 entries: Vec<EmbeddingListEntry>,
223 elapsed_ms: u64,
224}
225
226#[derive(Serialize)]
227struct EmbeddingAbandonOutput {
228 action: &'static str,
229 pending_id: i64,
230 status: &'static str,
231 elapsed_ms: u64,
232 yes: bool,
233}
234
235pub fn run(args: EmbeddingArgs, llm_backend: LlmBackendChoice) -> Result<(), AppError> {
236 match args.cmd {
237 EmbeddingCmd::Status(a) => run_status(a, llm_backend),
238 EmbeddingCmd::List(a) => run_list(a),
239 EmbeddingCmd::Abandon(a) => run_abandon(a),
240 }
241}
242
243fn open_conn(db: Option<&str>) -> Result<(AppPaths, rusqlite::Connection), AppError> {
244 let paths = AppPaths::resolve(db)?;
245 let conn = open_rw(&paths.db)?;
246 Ok((paths, conn))
247}
248
249fn run_status(args: EmbeddingStatusArgs, llm_backend: LlmBackendChoice) -> Result<(), AppError> {
250 let start = std::time::Instant::now();
251 let (_paths, conn) = open_conn(args.db.as_deref())?;
252
253 let counts = EmbeddingStatusCounts {
254 pending: pending_embeddings::list_by_status(
255 &conn,
256 PendingEmbeddingStatus::Pending,
257 100_000,
258 )?
259 .len(),
260 in_progress: pending_embeddings::list_by_status(
261 &conn,
262 PendingEmbeddingStatus::InProgress,
263 100_000,
264 )?
265 .len(),
266 done: pending_embeddings::list_by_status(&conn, PendingEmbeddingStatus::Done, 100_000)?
267 .len(),
268 abandoned: pending_embeddings::list_by_status(
269 &conn,
270 PendingEmbeddingStatus::Abandoned,
271 100_000,
272 )?
273 .len(),
274 };
275
276 let backend_invoked: &'static str = match llm_backend {
277 LlmBackendChoice::Claude => "claude",
278 LlmBackendChoice::Codex => "codex",
279 LlmBackendChoice::Opencode => "opencode",
280 LlmBackendChoice::None => "none",
281 LlmBackendChoice::OpenRouter => "openrouter",
282 LlmBackendChoice::Auto => "auto",
283 };
284
285 let memories_total = count_table(
289 &conn,
290 "SELECT COUNT(*) FROM memories WHERE deleted_at IS NULL",
291 );
292 let entities_total = count_table(&conn, "SELECT COUNT(*) FROM entities");
293 let chunks_total = count_table(&conn, "SELECT COUNT(*) FROM memory_chunks");
294 let coverage = EmbeddingCoverage {
295 memories_total,
296 memories_with_vec: count_table(&conn, "SELECT COUNT(*) FROM memory_embeddings"),
297 memories_missing: count_missing(
300 &conn,
301 "SELECT COUNT(*) FROM memories m \
302 LEFT JOIN memory_embeddings me ON me.memory_id = m.id \
303 WHERE me.memory_id IS NULL AND m.deleted_at IS NULL",
304 memories_total,
305 ),
306 entities_total,
307 entities_with_vec: count_table(&conn, "SELECT COUNT(*) FROM entity_embeddings"),
308 entities_missing: count_missing(
309 &conn,
310 "SELECT COUNT(*) FROM entities e \
311 LEFT JOIN entity_embeddings ee ON ee.entity_id = e.id \
312 WHERE ee.entity_id IS NULL",
313 entities_total,
314 ),
315 chunks_total,
316 chunks_with_vec: count_table(&conn, "SELECT COUNT(*) FROM chunk_embeddings"),
317 chunks_missing: count_missing(
318 &conn,
319 "SELECT COUNT(*) FROM memory_chunks c \
320 LEFT JOIN chunk_embeddings ce ON ce.chunk_id = c.id \
321 WHERE ce.chunk_id IS NULL",
322 chunks_total,
323 ),
324 };
325
326 let output = EmbeddingStatusOutput {
327 action: "embedding_status",
328 backend_invoked,
329 counts,
330 coverage,
331 elapsed_ms: start.elapsed().as_millis() as u64,
332 };
333 emit_json_compact(&output)
334}
335
336fn run_list(args: EmbeddingListArgs) -> Result<(), AppError> {
337 let start = std::time::Instant::now();
338 let (_paths, conn) = open_conn(args.db.as_deref())?;
339 let status: PendingEmbeddingStatus = args.status.into();
340 let rows = pending_embeddings::list_by_status(&conn, status, args.limit)?;
341 let count = rows.len();
342 let entries: Vec<EmbeddingListEntry> = rows.iter().map(EmbeddingListEntry::from).collect();
343 let output = EmbeddingListOutput {
344 action: "embedding_list",
345 filter_status: status.as_str().to_string(),
346 count,
347 entries,
348 elapsed_ms: start.elapsed().as_millis() as u64,
349 };
350 emit_json_compact(&output)
351}
352
353fn run_abandon(args: EmbeddingAbandonArgs) -> Result<(), AppError> {
354 let start = std::time::Instant::now();
355 let (_paths, conn) = open_conn(args.db.as_deref())?;
356 pending_embeddings::abandon(&conn, args.pending_id)?;
357 let output = EmbeddingAbandonOutput {
358 action: "embedding_abandon",
359 pending_id: args.pending_id,
360 status: PendingEmbeddingStatus::Abandoned.as_str(),
361 elapsed_ms: start.elapsed().as_millis() as u64,
362 yes: args.yes,
363 };
364 emit_json_compact(&output)
365}
366
#[cfg(test)]
mod tests {
    use super::*;

    /// The serialized status report must expose every coverage figure under
    /// the `coverage` key with the value it was built with.
    #[test]
    fn embedding_status_output_includes_coverage() {
        let report = EmbeddingStatusOutput {
            action: "embedding_status",
            backend_invoked: "openrouter",
            counts: EmbeddingStatusCounts::default(),
            coverage: EmbeddingCoverage {
                memories_total: 10,
                memories_with_vec: 9,
                memories_missing: 1,
                entities_total: 4,
                entities_with_vec: 4,
                entities_missing: 0,
                chunks_total: 7,
                chunks_with_vec: 7,
                chunks_missing: 0,
            },
            elapsed_ms: 1,
        };

        let json = serde_json::to_value(&report).expect("serialize");
        let coverage = &json["coverage"];
        let expected = [
            ("memories_total", 10),
            ("memories_with_vec", 9),
            ("entities_with_vec", 4),
            ("chunks_with_vec", 7),
            ("memories_missing", 1),
            ("entities_missing", 0),
            ("chunks_missing", 0),
        ];
        for (key, want) in expected {
            assert_eq!(coverage[key], want, "coverage key {}", key);
        }
    }

    /// `count_missing` counts base rows without a vector row, and reports the
    /// supplied total when the joined table does not exist at all.
    #[test]
    fn count_missing_counts_gaps_and_falls_back_when_table_absent() {
        let conn = rusqlite::Connection::open_in_memory().unwrap();
        conn.execute_batch(
            "CREATE TABLE entities (id INTEGER PRIMARY KEY, name TEXT);
             CREATE TABLE entity_embeddings (
                 entity_id INTEGER PRIMARY KEY,
                 embedding BLOB NOT NULL
             );",
        )
        .unwrap();

        // Three entities, only the first of which has a vector row.
        conn.execute(
            "INSERT INTO entities (id, name) VALUES (1, 'a'), (2, 'b'), (3, 'c')",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO entity_embeddings (entity_id, embedding) VALUES (1, X'00')",
            [],
        )
        .unwrap();

        let gaps = count_missing(
            &conn,
            "SELECT COUNT(*) FROM entities e \
             LEFT JOIN entity_embeddings ee ON ee.entity_id = e.id \
             WHERE ee.entity_id IS NULL",
            3,
        );
        assert_eq!(gaps, 2, "2 of 3 entities lack a vector row");

        // `chunk_embeddings` was never created, so the query fails with
        // "no such table" and the fallback total must be returned.
        let gaps_without_table = count_missing(
            &conn,
            "SELECT COUNT(*) FROM entities e \
             LEFT JOIN chunk_embeddings ce ON ce.chunk_id = e.id \
             WHERE ce.chunk_id IS NULL",
            3,
        );
        assert_eq!(
            gaps_without_table, 3,
            "absent table must report all missing"
        );
    }

    /// Every CLI filter converts to the storage status whose `as_str()` is the
    /// snake_case spelling of the same variant.
    #[test]
    fn status_filter_round_trip() {
        let cases = [
            (EmbeddingStatusFilter::Pending, "pending"),
            (EmbeddingStatusFilter::InProgress, "in_progress"),
            (EmbeddingStatusFilter::Done, "done"),
            (EmbeddingStatusFilter::Abandoned, "abandoned"),
        ];
        for (filter, expected) in cases {
            let status = PendingEmbeddingStatus::from(filter);
            assert_eq!(status.as_str(), expected);
        }
    }
}