sqlite_graphrag/commands/
embedding.rs1use clap::{Args, Subcommand};
15use serde::Serialize;
16
17use crate::cli::LlmBackendChoice;
18use crate::errors::AppError;
19use crate::output::emit_json_compact;
20use crate::paths::AppPaths;
21use crate::storage::connection::open_rw;
22use crate::storage::pending_embeddings::{self, PendingEmbedding, PendingEmbeddingStatus};
23
24#[derive(Debug, Args)]
25#[command(after_long_help = "EXAMPLES:\n \
26 # Show queue health and counts per status\n \
27 sqlite-graphrag embedding status --json\n\n \
28 # List all pending embeddings waiting for retry\n \
29 sqlite-graphrag embedding list --status pending --json\n\n \
30 # Mark pending_id 7 as abandoned (will not be retried automatically)\n \
31 sqlite-graphrag embedding abandon 7 --yes\n\n \
32 # Note: `embedding retry` requires re-running an LLM subprocess; for full\n \
33 # retry of every pending entry use `enrich --operation re-embed --pending-only`")]
34pub struct EmbeddingArgs {
35 #[command(subcommand)]
36 pub cmd: EmbeddingCmd,
37}
38
39#[derive(Debug, Subcommand)]
40pub enum EmbeddingCmd {
41 Status(EmbeddingStatusArgs),
43 List(EmbeddingListArgs),
45 Abandon(EmbeddingAbandonArgs),
47}
48
49#[derive(Debug, Args)]
50pub struct EmbeddingStatusArgs {
51 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
52 pub db: Option<String>,
53}
54
55#[derive(Debug, Args)]
56pub struct EmbeddingListArgs {
57 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
58 pub db: Option<String>,
59 #[arg(long, value_enum, default_value_t = EmbeddingStatusFilter::Pending)]
61 pub status: EmbeddingStatusFilter,
62 #[arg(long, default_value_t = 100)]
64 pub limit: usize,
65}
66
67#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
68#[value(rename_all = "snake_case")]
69pub enum EmbeddingStatusFilter {
70 Pending,
71 InProgress,
72 Done,
73 Abandoned,
74}
75
76impl From<EmbeddingStatusFilter> for PendingEmbeddingStatus {
77 fn from(value: EmbeddingStatusFilter) -> Self {
78 match value {
79 EmbeddingStatusFilter::Pending => Self::Pending,
80 EmbeddingStatusFilter::InProgress => Self::InProgress,
81 EmbeddingStatusFilter::Done => Self::Done,
82 EmbeddingStatusFilter::Abandoned => Self::Abandoned,
83 }
84 }
85}
86
87#[derive(Debug, Args)]
88pub struct EmbeddingAbandonArgs {
89 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
90 pub db: Option<String>,
91 pub pending_id: i64,
93 #[arg(long)]
95 pub yes: bool,
96}
97
98#[derive(Serialize)]
99struct EmbeddingStatusOutput {
100 action: &'static str,
101 backend_invoked: &'static str,
106 counts: EmbeddingStatusCounts,
107 elapsed_ms: u64,
108}
109
110#[derive(Serialize, Default)]
111struct EmbeddingStatusCounts {
112 pending: usize,
113 in_progress: usize,
114 done: usize,
115 abandoned: usize,
116}
117
118#[derive(Serialize)]
119struct EmbeddingListEntry {
120 pending_id: i64,
121 memory_id: i64,
122 name: String,
123 namespace: String,
124 backend_chain: String,
125 last_error: Option<String>,
126 last_exit_code: Option<i32>,
127 last_stderr_tail: Option<String>,
128 attempt_count: i32,
129 status: String,
130 updated_at: i64,
131}
132
133impl From<&PendingEmbedding> for EmbeddingListEntry {
134 fn from(p: &PendingEmbedding) -> Self {
135 Self {
136 pending_id: p.pending_id,
137 memory_id: p.memory_id,
138 name: p.name.clone(),
139 namespace: p.namespace.clone(),
140 backend_chain: p.backend_chain.clone(),
141 last_error: p.last_error.clone(),
142 last_exit_code: p.last_exit_code,
143 last_stderr_tail: p.last_stderr_tail.clone(),
144 attempt_count: p.attempt_count,
145 status: p.status.as_str().to_string(),
146 updated_at: p.updated_at,
147 }
148 }
149}
150
151#[derive(Serialize)]
152struct EmbeddingListOutput {
153 action: &'static str,
154 filter_status: String,
155 count: usize,
156 entries: Vec<EmbeddingListEntry>,
157 elapsed_ms: u64,
158}
159
160#[derive(Serialize)]
161struct EmbeddingAbandonOutput {
162 action: &'static str,
163 pending_id: i64,
164 status: &'static str,
165 elapsed_ms: u64,
166 yes: bool,
167}
168
169pub fn run(args: EmbeddingArgs, llm_backend: LlmBackendChoice) -> Result<(), AppError> {
170 match args.cmd {
171 EmbeddingCmd::Status(a) => run_status(a, llm_backend),
172 EmbeddingCmd::List(a) => run_list(a),
173 EmbeddingCmd::Abandon(a) => run_abandon(a),
174 }
175}
176
177fn open_conn(db: Option<&str>) -> Result<(AppPaths, rusqlite::Connection), AppError> {
178 let paths = AppPaths::resolve(db)?;
179 let conn = open_rw(&paths.db)?;
180 Ok((paths, conn))
181}
182
183fn run_status(args: EmbeddingStatusArgs, llm_backend: LlmBackendChoice) -> Result<(), AppError> {
184 let start = std::time::Instant::now();
185 let (_paths, conn) = open_conn(args.db.as_deref())?;
186
187 let counts = EmbeddingStatusCounts {
188 pending: pending_embeddings::list_by_status(
189 &conn,
190 PendingEmbeddingStatus::Pending,
191 100_000,
192 )?
193 .len(),
194 in_progress: pending_embeddings::list_by_status(
195 &conn,
196 PendingEmbeddingStatus::InProgress,
197 100_000,
198 )?
199 .len(),
200 done: pending_embeddings::list_by_status(&conn, PendingEmbeddingStatus::Done, 100_000)?
201 .len(),
202 abandoned: pending_embeddings::list_by_status(
203 &conn,
204 PendingEmbeddingStatus::Abandoned,
205 100_000,
206 )?
207 .len(),
208 };
209
210 let backend_invoked: &'static str = match llm_backend {
211 LlmBackendChoice::Claude => "claude",
212 LlmBackendChoice::Codex => "codex",
213 LlmBackendChoice::None => "none",
214 LlmBackendChoice::Auto => "auto",
215 };
216
217 let output = EmbeddingStatusOutput {
218 action: "embedding_status",
219 backend_invoked,
220 counts,
221 elapsed_ms: start.elapsed().as_millis() as u64,
222 };
223 emit_json_compact(&output)
224}
225
226fn run_list(args: EmbeddingListArgs) -> Result<(), AppError> {
227 let start = std::time::Instant::now();
228 let (_paths, conn) = open_conn(args.db.as_deref())?;
229 let status: PendingEmbeddingStatus = args.status.into();
230 let rows = pending_embeddings::list_by_status(&conn, status, args.limit)?;
231 let count = rows.len();
232 let entries: Vec<EmbeddingListEntry> = rows.iter().map(EmbeddingListEntry::from).collect();
233 let output = EmbeddingListOutput {
234 action: "embedding_list",
235 filter_status: status.as_str().to_string(),
236 count,
237 entries,
238 elapsed_ms: start.elapsed().as_millis() as u64,
239 };
240 emit_json_compact(&output)
241}
242
243fn run_abandon(args: EmbeddingAbandonArgs) -> Result<(), AppError> {
244 let start = std::time::Instant::now();
245 let (_paths, conn) = open_conn(args.db.as_deref())?;
246 pending_embeddings::abandon(&conn, args.pending_id)?;
247 let output = EmbeddingAbandonOutput {
248 action: "embedding_abandon",
249 pending_id: args.pending_id,
250 status: PendingEmbeddingStatus::Abandoned.as_str(),
251 elapsed_ms: start.elapsed().as_millis() as u64,
252 yes: args.yes,
253 };
254 emit_json_compact(&output)
255}
256
#[cfg(test)]
mod tests {
    use super::*;

    /// Every CLI filter variant must convert to the storage status whose
    /// string form matches the snake_case spelling accepted on the command
    /// line.
    #[test]
    fn status_filter_round_trip() {
        let expectations = [
            (EmbeddingStatusFilter::Pending, "pending"),
            (EmbeddingStatusFilter::InProgress, "in_progress"),
            (EmbeddingStatusFilter::Done, "done"),
            (EmbeddingStatusFilter::Abandoned, "abandoned"),
        ];

        for (filter, expected) in expectations {
            let converted = PendingEmbeddingStatus::from(filter);
            assert_eq!(converted.as_str(), expected);
        }
    }
}