sqlite_graphrag/commands/
pending.rs1use clap::{Args, Subcommand};
15use serde::Serialize;
16
17use crate::errors::AppError;
18use crate::output::emit_json_compact;
19use crate::paths::AppPaths;
20use crate::storage::connection::open_rw;
21use crate::storage::pending_memories::{self, PendingMemory, PendingStatus};
22
23#[derive(Debug, Args)]
24#[command(after_long_help = "EXAMPLES:\n \
25 # List all entries currently waiting for embedding (Stage A done, Stage B pending)\n \
26 sqlite-graphrag pending list --status validated --json\n\n \
27 # Show the full record of pending_id 42\n \
28 sqlite-graphrag pending show 42 --json\n\n \
29 # Clean up entries abandoned for >24h (86400 seconds)\n \
30 sqlite-graphrag pending cleanup --staged-cleanup-after 86400 --yes")]
31pub struct PendingArgs {
32 #[command(subcommand)]
33 pub cmd: PendingCmd,
34}
35
36#[derive(Debug, Subcommand)]
37pub enum PendingCmd {
38 List(PendingListArgs),
40 Show(PendingShowArgs),
42 Cleanup(PendingCleanupArgs),
44}
45
46#[derive(Debug, Args)]
47pub struct PendingListArgs {
48 #[arg(long, value_enum)]
51 pub status: Option<PendingStatusArg>,
52 #[arg(long, default_value_t = 100)]
54 pub limit: usize,
55 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
59 pub db: Option<String>,
60 #[arg(long, hide = true)]
62 pub json: bool,
63}
64
65#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
66#[value(rename_all = "snake_case")]
67pub enum PendingStatusArg {
68 Validated,
69 EmbeddingInProgress,
70 EmbeddingDone,
71 Committed,
72 Abandoned,
73 Failed,
74}
75
76impl From<PendingStatusArg> for PendingStatus {
77 fn from(value: PendingStatusArg) -> Self {
78 match value {
79 PendingStatusArg::Validated => Self::Validated,
80 PendingStatusArg::EmbeddingInProgress => Self::EmbeddingInProgress,
81 PendingStatusArg::EmbeddingDone => Self::EmbeddingDone,
82 PendingStatusArg::Committed => Self::Committed,
83 PendingStatusArg::Abandoned => Self::Abandoned,
84 PendingStatusArg::Failed => Self::Failed,
85 }
86 }
87}
88
89#[derive(Debug, Args)]
90pub struct PendingShowArgs {
91 pub pending_id: i64,
93 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
97 pub db: Option<String>,
98 #[arg(long, hide = true)]
100 pub json: bool,
101}
102
103#[derive(Debug, Args)]
104pub struct PendingCleanupArgs {
105 #[arg(long, default_value_t = 86400)]
107 pub staged_cleanup_after: u64,
108 #[arg(long)]
110 pub yes: bool,
111 #[arg(long)]
113 pub dry_run: bool,
114 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
118 pub db: Option<String>,
119 #[arg(long, hide = true)]
121 pub json: bool,
122}
123
124#[derive(Serialize)]
125struct PendingListEntry {
126 pending_id: i64,
127 name: String,
128 namespace: String,
129 memory_type: String,
130 status: String,
131 attempt_count: i32,
132 last_error: Option<String>,
133 embedding_dim: Option<i32>,
134 created_at: i64,
135 updated_at: i64,
136}
137
138impl From<&PendingMemory> for PendingListEntry {
139 fn from(p: &PendingMemory) -> Self {
140 Self {
141 pending_id: p.pending_id,
142 name: p.name.clone(),
143 namespace: p.namespace.clone(),
144 memory_type: p.memory_type.clone(),
145 status: p.status.as_str().to_string(),
146 attempt_count: p.attempt_count,
147 last_error: p.last_error.clone(),
148 embedding_dim: p.embedding_dim,
149 created_at: p.created_at,
150 updated_at: p.updated_at,
151 }
152 }
153}
154
155#[derive(Serialize)]
156struct PendingListOutput {
157 action: &'static str,
158 filter_status: Option<String>,
159 count: usize,
160 entries: Vec<PendingListEntry>,
161 elapsed_ms: u64,
162}
163
164#[derive(Serialize)]
165struct PendingShowOutput {
166 action: &'static str,
167 entry: PendingMemory,
168 elapsed_ms: u64,
169}
170
171#[derive(Serialize)]
172struct PendingCleanupOutput {
173 action: &'static str,
174 dry_run: bool,
175 staged_cleanup_after_secs: u64,
176 candidates: usize,
177 removed: usize,
178 elapsed_ms: u64,
179 yes: bool,
180}
181
182pub fn run(args: PendingArgs) -> Result<(), AppError> {
183 match args.cmd {
184 PendingCmd::List(a) => run_list(a),
185 PendingCmd::Show(a) => run_show(a),
186 PendingCmd::Cleanup(a) => run_cleanup(a),
187 }
188}
189
190fn open_conn(db_override: Option<&str>) -> Result<(AppPaths, rusqlite::Connection), AppError> {
191 let paths = AppPaths::resolve(db_override)?;
196 let conn = open_rw(&paths.db)?;
197 Ok((paths, conn))
198}
199
200fn run_list(args: PendingListArgs) -> Result<(), AppError> {
201 let start = std::time::Instant::now();
202 let (_paths, conn) = open_conn(args.db.as_deref())?;
203
204 let entries: Vec<PendingMemory> = if let Some(status) = args.status {
207 pending_memories::list_by_status(&conn, status.into(), args.limit)?
208 } else {
209 let mut all = Vec::new();
210 for status in [
211 PendingStatus::EmbeddingInProgress,
212 PendingStatus::EmbeddingDone,
213 PendingStatus::Validated,
214 PendingStatus::Abandoned,
215 PendingStatus::Failed,
216 ] {
217 let mut bucket = pending_memories::list_by_status(&conn, status, args.limit)?;
218 all.append(&mut bucket);
219 }
220 all.truncate(args.limit);
221 all
222 };
223
224 let count = entries.len();
225 let entries_out: Vec<PendingListEntry> = entries.iter().map(PendingListEntry::from).collect();
226 let output = PendingListOutput {
227 action: "pending_list",
228 filter_status: args.status.map(|s| {
229 match s {
230 PendingStatusArg::Validated => "validated",
231 PendingStatusArg::EmbeddingInProgress => "embedding_in_progress",
232 PendingStatusArg::EmbeddingDone => "embedding_done",
233 PendingStatusArg::Committed => "committed",
234 PendingStatusArg::Abandoned => "abandoned",
235 PendingStatusArg::Failed => "failed",
236 }
237 .to_string()
238 }),
239 count,
240 entries: entries_out,
241 elapsed_ms: start.elapsed().as_millis() as u64,
242 };
243 emit_json_compact(&output)
244}
245
246fn run_show(args: PendingShowArgs) -> Result<(), AppError> {
247 let start = std::time::Instant::now();
248 let (_paths, conn) = open_conn(args.db.as_deref())?;
249 let entry = pending_memories::find_by_id(&conn, args.pending_id)?.ok_or_else(|| {
250 AppError::NotFound(format!(
251 "pending_id {} not found in pending_memories",
252 args.pending_id
253 ))
254 })?;
255 let output = PendingShowOutput {
256 action: "pending_show",
257 entry,
258 elapsed_ms: start.elapsed().as_millis() as u64,
259 };
260 emit_json_compact(&output)
261}
262
263fn run_cleanup(args: PendingCleanupArgs) -> Result<(), AppError> {
264 let start = std::time::Instant::now();
265 let (_paths, conn) = open_conn(args.db.as_deref())?;
266
267 let candidates = pending_memories::list_by_status(&conn, PendingStatus::Abandoned, 100_000)?
269 .into_iter()
270 .filter(|p| {
271 let now = chrono::Utc::now().timestamp();
272 now - p.updated_at >= args.staged_cleanup_after as i64
273 })
274 .count();
275
276 let removed = if args.dry_run {
277 0
278 } else {
279 pending_memories::cleanup_older_than(&conn, args.staged_cleanup_after as i64)?
280 };
281
282 let output = PendingCleanupOutput {
283 action: if args.dry_run {
284 "pending_cleanup_dry_run"
285 } else {
286 "pending_cleanup"
287 },
288 dry_run: args.dry_run,
289 staged_cleanup_after_secs: args.staged_cleanup_after,
290 candidates,
291 removed,
292 elapsed_ms: start.elapsed().as_millis() as u64,
293 yes: args.yes,
294 };
295 emit_json_compact(&output)
296}
297
#[cfg(test)]
mod tests {
    use super::*;

    /// Expected snake_case name for each CLI status value.
    ///
    /// Kept as an exhaustive match so a new variant cannot be added without
    /// extending this table.
    fn arg_to_str(arg: PendingStatusArg) -> &'static str {
        match arg {
            PendingStatusArg::Validated => "validated",
            PendingStatusArg::EmbeddingInProgress => "embedding_in_progress",
            PendingStatusArg::EmbeddingDone => "embedding_done",
            PendingStatusArg::Committed => "committed",
            PendingStatusArg::Abandoned => "abandoned",
            PendingStatusArg::Failed => "failed",
        }
    }

    /// Every CLI status must convert to a storage status whose `as_str`
    /// matches the expected snake_case name.
    #[test]
    fn status_arg_round_trip_all_variants() {
        let every_variant = [
            PendingStatusArg::Validated,
            PendingStatusArg::EmbeddingInProgress,
            PendingStatusArg::EmbeddingDone,
            PendingStatusArg::Committed,
            PendingStatusArg::Abandoned,
            PendingStatusArg::Failed,
        ];

        for arg in every_variant {
            let converted = PendingStatus::from(arg);
            assert_eq!(converted.as_str(), arg_to_str(arg));
        }
    }
}