sqlite_graphrag/commands/
embedding.rs1use clap::{Args, Subcommand};
15use serde::Serialize;
16
17use crate::cli::LlmBackendChoice;
18use crate::errors::AppError;
19use crate::output::emit_json_compact;
20use crate::paths::AppPaths;
21use crate::storage::connection::open_rw;
22use crate::storage::pending_embeddings::{self, PendingEmbedding, PendingEmbeddingStatus};
23
24#[derive(Debug, Args)]
25#[command(after_long_help = "EXAMPLES:\n \
26 # Show queue health and counts per status\n \
27 sqlite-graphrag embedding status --json\n\n \
28 # List all pending embeddings waiting for retry\n \
29 sqlite-graphrag embedding list --status pending --json\n\n \
30 # Mark pending_id 7 as abandoned (will not be retried automatically)\n \
31 sqlite-graphrag embedding abandon 7 --yes\n\n \
32 # Note: `embedding retry` requires re-running an LLM subprocess; for full\n \
33 # retry of every pending entry use `enrich --operation re-embed --pending-only`")]
34pub struct EmbeddingArgs {
35 #[command(subcommand)]
36 pub cmd: EmbeddingCmd,
37}
38
39#[derive(Debug, Subcommand)]
40pub enum EmbeddingCmd {
41 Status(EmbeddingStatusArgs),
43 List(EmbeddingListArgs),
45 Abandon(EmbeddingAbandonArgs),
47}
48
49#[derive(Debug, Args)]
50pub struct EmbeddingStatusArgs {
51 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
52 pub db: Option<String>,
53 #[arg(long, hide = true)]
55 pub json: bool,
56}
57
58#[derive(Debug, Args)]
59pub struct EmbeddingListArgs {
60 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
61 pub db: Option<String>,
62 #[arg(long, value_enum, default_value_t = EmbeddingStatusFilter::Pending)]
64 pub status: EmbeddingStatusFilter,
65 #[arg(long, default_value_t = 100)]
67 pub limit: usize,
68 #[arg(long, hide = true)]
70 pub json: bool,
71}
72
73#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
74#[value(rename_all = "snake_case")]
75pub enum EmbeddingStatusFilter {
76 Pending,
77 InProgress,
78 Done,
79 Abandoned,
80}
81
82impl From<EmbeddingStatusFilter> for PendingEmbeddingStatus {
83 fn from(value: EmbeddingStatusFilter) -> Self {
84 match value {
85 EmbeddingStatusFilter::Pending => Self::Pending,
86 EmbeddingStatusFilter::InProgress => Self::InProgress,
87 EmbeddingStatusFilter::Done => Self::Done,
88 EmbeddingStatusFilter::Abandoned => Self::Abandoned,
89 }
90 }
91}
92
93#[derive(Debug, Args)]
94pub struct EmbeddingAbandonArgs {
95 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
96 pub db: Option<String>,
97 pub pending_id: i64,
99 #[arg(long)]
101 pub yes: bool,
102 #[arg(long, hide = true)]
104 pub json: bool,
105}
106
107#[derive(Serialize)]
108struct EmbeddingStatusOutput {
109 action: &'static str,
110 backend_invoked: &'static str,
115 counts: EmbeddingStatusCounts,
116 elapsed_ms: u64,
117}
118
119#[derive(Serialize, Default)]
120struct EmbeddingStatusCounts {
121 pending: usize,
122 in_progress: usize,
123 done: usize,
124 abandoned: usize,
125}
126
127#[derive(Serialize)]
128struct EmbeddingListEntry {
129 pending_id: i64,
130 memory_id: i64,
131 name: String,
132 namespace: String,
133 backend_chain: String,
134 last_error: Option<String>,
135 last_exit_code: Option<i32>,
136 last_stderr_tail: Option<String>,
137 attempt_count: i32,
138 status: String,
139 updated_at: i64,
140}
141
142impl From<&PendingEmbedding> for EmbeddingListEntry {
143 fn from(p: &PendingEmbedding) -> Self {
144 Self {
145 pending_id: p.pending_id,
146 memory_id: p.memory_id,
147 name: p.name.clone(),
148 namespace: p.namespace.clone(),
149 backend_chain: p.backend_chain.clone(),
150 last_error: p.last_error.clone(),
151 last_exit_code: p.last_exit_code,
152 last_stderr_tail: p.last_stderr_tail.clone(),
153 attempt_count: p.attempt_count,
154 status: p.status.as_str().to_string(),
155 updated_at: p.updated_at,
156 }
157 }
158}
159
160#[derive(Serialize)]
161struct EmbeddingListOutput {
162 action: &'static str,
163 filter_status: String,
164 count: usize,
165 entries: Vec<EmbeddingListEntry>,
166 elapsed_ms: u64,
167}
168
169#[derive(Serialize)]
170struct EmbeddingAbandonOutput {
171 action: &'static str,
172 pending_id: i64,
173 status: &'static str,
174 elapsed_ms: u64,
175 yes: bool,
176}
177
178pub fn run(args: EmbeddingArgs, llm_backend: LlmBackendChoice) -> Result<(), AppError> {
179 match args.cmd {
180 EmbeddingCmd::Status(a) => run_status(a, llm_backend),
181 EmbeddingCmd::List(a) => run_list(a),
182 EmbeddingCmd::Abandon(a) => run_abandon(a),
183 }
184}
185
186fn open_conn(db: Option<&str>) -> Result<(AppPaths, rusqlite::Connection), AppError> {
187 let paths = AppPaths::resolve(db)?;
188 let conn = open_rw(&paths.db)?;
189 Ok((paths, conn))
190}
191
192fn run_status(args: EmbeddingStatusArgs, llm_backend: LlmBackendChoice) -> Result<(), AppError> {
193 let start = std::time::Instant::now();
194 let (_paths, conn) = open_conn(args.db.as_deref())?;
195
196 let counts = EmbeddingStatusCounts {
197 pending: pending_embeddings::list_by_status(
198 &conn,
199 PendingEmbeddingStatus::Pending,
200 100_000,
201 )?
202 .len(),
203 in_progress: pending_embeddings::list_by_status(
204 &conn,
205 PendingEmbeddingStatus::InProgress,
206 100_000,
207 )?
208 .len(),
209 done: pending_embeddings::list_by_status(&conn, PendingEmbeddingStatus::Done, 100_000)?
210 .len(),
211 abandoned: pending_embeddings::list_by_status(
212 &conn,
213 PendingEmbeddingStatus::Abandoned,
214 100_000,
215 )?
216 .len(),
217 };
218
219 let backend_invoked: &'static str = match llm_backend {
220 LlmBackendChoice::Claude => "claude",
221 LlmBackendChoice::Codex => "codex",
222 LlmBackendChoice::Opencode => "opencode",
223 LlmBackendChoice::None => "none",
224 LlmBackendChoice::OpenRouter => "openrouter",
225 LlmBackendChoice::Auto => "auto",
226 };
227
228 let output = EmbeddingStatusOutput {
229 action: "embedding_status",
230 backend_invoked,
231 counts,
232 elapsed_ms: start.elapsed().as_millis() as u64,
233 };
234 emit_json_compact(&output)
235}
236
237fn run_list(args: EmbeddingListArgs) -> Result<(), AppError> {
238 let start = std::time::Instant::now();
239 let (_paths, conn) = open_conn(args.db.as_deref())?;
240 let status: PendingEmbeddingStatus = args.status.into();
241 let rows = pending_embeddings::list_by_status(&conn, status, args.limit)?;
242 let count = rows.len();
243 let entries: Vec<EmbeddingListEntry> = rows.iter().map(EmbeddingListEntry::from).collect();
244 let output = EmbeddingListOutput {
245 action: "embedding_list",
246 filter_status: status.as_str().to_string(),
247 count,
248 entries,
249 elapsed_ms: start.elapsed().as_millis() as u64,
250 };
251 emit_json_compact(&output)
252}
253
254fn run_abandon(args: EmbeddingAbandonArgs) -> Result<(), AppError> {
255 let start = std::time::Instant::now();
256 let (_paths, conn) = open_conn(args.db.as_deref())?;
257 pending_embeddings::abandon(&conn, args.pending_id)?;
258 let output = EmbeddingAbandonOutput {
259 action: "embedding_abandon",
260 pending_id: args.pending_id,
261 status: PendingEmbeddingStatus::Abandoned.as_str(),
262 elapsed_ms: start.elapsed().as_millis() as u64,
263 yes: args.yes,
264 };
265 emit_json_compact(&output)
266}
267
#[cfg(test)]
mod tests {
    use super::*;

    /// Each CLI filter must convert to the storage status with the matching string form.
    #[test]
    fn status_filter_round_trip() {
        let cases = [
            (EmbeddingStatusFilter::Pending, "pending"),
            (EmbeddingStatusFilter::InProgress, "in_progress"),
            (EmbeddingStatusFilter::Done, "done"),
            (EmbeddingStatusFilter::Abandoned, "abandoned"),
        ];

        for (filter, expected) in cases {
            let converted = PendingEmbeddingStatus::from(filter);
            assert_eq!(converted.as_str(), expected, "filter {:?}", filter);
        }
    }
}