sqlite_graphrag/commands/
pending.rs1use clap::{Args, Subcommand};
15use serde::Serialize;
16
17use crate::errors::AppError;
18use crate::output::emit_json_compact;
19use crate::paths::AppPaths;
20use crate::storage::connection::open_rw;
21use crate::storage::pending_memories::{self, PendingMemory, PendingStatus};
22
23#[derive(Debug, Args)]
24#[command(after_long_help = "EXAMPLES:\n \
25 # List all entries currently waiting for embedding (Stage A done, Stage B pending)\n \
26 sqlite-graphrag pending list --status validated --json\n\n \
27 # Show the full record of pending_id 42\n \
28 sqlite-graphrag pending show 42 --json\n\n \
29 # Clean up entries abandoned for >24h (86400 seconds)\n \
30 sqlite-graphrag pending cleanup --staged-cleanup-after 86400 --yes")]
31pub struct PendingArgs {
32 #[command(subcommand)]
33 pub cmd: PendingCmd,
34}
35
36#[derive(Debug, Subcommand)]
37pub enum PendingCmd {
38 List(PendingListArgs),
40 Show(PendingShowArgs),
42 Cleanup(PendingCleanupArgs),
44}
45
46#[derive(Debug, Args)]
47pub struct PendingListArgs {
48 #[arg(long, value_enum)]
51 pub status: Option<PendingStatusArg>,
52 #[arg(long, default_value_t = 100)]
54 pub limit: usize,
55}
56
57#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
58#[value(rename_all = "snake_case")]
59pub enum PendingStatusArg {
60 Validated,
61 EmbeddingInProgress,
62 EmbeddingDone,
63 Committed,
64 Abandoned,
65 Failed,
66}
67
68impl From<PendingStatusArg> for PendingStatus {
69 fn from(value: PendingStatusArg) -> Self {
70 match value {
71 PendingStatusArg::Validated => Self::Validated,
72 PendingStatusArg::EmbeddingInProgress => Self::EmbeddingInProgress,
73 PendingStatusArg::EmbeddingDone => Self::EmbeddingDone,
74 PendingStatusArg::Committed => Self::Committed,
75 PendingStatusArg::Abandoned => Self::Abandoned,
76 PendingStatusArg::Failed => Self::Failed,
77 }
78 }
79}
80
81#[derive(Debug, Args)]
82pub struct PendingShowArgs {
83 pub pending_id: i64,
85}
86
87#[derive(Debug, Args)]
88pub struct PendingCleanupArgs {
89 #[arg(long, default_value_t = 86400)]
91 pub staged_cleanup_after: u64,
92 #[arg(long)]
94 pub yes: bool,
95 #[arg(long)]
97 pub dry_run: bool,
98}
99
100#[derive(Serialize)]
101struct PendingListEntry {
102 pending_id: i64,
103 name: String,
104 namespace: String,
105 memory_type: String,
106 status: String,
107 attempt_count: i32,
108 last_error: Option<String>,
109 embedding_dim: Option<i32>,
110 created_at: i64,
111 updated_at: i64,
112}
113
114impl From<&PendingMemory> for PendingListEntry {
115 fn from(p: &PendingMemory) -> Self {
116 Self {
117 pending_id: p.pending_id,
118 name: p.name.clone(),
119 namespace: p.namespace.clone(),
120 memory_type: p.memory_type.clone(),
121 status: p.status.as_str().to_string(),
122 attempt_count: p.attempt_count,
123 last_error: p.last_error.clone(),
124 embedding_dim: p.embedding_dim,
125 created_at: p.created_at,
126 updated_at: p.updated_at,
127 }
128 }
129}
130
131#[derive(Serialize)]
132struct PendingListOutput {
133 action: &'static str,
134 filter_status: Option<String>,
135 count: usize,
136 entries: Vec<PendingListEntry>,
137 elapsed_ms: u64,
138}
139
140#[derive(Serialize)]
141struct PendingShowOutput {
142 action: &'static str,
143 entry: PendingMemory,
144 elapsed_ms: u64,
145}
146
147#[derive(Serialize)]
148struct PendingCleanupOutput {
149 action: &'static str,
150 dry_run: bool,
151 staged_cleanup_after_secs: u64,
152 candidates: usize,
153 removed: usize,
154 elapsed_ms: u64,
155 yes: bool,
156}
157
158pub fn run(args: PendingArgs) -> Result<(), AppError> {
159 match args.cmd {
160 PendingCmd::List(a) => run_list(a),
161 PendingCmd::Show(a) => run_show(a),
162 PendingCmd::Cleanup(a) => run_cleanup(a),
163 }
164}
165
166fn open_conn() -> Result<(AppPaths, rusqlite::Connection), AppError> {
167 let paths = AppPaths::resolve(None)?;
168 let conn = open_rw(&paths.db)?;
169 Ok((paths, conn))
170}
171
172fn run_list(args: PendingListArgs) -> Result<(), AppError> {
173 let start = std::time::Instant::now();
174 let (_paths, conn) = open_conn()?;
175
176 let entries: Vec<PendingMemory> = if let Some(status) = args.status {
179 pending_memories::list_by_status(&conn, status.into(), args.limit)?
180 } else {
181 let mut all = Vec::new();
182 for status in [
183 PendingStatus::EmbeddingInProgress,
184 PendingStatus::EmbeddingDone,
185 PendingStatus::Validated,
186 PendingStatus::Abandoned,
187 PendingStatus::Failed,
188 ] {
189 let mut bucket = pending_memories::list_by_status(&conn, status, args.limit)?;
190 all.append(&mut bucket);
191 }
192 all.truncate(args.limit);
193 all
194 };
195
196 let count = entries.len();
197 let entries_out: Vec<PendingListEntry> = entries.iter().map(PendingListEntry::from).collect();
198 let output = PendingListOutput {
199 action: "pending_list",
200 filter_status: args.status.map(|s| {
201 match s {
202 PendingStatusArg::Validated => "validated",
203 PendingStatusArg::EmbeddingInProgress => "embedding_in_progress",
204 PendingStatusArg::EmbeddingDone => "embedding_done",
205 PendingStatusArg::Committed => "committed",
206 PendingStatusArg::Abandoned => "abandoned",
207 PendingStatusArg::Failed => "failed",
208 }
209 .to_string()
210 }),
211 count,
212 entries: entries_out,
213 elapsed_ms: start.elapsed().as_millis() as u64,
214 };
215 emit_json_compact(&output)
216}
217
218fn run_show(args: PendingShowArgs) -> Result<(), AppError> {
219 let start = std::time::Instant::now();
220 let (_paths, conn) = open_conn()?;
221 let entry = pending_memories::find_by_id(&conn, args.pending_id)?.ok_or_else(|| {
222 AppError::NotFound(format!(
223 "pending_id {} not found in pending_memories",
224 args.pending_id
225 ))
226 })?;
227 let output = PendingShowOutput {
228 action: "pending_show",
229 entry,
230 elapsed_ms: start.elapsed().as_millis() as u64,
231 };
232 emit_json_compact(&output)
233}
234
235fn run_cleanup(args: PendingCleanupArgs) -> Result<(), AppError> {
236 let start = std::time::Instant::now();
237 let (_paths, conn) = open_conn()?;
238
239 let candidates = pending_memories::list_by_status(&conn, PendingStatus::Abandoned, 100_000)?
241 .into_iter()
242 .filter(|p| {
243 let now = chrono::Utc::now().timestamp();
244 now - p.updated_at >= args.staged_cleanup_after as i64
245 })
246 .count();
247
248 let removed = if args.dry_run {
249 0
250 } else {
251 pending_memories::cleanup_older_than(&conn, args.staged_cleanup_after as i64)?
252 };
253
254 let output = PendingCleanupOutput {
255 action: if args.dry_run {
256 "pending_cleanup_dry_run"
257 } else {
258 "pending_cleanup"
259 },
260 dry_run: args.dry_run,
261 staged_cleanup_after_secs: args.staged_cleanup_after,
262 candidates,
263 removed,
264 elapsed_ms: start.elapsed().as_millis() as u64,
265 yes: args.yes,
266 };
267 emit_json_compact(&output)
268}
269
#[cfg(test)]
mod tests {
    use super::*;
    use clap::ValueEnum;

    /// Every CLI status value must agree with the storage-layer name on three
    /// sides: the pinned literal, `PendingStatus::as_str`, and the name clap
    /// actually accepts on the command line.
    #[test]
    fn status_arg_round_trip_all_variants() {
        let expected = [
            (PendingStatusArg::Validated, "validated"),
            (PendingStatusArg::EmbeddingInProgress, "embedding_in_progress"),
            (PendingStatusArg::EmbeddingDone, "embedding_done"),
            (PendingStatusArg::Committed, "committed"),
            (PendingStatusArg::Abandoned, "abandoned"),
            (PendingStatusArg::Failed, "failed"),
        ];

        // Fails when a variant is added to the enum but not to the table above.
        assert_eq!(expected.len(), PendingStatusArg::value_variants().len());

        for (arg, name) in expected {
            let status: PendingStatus = arg.into();
            assert_eq!(status.as_str(), name);

            // The original test only compared against a hand-copied table;
            // this also checks the name clap derives for the variant.
            let cli_value = arg
                .to_possible_value()
                .expect("no status variant is skipped from the CLI");
            assert_eq!(cli_value.get_name(), name);
        }
    }
}