sqlite_graphrag/commands/
embedding.rs1use clap::{Args, Subcommand};
15use serde::Serialize;
16
17use crate::errors::AppError;
18use crate::output::emit_json_compact;
19use crate::paths::AppPaths;
20use crate::storage::connection::open_rw;
21use crate::storage::pending_embeddings::{self, PendingEmbedding, PendingEmbeddingStatus};
22
23#[derive(Debug, Args)]
24#[command(after_long_help = "EXAMPLES:\n \
25 # Show queue health and counts per status\n \
26 sqlite-graphrag embedding status --json\n\n \
27 # List all pending embeddings waiting for retry\n \
28 sqlite-graphrag embedding list --status pending --json\n\n \
29 # Mark pending_id 7 as abandoned (will not be retried automatically)\n \
30 sqlite-graphrag embedding abandon 7 --yes\n\n \
31 # Note: `embedding retry` requires re-running an LLM subprocess; for full\n \
32 # retry of every pending entry use `enrich --operation re-embed --pending-only`")]
33pub struct EmbeddingArgs {
34 #[command(subcommand)]
35 pub cmd: EmbeddingCmd,
36}
37
38#[derive(Debug, Subcommand)]
39pub enum EmbeddingCmd {
40 Status(EmbeddingStatusArgs),
42 List(EmbeddingListArgs),
44 Abandon(EmbeddingAbandonArgs),
46}
47
// Arguments for `embedding status`. The subcommand takes no options of its
// own; the empty struct exists so the variant has an `Args` payload like its
// siblings. `run_status` ignores it.
#[derive(Debug, Args)]
pub struct EmbeddingStatusArgs {}
50
// Arguments for `embedding list`.
#[derive(Debug, Args)]
pub struct EmbeddingListArgs {
    // Queue status to filter on (`--status`); defaults to `pending`.
    // Converted to `PendingEmbeddingStatus` in `run_list` before querying.
    #[arg(long, value_enum, default_value_t = EmbeddingStatusFilter::Pending)]
    pub status: EmbeddingStatusFilter,
    // Maximum number of rows to return (`--limit`, default 100). Passed
    // unchanged to `pending_embeddings::list_by_status`.
    #[arg(long, default_value_t = 100)]
    pub limit: usize,
}
60
// CLI-side mirror of `PendingEmbeddingStatus`, parsed by clap for `--status`.
// With `rename_all = "snake_case"` the accepted spellings are `pending`,
// `in_progress`, `done` and `abandoned`. These are the same strings
// `PendingEmbeddingStatus::as_str` returns, as checked by the unit test.
#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
#[value(rename_all = "snake_case")]
pub enum EmbeddingStatusFilter {
    // Entries still queued; the default filter for `embedding list`.
    Pending,
    InProgress,
    Done,
    // Entries marked by `embedding abandon`; per the command help they are not
    // retried automatically.
    Abandoned,
}
69
70impl From<EmbeddingStatusFilter> for PendingEmbeddingStatus {
71 fn from(value: EmbeddingStatusFilter) -> Self {
72 match value {
73 EmbeddingStatusFilter::Pending => Self::Pending,
74 EmbeddingStatusFilter::InProgress => Self::InProgress,
75 EmbeddingStatusFilter::Done => Self::Done,
76 EmbeddingStatusFilter::Abandoned => Self::Abandoned,
77 }
78 }
79}
80
// Arguments for `embedding abandon`.
#[derive(Debug, Args)]
pub struct EmbeddingAbandonArgs {
    // Positional id of the queue entry to mark as abandoned (e.g. `abandon 7`).
    pub pending_id: i64,
    // `--yes` confirmation flag.
    // NOTE(review): `run_abandon` never checks this flag. The entry is abandoned
    // with or without it, and the value is only echoed in the JSON output.
    // Confirm whether it is meant to gate the operation.
    #[arg(long)]
    pub yes: bool,
}
89
/// JSON payload emitted by `embedding status`. Field order is the key order in
/// the serialized output.
#[derive(Serialize)]
struct EmbeddingStatusOutput {
    /// Always `"embedding_status"` (set by `run_status`).
    action: &'static str,
    /// Number of queue entries per status.
    counts: EmbeddingStatusCounts,
    /// Wall-clock milliseconds from the start of `run_status` until the payload
    /// was built.
    elapsed_ms: u64,
}
96
/// Per-status entry counts reported by `embedding status`.
///
/// `run_status` fills each field with the length of a row list fetched with a
/// limit of 100_000, so no count can exceed that value.
#[derive(Serialize, Default)]
struct EmbeddingStatusCounts {
    pending: usize,
    in_progress: usize,
    done: usize,
    abandoned: usize,
}
104
/// One row in the `entries` array of `embedding list` output.
///
/// Built from a `PendingEmbedding` by the `From<&PendingEmbedding>` impl. Every
/// field is copied verbatim except `status`, which is rendered through
/// `PendingEmbeddingStatus::as_str`. Field order is the JSON key order.
#[derive(Serialize)]
struct EmbeddingListEntry {
    pending_id: i64,
    memory_id: i64,
    name: String,
    namespace: String,
    backend_chain: String,
    last_error: Option<String>,
    // NOTE(review): presumably the exit code / stderr tail of the last LLM
    // subprocess attempt — confirm against `pending_embeddings`.
    last_exit_code: Option<i32>,
    last_stderr_tail: Option<String>,
    attempt_count: i32,
    /// Snake_case status string, e.g. `"pending"`.
    status: String,
    updated_at: i64,
}
119
120impl From<&PendingEmbedding> for EmbeddingListEntry {
121 fn from(p: &PendingEmbedding) -> Self {
122 Self {
123 pending_id: p.pending_id,
124 memory_id: p.memory_id,
125 name: p.name.clone(),
126 namespace: p.namespace.clone(),
127 backend_chain: p.backend_chain.clone(),
128 last_error: p.last_error.clone(),
129 last_exit_code: p.last_exit_code,
130 last_stderr_tail: p.last_stderr_tail.clone(),
131 attempt_count: p.attempt_count,
132 status: p.status.as_str().to_string(),
133 updated_at: p.updated_at,
134 }
135 }
136}
137
/// JSON payload emitted by `embedding list`. Field order is the JSON key order.
#[derive(Serialize)]
struct EmbeddingListOutput {
    /// Always `"embedding_list"` (set by `run_list`).
    action: &'static str,
    /// Status that was filtered on, as its snake_case string.
    filter_status: String,
    /// Number of entries returned (equals `entries.len()`; bounded by `--limit`).
    count: usize,
    entries: Vec<EmbeddingListEntry>,
    /// Wall-clock milliseconds spent in `run_list` before emitting.
    elapsed_ms: u64,
}
146
/// JSON payload emitted by `embedding abandon`. Field order is the JSON key order.
#[derive(Serialize)]
struct EmbeddingAbandonOutput {
    /// Always `"embedding_abandon"` (set by `run_abandon`).
    action: &'static str,
    /// Id of the queue entry that was abandoned.
    pending_id: i64,
    /// Always `PendingEmbeddingStatus::Abandoned.as_str()`.
    status: &'static str,
    /// Wall-clock milliseconds spent in `run_abandon` before emitting.
    elapsed_ms: u64,
    /// Echo of the `--yes` flag (not otherwise used by `run_abandon`).
    yes: bool,
}
155
156pub fn run(args: EmbeddingArgs) -> Result<(), AppError> {
157 match args.cmd {
158 EmbeddingCmd::Status(a) => run_status(a),
159 EmbeddingCmd::List(a) => run_list(a),
160 EmbeddingCmd::Abandon(a) => run_abandon(a),
161 }
162}
163
164fn open_conn() -> Result<(AppPaths, rusqlite::Connection), AppError> {
165 let paths = AppPaths::resolve(None)?;
166 let conn = open_rw(&paths.db)?;
167 Ok((paths, conn))
168}
169
170fn run_status(_args: EmbeddingStatusArgs) -> Result<(), AppError> {
171 let start = std::time::Instant::now();
172 let (_paths, conn) = open_conn()?;
173
174 let counts = EmbeddingStatusCounts {
175 pending: pending_embeddings::list_by_status(
176 &conn,
177 PendingEmbeddingStatus::Pending,
178 100_000,
179 )?
180 .len(),
181 in_progress: pending_embeddings::list_by_status(
182 &conn,
183 PendingEmbeddingStatus::InProgress,
184 100_000,
185 )?
186 .len(),
187 done: pending_embeddings::list_by_status(&conn, PendingEmbeddingStatus::Done, 100_000)?
188 .len(),
189 abandoned: pending_embeddings::list_by_status(
190 &conn,
191 PendingEmbeddingStatus::Abandoned,
192 100_000,
193 )?
194 .len(),
195 };
196
197 let output = EmbeddingStatusOutput {
198 action: "embedding_status",
199 counts,
200 elapsed_ms: start.elapsed().as_millis() as u64,
201 };
202 emit_json_compact(&output)
203}
204
205fn run_list(args: EmbeddingListArgs) -> Result<(), AppError> {
206 let start = std::time::Instant::now();
207 let (_paths, conn) = open_conn()?;
208 let status: PendingEmbeddingStatus = args.status.into();
209 let rows = pending_embeddings::list_by_status(&conn, status, args.limit)?;
210 let count = rows.len();
211 let entries: Vec<EmbeddingListEntry> = rows.iter().map(EmbeddingListEntry::from).collect();
212 let output = EmbeddingListOutput {
213 action: "embedding_list",
214 filter_status: status.as_str().to_string(),
215 count,
216 entries,
217 elapsed_ms: start.elapsed().as_millis() as u64,
218 };
219 emit_json_compact(&output)
220}
221
222fn run_abandon(args: EmbeddingAbandonArgs) -> Result<(), AppError> {
223 let start = std::time::Instant::now();
224 let (_paths, conn) = open_conn()?;
225 pending_embeddings::abandon(&conn, args.pending_id)?;
226 let output = EmbeddingAbandonOutput {
227 action: "embedding_abandon",
228 pending_id: args.pending_id,
229 status: PendingEmbeddingStatus::Abandoned.as_str(),
230 elapsed_ms: start.elapsed().as_millis() as u64,
231 yes: args.yes,
232 };
233 emit_json_compact(&output)
234}
235
#[cfg(test)]
mod tests {
    use super::*;

    // Each CLI filter must convert to the storage status whose string form is
    // the snake_case spelling clap accepts on the command line.
    #[test]
    fn status_filter_round_trip() {
        // Exhaustive on purpose: adding a filter variant breaks compilation here
        // until its expected spelling is recorded.
        fn expected(filter: EmbeddingStatusFilter) -> &'static str {
            match filter {
                EmbeddingStatusFilter::Pending => "pending",
                EmbeddingStatusFilter::InProgress => "in_progress",
                EmbeddingStatusFilter::Done => "done",
                EmbeddingStatusFilter::Abandoned => "abandoned",
            }
        }

        let all = [
            EmbeddingStatusFilter::Pending,
            EmbeddingStatusFilter::InProgress,
            EmbeddingStatusFilter::Done,
            EmbeddingStatusFilter::Abandoned,
        ];
        for filter in all {
            let converted = PendingEmbeddingStatus::from(filter);
            assert_eq!(converted.as_str(), expected(filter), "filter {:?}", filter);
        }
    }
}