sqlite_graphrag/commands/
embedding.rs1use clap::{Args, Subcommand};
15use serde::Serialize;
16
17use crate::cli::LlmBackendChoice;
18use crate::errors::AppError;
19use crate::output::emit_json_compact;
20use crate::paths::AppPaths;
21use crate::storage::connection::open_rw;
22use crate::storage::pending_embeddings::{self, PendingEmbedding, PendingEmbeddingStatus};
23
24#[derive(Debug, Args)]
25#[command(after_long_help = "EXAMPLES:\n \
26 # Show queue health and counts per status\n \
27 sqlite-graphrag embedding status --json\n\n \
28 # List all pending embeddings waiting for retry\n \
29 sqlite-graphrag embedding list --status pending --json\n\n \
30 # Mark pending_id 7 as abandoned (will not be retried automatically)\n \
31 sqlite-graphrag embedding abandon 7 --yes\n\n \
32 # Note: `embedding retry` requires re-running an LLM subprocess; for full\n \
33 # retry of every pending entry use `enrich --operation re-embed --pending-only`")]
34pub struct EmbeddingArgs {
35 #[command(subcommand)]
36 pub cmd: EmbeddingCmd,
37}
38
39#[derive(Debug, Subcommand)]
40pub enum EmbeddingCmd {
41 Status(EmbeddingStatusArgs),
43 List(EmbeddingListArgs),
45 Abandon(EmbeddingAbandonArgs),
47}
48
49#[derive(Debug, Args)]
50pub struct EmbeddingStatusArgs {}
51
52#[derive(Debug, Args)]
53pub struct EmbeddingListArgs {
54 #[arg(long, value_enum, default_value_t = EmbeddingStatusFilter::Pending)]
56 pub status: EmbeddingStatusFilter,
57 #[arg(long, default_value_t = 100)]
59 pub limit: usize,
60}
61
62#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
63#[value(rename_all = "snake_case")]
64pub enum EmbeddingStatusFilter {
65 Pending,
66 InProgress,
67 Done,
68 Abandoned,
69}
70
71impl From<EmbeddingStatusFilter> for PendingEmbeddingStatus {
72 fn from(value: EmbeddingStatusFilter) -> Self {
73 match value {
74 EmbeddingStatusFilter::Pending => Self::Pending,
75 EmbeddingStatusFilter::InProgress => Self::InProgress,
76 EmbeddingStatusFilter::Done => Self::Done,
77 EmbeddingStatusFilter::Abandoned => Self::Abandoned,
78 }
79 }
80}
81
82#[derive(Debug, Args)]
83pub struct EmbeddingAbandonArgs {
84 pub pending_id: i64,
86 #[arg(long)]
88 pub yes: bool,
89}
90
91#[derive(Serialize)]
92struct EmbeddingStatusOutput {
93 action: &'static str,
94 backend_invoked: &'static str,
99 counts: EmbeddingStatusCounts,
100 elapsed_ms: u64,
101}
102
103#[derive(Serialize, Default)]
104struct EmbeddingStatusCounts {
105 pending: usize,
106 in_progress: usize,
107 done: usize,
108 abandoned: usize,
109}
110
111#[derive(Serialize)]
112struct EmbeddingListEntry {
113 pending_id: i64,
114 memory_id: i64,
115 name: String,
116 namespace: String,
117 backend_chain: String,
118 last_error: Option<String>,
119 last_exit_code: Option<i32>,
120 last_stderr_tail: Option<String>,
121 attempt_count: i32,
122 status: String,
123 updated_at: i64,
124}
125
126impl From<&PendingEmbedding> for EmbeddingListEntry {
127 fn from(p: &PendingEmbedding) -> Self {
128 Self {
129 pending_id: p.pending_id,
130 memory_id: p.memory_id,
131 name: p.name.clone(),
132 namespace: p.namespace.clone(),
133 backend_chain: p.backend_chain.clone(),
134 last_error: p.last_error.clone(),
135 last_exit_code: p.last_exit_code,
136 last_stderr_tail: p.last_stderr_tail.clone(),
137 attempt_count: p.attempt_count,
138 status: p.status.as_str().to_string(),
139 updated_at: p.updated_at,
140 }
141 }
142}
143
144#[derive(Serialize)]
145struct EmbeddingListOutput {
146 action: &'static str,
147 filter_status: String,
148 count: usize,
149 entries: Vec<EmbeddingListEntry>,
150 elapsed_ms: u64,
151}
152
153#[derive(Serialize)]
154struct EmbeddingAbandonOutput {
155 action: &'static str,
156 pending_id: i64,
157 status: &'static str,
158 elapsed_ms: u64,
159 yes: bool,
160}
161
162pub fn run(args: EmbeddingArgs, llm_backend: LlmBackendChoice) -> Result<(), AppError> {
163 match args.cmd {
164 EmbeddingCmd::Status(a) => run_status(a, llm_backend),
165 EmbeddingCmd::List(a) => run_list(a),
166 EmbeddingCmd::Abandon(a) => run_abandon(a),
167 }
168}
169
170fn open_conn() -> Result<(AppPaths, rusqlite::Connection), AppError> {
171 let paths = AppPaths::resolve(None)?;
172 let conn = open_rw(&paths.db)?;
173 Ok((paths, conn))
174}
175
176fn run_status(_args: EmbeddingStatusArgs, llm_backend: LlmBackendChoice) -> Result<(), AppError> {
177 let start = std::time::Instant::now();
178 let (_paths, conn) = open_conn()?;
179
180 let counts = EmbeddingStatusCounts {
181 pending: pending_embeddings::list_by_status(
182 &conn,
183 PendingEmbeddingStatus::Pending,
184 100_000,
185 )?
186 .len(),
187 in_progress: pending_embeddings::list_by_status(
188 &conn,
189 PendingEmbeddingStatus::InProgress,
190 100_000,
191 )?
192 .len(),
193 done: pending_embeddings::list_by_status(&conn, PendingEmbeddingStatus::Done, 100_000)?
194 .len(),
195 abandoned: pending_embeddings::list_by_status(
196 &conn,
197 PendingEmbeddingStatus::Abandoned,
198 100_000,
199 )?
200 .len(),
201 };
202
203 let backend_invoked: &'static str = match llm_backend {
204 LlmBackendChoice::Claude => "claude",
205 LlmBackendChoice::Codex => "codex",
206 LlmBackendChoice::None => "none",
207 LlmBackendChoice::Auto => "auto",
208 };
209
210 let output = EmbeddingStatusOutput {
211 action: "embedding_status",
212 backend_invoked,
213 counts,
214 elapsed_ms: start.elapsed().as_millis() as u64,
215 };
216 emit_json_compact(&output)
217}
218
219fn run_list(args: EmbeddingListArgs) -> Result<(), AppError> {
220 let start = std::time::Instant::now();
221 let (_paths, conn) = open_conn()?;
222 let status: PendingEmbeddingStatus = args.status.into();
223 let rows = pending_embeddings::list_by_status(&conn, status, args.limit)?;
224 let count = rows.len();
225 let entries: Vec<EmbeddingListEntry> = rows.iter().map(EmbeddingListEntry::from).collect();
226 let output = EmbeddingListOutput {
227 action: "embedding_list",
228 filter_status: status.as_str().to_string(),
229 count,
230 entries,
231 elapsed_ms: start.elapsed().as_millis() as u64,
232 };
233 emit_json_compact(&output)
234}
235
236fn run_abandon(args: EmbeddingAbandonArgs) -> Result<(), AppError> {
237 let start = std::time::Instant::now();
238 let (_paths, conn) = open_conn()?;
239 pending_embeddings::abandon(&conn, args.pending_id)?;
240 let output = EmbeddingAbandonOutput {
241 action: "embedding_abandon",
242 pending_id: args.pending_id,
243 status: PendingEmbeddingStatus::Abandoned.as_str(),
244 elapsed_ms: start.elapsed().as_millis() as u64,
245 yes: args.yes,
246 };
247 emit_json_compact(&output)
248}
249
#[cfg(test)]
mod tests {
    use super::*;

    /// Every CLI filter variant must convert to the storage status whose
    /// string form is the matching snake_case label.
    #[test]
    fn status_filter_round_trip() {
        let expectations = [
            (EmbeddingStatusFilter::Pending, "pending"),
            (EmbeddingStatusFilter::InProgress, "in_progress"),
            (EmbeddingStatusFilter::Done, "done"),
            (EmbeddingStatusFilter::Abandoned, "abandoned"),
        ];

        for (filter, label) in expectations {
            let converted = PendingEmbeddingStatus::from(filter);
            assert_eq!(converted.as_str(), label);
        }
    }
}