sqlite_graphrag/commands/
pending.rs1use clap::{Args, Subcommand};
15use serde::Serialize;
16
17use crate::errors::AppError;
18use crate::output::emit_json_compact;
19use crate::paths::AppPaths;
20use crate::storage::connection::open_rw;
21use crate::storage::pending_memories::{self, PendingMemory, PendingStatus};
22
// Top-level arguments of the `pending` command. The command itself carries no
// options; all work is delegated to the subcommand stored in `cmd`.
// NOTE: plain `//` comments are used on purpose — clap's derive turns `///`
// doc comments into user-visible help text, which would change CLI output.
#[derive(Debug, Args)]
#[command(after_long_help = "EXAMPLES:\n \
    # List all entries currently waiting for embedding (Stage A done, Stage B pending)\n \
    sqlite-graphrag pending list --status validated --json\n\n \
    # Show the full record of pending_id 42\n \
    sqlite-graphrag pending show 42 --json\n\n \
    # Clean up entries abandoned for >24h (86400 seconds)\n \
    sqlite-graphrag pending cleanup --staged-cleanup-after 86400 --yes")]
pub struct PendingArgs {
    // The selected `pending` subcommand (list / show / cleanup).
    #[command(subcommand)]
    pub cmd: PendingCmd,
}
35
// Subcommands of `pending`; `run` dispatches each variant to its handler.
// (Plain `//` comments: clap would expose `///` text as subcommand help.)
#[derive(Debug, Subcommand)]
pub enum PendingCmd {
    // Summarise pending entries, optionally filtered by status (`run_list`).
    List(PendingListArgs),
    // Print the full record of a single pending entry (`run_show`).
    Show(PendingShowArgs),
    // Remove old abandoned entries, or only count them with `--dry-run`
    // (`run_cleanup`).
    Cleanup(PendingCleanupArgs),
}
45
// Arguments of `pending list`.
// (Plain `//` comments: clap would expose `///` text as argument help.)
#[derive(Debug, Args)]
pub struct PendingListArgs {
    // Optional `--status` filter. When absent, `run_list` queries every status
    // except `Committed`.
    #[arg(long, value_enum)]
    pub status: Option<PendingStatusArg>,
    // Maximum number of entries returned (`--limit`, default 100).
    #[arg(long, default_value_t = 100)]
    pub limit: usize,
    // Database path override, taken from `--db` or, failing that, from the
    // `SQLITE_GRAPHRAG_DB_PATH` environment variable.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,
}
61
// CLI-facing mirror of `PendingStatus`, used as the value type of `--status`.
// `rename_all = "snake_case"` makes the accepted command-line values
// snake_case (e.g. `embedding_in_progress`). Converted to the storage enum via
// `From<PendingStatusArg> for PendingStatus`.
// (Plain `//` comments: clap would expose `///` text as value help.)
#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
#[value(rename_all = "snake_case")]
pub enum PendingStatusArg {
    Validated,
    EmbeddingInProgress,
    EmbeddingDone,
    // Accepted as an explicit filter, although the unfiltered listing in
    // `run_list` does not include this status.
    Committed,
    Abandoned,
    Failed,
}
72
73impl From<PendingStatusArg> for PendingStatus {
74 fn from(value: PendingStatusArg) -> Self {
75 match value {
76 PendingStatusArg::Validated => Self::Validated,
77 PendingStatusArg::EmbeddingInProgress => Self::EmbeddingInProgress,
78 PendingStatusArg::EmbeddingDone => Self::EmbeddingDone,
79 PendingStatusArg::Committed => Self::Committed,
80 PendingStatusArg::Abandoned => Self::Abandoned,
81 PendingStatusArg::Failed => Self::Failed,
82 }
83 }
84}
85
// Arguments of `pending show`.
// (Plain `//` comments: clap would expose `///` text as argument help.)
#[derive(Debug, Args)]
pub struct PendingShowArgs {
    // Positional argument: id of the pending entry to display
    // (e.g. `pending show 42`).
    pub pending_id: i64,
    // Database path override, taken from `--db` or, failing that, from the
    // `SQLITE_GRAPHRAG_DB_PATH` environment variable.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,
}
96
// Arguments of `pending cleanup`. Unlike `list` and `show`, this subcommand
// has no `--db` option; `run_cleanup` opens the connection without an override.
// (Plain `//` comments: clap would expose `///` text as argument help.)
#[derive(Debug, Args)]
pub struct PendingCleanupArgs {
    // Minimum age, in seconds, an abandoned entry must have to be cleaned up
    // (`--staged-cleanup-after`, default 86400 = 24h).
    #[arg(long, default_value_t = 86400)]
    pub staged_cleanup_after: u64,
    // `--yes` flag. NOTE(review): `run_cleanup` only echoes this value in its
    // output; it does not gate the deletion — confirm that is intended.
    #[arg(long)]
    pub yes: bool,
    // `--dry-run` flag: count candidates but remove nothing.
    #[arg(long)]
    pub dry_run: bool,
}
109
/// One row of the `pending list` output: a serializable selection of
/// `PendingMemory` fields, built by `From<&PendingMemory>`.
#[derive(Serialize)]
struct PendingListEntry {
    /// Identifier of the pending entry.
    pending_id: i64,
    name: String,
    namespace: String,
    memory_type: String,
    /// Status rendered as text via `PendingStatus::as_str`.
    status: String,
    attempt_count: i32,
    /// Copied from the source record; `None` when it holds no error.
    last_error: Option<String>,
    /// Copied from the source record; `None` when it holds no dimension.
    embedding_dim: Option<i32>,
    // NOTE(review): timestamps look like Unix seconds (`run_cleanup` compares
    // `updated_at` with `chrono::Utc::now().timestamp()`) — confirm.
    created_at: i64,
    updated_at: i64,
}
123
124impl From<&PendingMemory> for PendingListEntry {
125 fn from(p: &PendingMemory) -> Self {
126 Self {
127 pending_id: p.pending_id,
128 name: p.name.clone(),
129 namespace: p.namespace.clone(),
130 memory_type: p.memory_type.clone(),
131 status: p.status.as_str().to_string(),
132 attempt_count: p.attempt_count,
133 last_error: p.last_error.clone(),
134 embedding_dim: p.embedding_dim,
135 created_at: p.created_at,
136 updated_at: p.updated_at,
137 }
138 }
139}
140
/// JSON payload emitted by `pending list`.
#[derive(Serialize)]
struct PendingListOutput {
    /// Always `"pending_list"` (set by `run_list`).
    action: &'static str,
    /// Textual name of the `--status` filter, or `None` when no filter was given.
    filter_status: Option<String>,
    /// Number of items in `entries`.
    count: usize,
    entries: Vec<PendingListEntry>,
    /// Wall-clock time spent in the handler, in milliseconds.
    elapsed_ms: u64,
}
149
/// JSON payload emitted by `pending show`.
#[derive(Serialize)]
struct PendingShowOutput {
    /// Always `"pending_show"` (set by `run_show`).
    action: &'static str,
    /// The full `PendingMemory` record that was looked up.
    entry: PendingMemory,
    /// Wall-clock time spent in the handler, in milliseconds.
    elapsed_ms: u64,
}
156
/// JSON payload emitted by `pending cleanup`.
#[derive(Serialize)]
struct PendingCleanupOutput {
    /// `"pending_cleanup_dry_run"` for a dry run, `"pending_cleanup"` otherwise.
    action: &'static str,
    /// Echo of the `--dry-run` flag.
    dry_run: bool,
    /// Echo of `--staged-cleanup-after`, in seconds.
    staged_cleanup_after_secs: u64,
    /// Number of abandoned entries counted as old enough to clean up.
    candidates: usize,
    /// Value returned by the cleanup call; `0` on a dry run.
    removed: usize,
    /// Wall-clock time spent in the handler, in milliseconds.
    elapsed_ms: u64,
    /// Echo of the `--yes` flag.
    yes: bool,
}
167
168pub fn run(args: PendingArgs) -> Result<(), AppError> {
169 match args.cmd {
170 PendingCmd::List(a) => run_list(a),
171 PendingCmd::Show(a) => run_show(a),
172 PendingCmd::Cleanup(a) => run_cleanup(a),
173 }
174}
175
176fn open_conn(db_override: Option<&str>) -> Result<(AppPaths, rusqlite::Connection), AppError> {
177 let paths = AppPaths::resolve(db_override)?;
182 let conn = open_rw(&paths.db)?;
183 Ok((paths, conn))
184}
185
186fn run_list(args: PendingListArgs) -> Result<(), AppError> {
187 let start = std::time::Instant::now();
188 let (_paths, conn) = open_conn(args.db.as_deref())?;
189
190 let entries: Vec<PendingMemory> = if let Some(status) = args.status {
193 pending_memories::list_by_status(&conn, status.into(), args.limit)?
194 } else {
195 let mut all = Vec::new();
196 for status in [
197 PendingStatus::EmbeddingInProgress,
198 PendingStatus::EmbeddingDone,
199 PendingStatus::Validated,
200 PendingStatus::Abandoned,
201 PendingStatus::Failed,
202 ] {
203 let mut bucket = pending_memories::list_by_status(&conn, status, args.limit)?;
204 all.append(&mut bucket);
205 }
206 all.truncate(args.limit);
207 all
208 };
209
210 let count = entries.len();
211 let entries_out: Vec<PendingListEntry> = entries.iter().map(PendingListEntry::from).collect();
212 let output = PendingListOutput {
213 action: "pending_list",
214 filter_status: args.status.map(|s| {
215 match s {
216 PendingStatusArg::Validated => "validated",
217 PendingStatusArg::EmbeddingInProgress => "embedding_in_progress",
218 PendingStatusArg::EmbeddingDone => "embedding_done",
219 PendingStatusArg::Committed => "committed",
220 PendingStatusArg::Abandoned => "abandoned",
221 PendingStatusArg::Failed => "failed",
222 }
223 .to_string()
224 }),
225 count,
226 entries: entries_out,
227 elapsed_ms: start.elapsed().as_millis() as u64,
228 };
229 emit_json_compact(&output)
230}
231
232fn run_show(args: PendingShowArgs) -> Result<(), AppError> {
233 let start = std::time::Instant::now();
234 let (_paths, conn) = open_conn(args.db.as_deref())?;
235 let entry = pending_memories::find_by_id(&conn, args.pending_id)?.ok_or_else(|| {
236 AppError::NotFound(format!(
237 "pending_id {} not found in pending_memories",
238 args.pending_id
239 ))
240 })?;
241 let output = PendingShowOutput {
242 action: "pending_show",
243 entry,
244 elapsed_ms: start.elapsed().as_millis() as u64,
245 };
246 emit_json_compact(&output)
247}
248
249fn run_cleanup(args: PendingCleanupArgs) -> Result<(), AppError> {
250 let start = std::time::Instant::now();
251 let (_paths, conn) = open_conn(None)?;
252
253 let candidates = pending_memories::list_by_status(&conn, PendingStatus::Abandoned, 100_000)?
255 .into_iter()
256 .filter(|p| {
257 let now = chrono::Utc::now().timestamp();
258 now - p.updated_at >= args.staged_cleanup_after as i64
259 })
260 .count();
261
262 let removed = if args.dry_run {
263 0
264 } else {
265 pending_memories::cleanup_older_than(&conn, args.staged_cleanup_after as i64)?
266 };
267
268 let output = PendingCleanupOutput {
269 action: if args.dry_run {
270 "pending_cleanup_dry_run"
271 } else {
272 "pending_cleanup"
273 },
274 dry_run: args.dry_run,
275 staged_cleanup_after_secs: args.staged_cleanup_after,
276 candidates,
277 removed,
278 elapsed_ms: start.elapsed().as_millis() as u64,
279 yes: args.yes,
280 };
281 emit_json_compact(&output)
282}
283
#[cfg(test)]
mod tests {
    use super::*;

    /// Expected textual name for each CLI status value.
    fn arg_to_str(arg: PendingStatusArg) -> &'static str {
        match arg {
            PendingStatusArg::Validated => "validated",
            PendingStatusArg::EmbeddingInProgress => "embedding_in_progress",
            PendingStatusArg::EmbeddingDone => "embedding_done",
            PendingStatusArg::Committed => "committed",
            PendingStatusArg::Abandoned => "abandoned",
            PendingStatusArg::Failed => "failed",
        }
    }

    /// Every CLI status must convert to a storage status whose `as_str`
    /// matches the expected textual name.
    #[test]
    fn status_arg_round_trip_all_variants() {
        let every_variant = [
            PendingStatusArg::Validated,
            PendingStatusArg::EmbeddingInProgress,
            PendingStatusArg::EmbeddingDone,
            PendingStatusArg::Committed,
            PendingStatusArg::Abandoned,
            PendingStatusArg::Failed,
        ];

        for arg in every_variant {
            let converted = PendingStatus::from(arg);
            assert_eq!(converted.as_str(), arg_to_str(arg));
        }
    }
}