// sqlite_graphrag/commands/pending_embeddings.rs
use clap::{Args, Subcommand};
16use serde::Serialize;
17
18use crate::errors::AppError;
19use crate::output::emit_json_compact;
20use crate::paths::AppPaths;
21use crate::storage::connection::open_rw;
22use crate::storage::pending_embeddings::{self, PendingEmbedding, PendingEmbeddingStatus};
23
24#[derive(Debug, Args)]
25#[command(after_long_help = "EXAMPLES:\n \
26 # List every pending embedding (alias of `embedding list`)\n \
27 sqlite-graphrag pending-embeddings list --json\n\n \
28 # Bulk mark every entry in `pending` status as abandoned\n \
29 sqlite-graphrag pending-embeddings abandon --status pending --yes\n\n \
30 # Mark every abandoned entry as abandoned (no-op safe retry)\n \
31 sqlite-graphrag pending-embeddings abandon --status abandoned --yes")]
32pub struct PendingEmbeddingsArgs {
33 #[command(subcommand)]
34 pub cmd: PendingEmbeddingsCmd,
35}
36
37#[derive(Debug, Subcommand)]
38pub enum PendingEmbeddingsCmd {
39 List(PendingEmbeddingsListArgs),
41 Abandon(PendingEmbeddingsAbandonArgs),
43}
44
45#[derive(Debug, Args)]
46pub struct PendingEmbeddingsListArgs {
47 #[arg(long, default_value = "pending")]
49 pub status: String,
50 #[arg(long, default_value_t = 1000)]
52 pub limit: usize,
53 #[arg(long, hide = true)]
54 pub json: bool,
55}
56
57#[derive(Debug, Args)]
58pub struct PendingEmbeddingsAbandonArgs {
59 #[arg(long, default_value = "pending")]
61 pub status: String,
62 #[arg(long)]
64 pub yes: bool,
65 #[arg(long)]
67 pub dry_run: bool,
68 #[arg(long, hide = true)]
69 pub json: bool,
70}
71
72#[derive(Serialize)]
73struct PendingEmbeddingsListEntry {
74 pending_id: i64,
75 memory_id: i64,
76 name: String,
77 namespace: String,
78 backend_chain: String,
79 last_error: Option<String>,
80 last_exit_code: Option<i32>,
81 last_stderr_tail: Option<String>,
82 attempt_count: i32,
83 status: String,
84 updated_at: i64,
85}
86
87impl From<&PendingEmbedding> for PendingEmbeddingsListEntry {
88 fn from(p: &PendingEmbedding) -> Self {
89 Self {
90 pending_id: p.pending_id,
91 memory_id: p.memory_id,
92 name: p.name.clone(),
93 namespace: p.namespace.clone(),
94 backend_chain: p.backend_chain.clone(),
95 last_error: p.last_error.clone(),
96 last_exit_code: p.last_exit_code,
97 last_stderr_tail: p.last_stderr_tail.clone(),
98 attempt_count: p.attempt_count,
99 status: p.status.as_str().to_string(),
100 updated_at: p.updated_at,
101 }
102 }
103}
104
105#[derive(Serialize)]
106struct PendingEmbeddingsListOutput {
107 action: &'static str,
108 filter_status: String,
109 count: usize,
110 entries: Vec<PendingEmbeddingsListEntry>,
111 elapsed_ms: u64,
112}
113
114#[derive(Serialize)]
115struct PendingEmbeddingsAbandonOutput {
116 action: &'static str,
117 dry_run: bool,
118 status: String,
119 candidates: usize,
120 abandoned: usize,
121 elapsed_ms: u64,
122 yes: bool,
123}
124
125pub fn run(args: PendingEmbeddingsArgs) -> Result<(), AppError> {
126 match args.cmd {
127 PendingEmbeddingsCmd::List(a) => run_list(a),
128 PendingEmbeddingsCmd::Abandon(a) => run_abandon(a),
129 }
130}
131
132fn parse_status(s: &str) -> Result<PendingEmbeddingStatus, AppError> {
133 match s {
134 "pending" => Ok(PendingEmbeddingStatus::Pending),
135 "in_progress" => Ok(PendingEmbeddingStatus::InProgress),
136 "done" => Ok(PendingEmbeddingStatus::Done),
137 "abandoned" => Ok(PendingEmbeddingStatus::Abandoned),
138 other => Err(AppError::Validation(format!(
139 "invalid status filter: {other} (expected pending|in_progress|done|abandoned)"
140 ))),
141 }
142}
143
144fn open_conn() -> Result<(AppPaths, rusqlite::Connection), AppError> {
145 let paths = AppPaths::resolve(None)?;
146 let conn = open_rw(&paths.db)?;
147 Ok((paths, conn))
148}
149
150fn run_list(args: PendingEmbeddingsListArgs) -> Result<(), AppError> {
151 let start = std::time::Instant::now();
152 let (_paths, conn) = open_conn()?;
153 let status = parse_status(&args.status)?;
154 let rows = pending_embeddings::list_by_status(&conn, status, args.limit)?;
155 let count = rows.len();
156 let entries: Vec<PendingEmbeddingsListEntry> =
157 rows.iter().map(PendingEmbeddingsListEntry::from).collect();
158 let output = PendingEmbeddingsListOutput {
159 action: "pending_embeddings_list",
160 filter_status: status.as_str().to_string(),
161 count,
162 entries,
163 elapsed_ms: start.elapsed().as_millis() as u64,
164 };
165 emit_json_compact(&output)
166}
167
168fn run_abandon(args: PendingEmbeddingsAbandonArgs) -> Result<(), AppError> {
169 let start = std::time::Instant::now();
170 let (_paths, conn) = open_conn()?;
171 let status = parse_status(&args.status)?;
172 let rows = pending_embeddings::list_by_status(&conn, status, 100_000)?;
173 let candidates = rows.len();
174 let mut abandoned = 0usize;
175 if !args.dry_run {
176 for row in &rows {
177 pending_embeddings::abandon(&conn, row.pending_id)?;
178 abandoned += 1;
179 }
180 }
181 let output = PendingEmbeddingsAbandonOutput {
182 action: if args.dry_run {
183 "pending_embeddings_abandon_dry_run"
184 } else {
185 "pending_embeddings_abandon"
186 },
187 dry_run: args.dry_run,
188 status: status.as_str().to_string(),
189 candidates,
190 abandoned,
191 elapsed_ms: start.elapsed().as_millis() as u64,
192 yes: args.yes,
193 };
194 emit_json_compact(&output)
195}