sqlite_graphrag/commands/
pending_embeddings.rs1use clap::{Args, Subcommand};
16use serde::Serialize;
17
18use crate::errors::AppError;
19use crate::output::emit_json_compact;
20use crate::paths::AppPaths;
21use crate::storage::connection::open_rw;
22use crate::storage::pending_embeddings::{self, PendingEmbedding, PendingEmbeddingStatus};
23
// Top-level arguments for the `pending-embeddings` command group.
//
// Plain `//` comments are used here on purpose: clap's derive macros turn
// `///` doc comments into help text, so adding them would change CLI output.
// The `after_long_help` string below is appended to the long help and is
// runtime text; keep it byte-for-byte stable.
#[derive(Debug, Args)]
#[command(after_long_help = "EXAMPLES:\n \
    # List every pending embedding (alias of `embedding list`)\n \
    sqlite-graphrag pending-embeddings list --json\n\n \
    # Bulk mark every entry in `pending` status as abandoned\n \
    sqlite-graphrag pending-embeddings abandon --status pending --yes\n\n \
    # Mark every abandoned entry as abandoned (no-op safe retry)\n \
    sqlite-graphrag pending-embeddings abandon --status abandoned --yes")]
pub struct PendingEmbeddingsArgs {
    // Selected subcommand; dispatched by `run`.
    #[command(subcommand)]
    pub cmd: PendingEmbeddingsCmd,
}
36
// Subcommands of `pending-embeddings`.
//
// `//` comments only: `///` on a variant would become that subcommand's
// help text under clap's derive.
#[derive(Debug, Subcommand)]
pub enum PendingEmbeddingsCmd {
    // Lists entries that match a status filter (handled by `run_list`).
    List(PendingEmbeddingsListArgs),
    // Marks entries that match a status filter as abandoned (handled by `run_abandon`).
    Abandon(PendingEmbeddingsAbandonArgs),
}
44
// Arguments for the `list` subcommand.
//
// `//` comments only: `///` on a field would become its `--help` text.
#[derive(Debug, Args)]
pub struct PendingEmbeddingsListArgs {
    // Status filter as raw text; defaults to "pending". It is validated by
    // `parse_status`, which accepts pending|in_progress|done|abandoned.
    #[arg(long, default_value = "pending")]
    pub status: String,
    // Row limit forwarded to `pending_embeddings::list_by_status`; defaults to 1000.
    #[arg(long, default_value_t = 1000)]
    pub limit: usize,
}
54
// Arguments for the `abandon` subcommand.
//
// `//` comments only: `///` on a field would become its `--help` text.
#[derive(Debug, Args)]
pub struct PendingEmbeddingsAbandonArgs {
    // Status of the entries to abandon, as raw text; defaults to "pending".
    // Validated by `parse_status`.
    #[arg(long, default_value = "pending")]
    pub status: String,
    // Confirmation flag. In this file it is only echoed back in the JSON
    // output; `run_abandon` does not gate the operation on it.
    // NOTE(review): confirm whether `--yes` is meant to be mandatory.
    #[arg(long)]
    pub yes: bool,
    // When set, `run_abandon` counts the candidates but does not modify any entry.
    #[arg(long)]
    pub dry_run: bool,
}
67
/// Serializable view of a single `PendingEmbedding` row, as emitted inside the
/// `entries` array of the `list` output.
///
/// Every field is copied from the storage row by the `From<&PendingEmbedding>`
/// impl; the only transformation is that `status` is rendered as text.
/// Field order determines the order of keys in the emitted JSON.
#[derive(Serialize)]
struct PendingEmbeddingsListEntry {
    /// Identifier of the pending-embedding row (the id `abandon` operates on).
    pending_id: i64,
    /// Identifier of the associated memory, as stored on the row.
    memory_id: i64,
    /// Name recorded on the row.
    name: String,
    /// Namespace recorded on the row.
    namespace: String,
    /// Backend chain recorded on the row (format defined by the storage layer).
    backend_chain: String,
    /// Most recent error message, if any was recorded.
    last_error: Option<String>,
    /// Most recent exit code, if any was recorded.
    last_exit_code: Option<i32>,
    /// Tail of the most recent stderr output, if any was recorded.
    last_stderr_tail: Option<String>,
    /// Number of attempts recorded on the row.
    attempt_count: i32,
    /// Textual status, taken from `PendingEmbeddingStatus::as_str`.
    status: String,
    /// Last-update marker copied verbatim from the row
    /// (presumably a timestamp; unit not visible here, confirm in storage).
    updated_at: i64,
}
82
83impl From<&PendingEmbedding> for PendingEmbeddingsListEntry {
84 fn from(p: &PendingEmbedding) -> Self {
85 Self {
86 pending_id: p.pending_id,
87 memory_id: p.memory_id,
88 name: p.name.clone(),
89 namespace: p.namespace.clone(),
90 backend_chain: p.backend_chain.clone(),
91 last_error: p.last_error.clone(),
92 last_exit_code: p.last_exit_code,
93 last_stderr_tail: p.last_stderr_tail.clone(),
94 attempt_count: p.attempt_count,
95 status: p.status.as_str().to_string(),
96 updated_at: p.updated_at,
97 }
98 }
99}
100
/// JSON payload emitted by the `list` subcommand (built in `run_list`).
///
/// Field order determines the order of keys in the emitted JSON.
#[derive(Serialize)]
struct PendingEmbeddingsListOutput {
    /// Fixed action tag; `run_list` sets it to "pending_embeddings_list".
    action: &'static str,
    /// Textual form of the status filter that was applied.
    filter_status: String,
    /// Number of entries returned (length of `entries`).
    count: usize,
    /// The matching rows, converted to their serializable form.
    entries: Vec<PendingEmbeddingsListEntry>,
    /// Milliseconds elapsed since `run_list` started, measured just before emission.
    elapsed_ms: u64,
}
109
/// JSON payload emitted by the `abandon` subcommand (built in `run_abandon`).
///
/// Field order determines the order of keys in the emitted JSON.
#[derive(Serialize)]
struct PendingEmbeddingsAbandonOutput {
    /// Action tag: "pending_embeddings_abandon_dry_run" for a dry run,
    /// otherwise "pending_embeddings_abandon".
    action: &'static str,
    /// Whether the command ran with `--dry-run`.
    dry_run: bool,
    /// Textual form of the status filter that selected the candidates.
    status: String,
    /// Number of rows that matched the status filter.
    candidates: usize,
    /// Number of rows on which `abandon` was called successfully; 0 on a dry run.
    abandoned: usize,
    /// Milliseconds elapsed since `run_abandon` started, measured just before emission.
    elapsed_ms: u64,
    /// Echo of the `--yes` flag as passed by the caller.
    yes: bool,
}
120
121pub fn run(args: PendingEmbeddingsArgs) -> Result<(), AppError> {
122 match args.cmd {
123 PendingEmbeddingsCmd::List(a) => run_list(a),
124 PendingEmbeddingsCmd::Abandon(a) => run_abandon(a),
125 }
126}
127
128fn parse_status(s: &str) -> Result<PendingEmbeddingStatus, AppError> {
129 match s {
130 "pending" => Ok(PendingEmbeddingStatus::Pending),
131 "in_progress" => Ok(PendingEmbeddingStatus::InProgress),
132 "done" => Ok(PendingEmbeddingStatus::Done),
133 "abandoned" => Ok(PendingEmbeddingStatus::Abandoned),
134 other => Err(AppError::Validation(format!(
135 "invalid status filter: {other} (expected pending|in_progress|done|abandoned)"
136 ))),
137 }
138}
139
140fn open_conn() -> Result<(AppPaths, rusqlite::Connection), AppError> {
141 let paths = AppPaths::resolve(None)?;
142 let conn = open_rw(&paths.db)?;
143 Ok((paths, conn))
144}
145
146fn run_list(args: PendingEmbeddingsListArgs) -> Result<(), AppError> {
147 let start = std::time::Instant::now();
148 let (_paths, conn) = open_conn()?;
149 let status = parse_status(&args.status)?;
150 let rows = pending_embeddings::list_by_status(&conn, status, args.limit)?;
151 let count = rows.len();
152 let entries: Vec<PendingEmbeddingsListEntry> =
153 rows.iter().map(PendingEmbeddingsListEntry::from).collect();
154 let output = PendingEmbeddingsListOutput {
155 action: "pending_embeddings_list",
156 filter_status: status.as_str().to_string(),
157 count,
158 entries,
159 elapsed_ms: start.elapsed().as_millis() as u64,
160 };
161 emit_json_compact(&output)
162}
163
164fn run_abandon(args: PendingEmbeddingsAbandonArgs) -> Result<(), AppError> {
165 let start = std::time::Instant::now();
166 let (_paths, conn) = open_conn()?;
167 let status = parse_status(&args.status)?;
168 let rows = pending_embeddings::list_by_status(&conn, status, 100_000)?;
169 let candidates = rows.len();
170 let mut abandoned = 0usize;
171 if !args.dry_run {
172 for row in &rows {
173 pending_embeddings::abandon(&conn, row.pending_id)?;
174 abandoned += 1;
175 }
176 }
177 let output = PendingEmbeddingsAbandonOutput {
178 action: if args.dry_run {
179 "pending_embeddings_abandon_dry_run"
180 } else {
181 "pending_embeddings_abandon"
182 },
183 dry_run: args.dry_run,
184 status: status.as_str().to_string(),
185 candidates,
186 abandoned,
187 elapsed_ms: start.elapsed().as_millis() as u64,
188 yes: args.yes,
189 };
190 emit_json_compact(&output)
191}