sqlite_graphrag/commands/
pending.rs1use clap::{Args, Subcommand};
15use serde::Serialize;
16
17use crate::errors::AppError;
18use crate::output::emit_json_compact;
19use crate::paths::AppPaths;
20use crate::storage::connection::open_rw;
21use crate::storage::pending_memories::{self, PendingMemory, PendingStatus};
22
23#[derive(Debug, Args)]
24#[command(after_long_help = "EXAMPLES:\n \
25 # List all entries currently waiting for embedding (Stage A done, Stage B pending)\n \
26 sqlite-graphrag pending list --status validated --json\n\n \
27 # Show the full record of pending_id 42\n \
28 sqlite-graphrag pending show 42 --json\n\n \
29 # Clean up entries abandoned for >24h (86400 seconds)\n \
30 sqlite-graphrag pending cleanup --staged-cleanup-after 86400 --yes")]
31pub struct PendingArgs {
32 #[command(subcommand)]
33 pub cmd: PendingCmd,
34}
35
36#[derive(Debug, Subcommand)]
37pub enum PendingCmd {
38 List(PendingListArgs),
40 Show(PendingShowArgs),
42 Cleanup(PendingCleanupArgs),
44}
45
46#[derive(Debug, Args)]
47pub struct PendingListArgs {
48 #[arg(long, value_enum)]
51 pub status: Option<PendingStatusArg>,
52 #[arg(long, default_value_t = 100)]
54 pub limit: usize,
55 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
59 pub db: Option<String>,
60 #[arg(long, hide = true)]
62 pub json: bool,
63}
64
65#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
66#[value(rename_all = "snake_case")]
67pub enum PendingStatusArg {
68 Validated,
69 EmbeddingInProgress,
70 EmbeddingDone,
71 Committed,
72 Abandoned,
73 Failed,
74}
75
76impl From<PendingStatusArg> for PendingStatus {
77 fn from(value: PendingStatusArg) -> Self {
78 match value {
79 PendingStatusArg::Validated => Self::Validated,
80 PendingStatusArg::EmbeddingInProgress => Self::EmbeddingInProgress,
81 PendingStatusArg::EmbeddingDone => Self::EmbeddingDone,
82 PendingStatusArg::Committed => Self::Committed,
83 PendingStatusArg::Abandoned => Self::Abandoned,
84 PendingStatusArg::Failed => Self::Failed,
85 }
86 }
87}
88
89#[derive(Debug, Args)]
90pub struct PendingShowArgs {
91 pub pending_id: i64,
93 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
97 pub db: Option<String>,
98 #[arg(long, hide = true)]
100 pub json: bool,
101}
102
103#[derive(Debug, Args)]
104pub struct PendingCleanupArgs {
105 #[arg(long, default_value_t = 86400)]
107 pub staged_cleanup_after: u64,
108 #[arg(long)]
110 pub yes: bool,
111 #[arg(long)]
113 pub dry_run: bool,
114 #[arg(long, hide = true)]
116 pub json: bool,
117}
118
119#[derive(Serialize)]
120struct PendingListEntry {
121 pending_id: i64,
122 name: String,
123 namespace: String,
124 memory_type: String,
125 status: String,
126 attempt_count: i32,
127 last_error: Option<String>,
128 embedding_dim: Option<i32>,
129 created_at: i64,
130 updated_at: i64,
131}
132
133impl From<&PendingMemory> for PendingListEntry {
134 fn from(p: &PendingMemory) -> Self {
135 Self {
136 pending_id: p.pending_id,
137 name: p.name.clone(),
138 namespace: p.namespace.clone(),
139 memory_type: p.memory_type.clone(),
140 status: p.status.as_str().to_string(),
141 attempt_count: p.attempt_count,
142 last_error: p.last_error.clone(),
143 embedding_dim: p.embedding_dim,
144 created_at: p.created_at,
145 updated_at: p.updated_at,
146 }
147 }
148}
149
/// JSON document printed by `pending list`.
#[derive(Serialize)]
struct PendingListOutput {
    /// Fixed action tag; the list handler sets it to `"pending_list"`.
    action: &'static str,
    /// The `--status` filter as a string, or `None` when no filter was given.
    filter_status: Option<String>,
    /// Number of items in `entries`.
    count: usize,
    /// Summary rows of the listed pending entries.
    entries: Vec<PendingListEntry>,
    /// Milliseconds elapsed since the handler started.
    elapsed_ms: u64,
}
158
/// JSON document printed by `pending show`.
#[derive(Serialize)]
struct PendingShowOutput {
    /// Fixed action tag; the show handler sets it to `"pending_show"`.
    action: &'static str,
    /// The full stored record, serialized as-is.
    entry: PendingMemory,
    /// Milliseconds elapsed since the handler started.
    elapsed_ms: u64,
}
165
/// JSON document printed by `pending cleanup`.
#[derive(Serialize)]
struct PendingCleanupOutput {
    /// `"pending_cleanup_dry_run"` for a dry run, `"pending_cleanup"` otherwise.
    action: &'static str,
    /// Whether `--dry-run` was given.
    dry_run: bool,
    /// The age threshold, in seconds, that was requested on the command line.
    staged_cleanup_after_secs: u64,
    /// Number of abandoned entries found to be at least the threshold old.
    candidates: usize,
    /// Number of entries removed; always 0 for a dry run.
    removed: usize,
    /// Milliseconds elapsed since the handler started.
    elapsed_ms: u64,
    /// Echo of the `--yes` flag.
    yes: bool,
}
176
177pub fn run(args: PendingArgs) -> Result<(), AppError> {
178 match args.cmd {
179 PendingCmd::List(a) => run_list(a),
180 PendingCmd::Show(a) => run_show(a),
181 PendingCmd::Cleanup(a) => run_cleanup(a),
182 }
183}
184
185fn open_conn(db_override: Option<&str>) -> Result<(AppPaths, rusqlite::Connection), AppError> {
186 let paths = AppPaths::resolve(db_override)?;
191 let conn = open_rw(&paths.db)?;
192 Ok((paths, conn))
193}
194
195fn run_list(args: PendingListArgs) -> Result<(), AppError> {
196 let start = std::time::Instant::now();
197 let (_paths, conn) = open_conn(args.db.as_deref())?;
198
199 let entries: Vec<PendingMemory> = if let Some(status) = args.status {
202 pending_memories::list_by_status(&conn, status.into(), args.limit)?
203 } else {
204 let mut all = Vec::new();
205 for status in [
206 PendingStatus::EmbeddingInProgress,
207 PendingStatus::EmbeddingDone,
208 PendingStatus::Validated,
209 PendingStatus::Abandoned,
210 PendingStatus::Failed,
211 ] {
212 let mut bucket = pending_memories::list_by_status(&conn, status, args.limit)?;
213 all.append(&mut bucket);
214 }
215 all.truncate(args.limit);
216 all
217 };
218
219 let count = entries.len();
220 let entries_out: Vec<PendingListEntry> = entries.iter().map(PendingListEntry::from).collect();
221 let output = PendingListOutput {
222 action: "pending_list",
223 filter_status: args.status.map(|s| {
224 match s {
225 PendingStatusArg::Validated => "validated",
226 PendingStatusArg::EmbeddingInProgress => "embedding_in_progress",
227 PendingStatusArg::EmbeddingDone => "embedding_done",
228 PendingStatusArg::Committed => "committed",
229 PendingStatusArg::Abandoned => "abandoned",
230 PendingStatusArg::Failed => "failed",
231 }
232 .to_string()
233 }),
234 count,
235 entries: entries_out,
236 elapsed_ms: start.elapsed().as_millis() as u64,
237 };
238 emit_json_compact(&output)
239}
240
241fn run_show(args: PendingShowArgs) -> Result<(), AppError> {
242 let start = std::time::Instant::now();
243 let (_paths, conn) = open_conn(args.db.as_deref())?;
244 let entry = pending_memories::find_by_id(&conn, args.pending_id)?.ok_or_else(|| {
245 AppError::NotFound(format!(
246 "pending_id {} not found in pending_memories",
247 args.pending_id
248 ))
249 })?;
250 let output = PendingShowOutput {
251 action: "pending_show",
252 entry,
253 elapsed_ms: start.elapsed().as_millis() as u64,
254 };
255 emit_json_compact(&output)
256}
257
258fn run_cleanup(args: PendingCleanupArgs) -> Result<(), AppError> {
259 let start = std::time::Instant::now();
260 let (_paths, conn) = open_conn(None)?;
261
262 let candidates = pending_memories::list_by_status(&conn, PendingStatus::Abandoned, 100_000)?
264 .into_iter()
265 .filter(|p| {
266 let now = chrono::Utc::now().timestamp();
267 now - p.updated_at >= args.staged_cleanup_after as i64
268 })
269 .count();
270
271 let removed = if args.dry_run {
272 0
273 } else {
274 pending_memories::cleanup_older_than(&conn, args.staged_cleanup_after as i64)?
275 };
276
277 let output = PendingCleanupOutput {
278 action: if args.dry_run {
279 "pending_cleanup_dry_run"
280 } else {
281 "pending_cleanup"
282 },
283 dry_run: args.dry_run,
284 staged_cleanup_after_secs: args.staged_cleanup_after,
285 candidates,
286 removed,
287 elapsed_ms: start.elapsed().as_millis() as u64,
288 yes: args.yes,
289 };
290 emit_json_compact(&output)
291}
292
#[cfg(test)]
mod tests {
    use super::*;

    /// Every CLI status variant, so the round-trip test cannot miss one
    /// silently as long as this list is kept complete.
    const ALL_ARGS: [PendingStatusArg; 6] = [
        PendingStatusArg::Validated,
        PendingStatusArg::EmbeddingInProgress,
        PendingStatusArg::EmbeddingDone,
        PendingStatusArg::Committed,
        PendingStatusArg::Abandoned,
        PendingStatusArg::Failed,
    ];

    /// Expected snake_case spelling of each CLI variant.
    fn arg_to_str(arg: PendingStatusArg) -> &'static str {
        use PendingStatusArg as Cli;
        match arg {
            Cli::Validated => "validated",
            Cli::EmbeddingInProgress => "embedding_in_progress",
            Cli::EmbeddingDone => "embedding_done",
            Cli::Committed => "committed",
            Cli::Abandoned => "abandoned",
            Cli::Failed => "failed",
        }
    }

    /// Converting a CLI variant to the storage status must yield the status
    /// whose string form matches the CLI spelling.
    #[test]
    fn status_arg_round_trip_all_variants() {
        for arg in ALL_ARGS {
            let converted = PendingStatus::from(arg);
            assert_eq!(converted.as_str(), arg_to_str(arg));
        }
    }
}