Skip to main content

sqlite_graphrag/commands/
pending.rs

//! GAP-001 (v1.0.82): `pending` subcommand — inspect and manage the
//! three-stage `remember` checkpoint queue persisted in `pending_memories`.
//!
//! ## Subcommands
//! - `pending list [--status <STATUS>]` — show entries by status
//! - `pending show <pending_id>` — show one entry in full
//! - `pending cleanup --staged-cleanup-after <SECONDS>` — remove old abandoned entries
//!
//! The `pending_memories` table is the durable footprint of the v1.0.82 staging
//! pipeline (Stage A → B → C). When a host crashes between Stage B and Stage C
//! the entry stays in `embedding_done` (or `embedding_in_progress`) and can be
//! inspected or cleaned via this subcommand.
13
14use clap::{Args, Subcommand};
15use serde::Serialize;
16
17use crate::errors::AppError;
18use crate::output::emit_json_compact;
19use crate::paths::AppPaths;
20use crate::storage::connection::open_rw;
21use crate::storage::pending_memories::{self, PendingMemory, PendingStatus};
22
// Top-level arguments for the `pending` command. The struct carries nothing
// but the selected subcommand; the usage examples shown by `--help` are
// supplied through the `after_long_help` string below.
#[derive(Debug, Args)]
#[command(after_long_help = "EXAMPLES:\n  \
    # List all entries currently waiting for embedding (Stage A done, Stage B pending)\n  \
    sqlite-graphrag pending list --status validated --json\n\n  \
    # Show the full record of pending_id 42\n  \
    sqlite-graphrag pending show 42 --json\n\n  \
    # Clean up entries abandoned for >24h (86400 seconds)\n  \
    sqlite-graphrag pending cleanup --staged-cleanup-after 86400 --yes")]
pub struct PendingArgs {
    // Which `pending` subcommand to execute; dispatched by `run`.
    #[command(subcommand)]
    pub cmd: PendingCmd,
}
35
// Subcommands of `pending`. Each variant wraps the argument struct of one
// subcommand and is routed to its handler by `run`. The `///` lines on the
// variants are left untouched because they are part of the CLI surface.
#[derive(Debug, Subcommand)]
pub enum PendingCmd {
    /// List entries by status (defaults to all non-committed).
    List(PendingListArgs),
    /// Show one entry in full (includes body, entities_json, embedding_dim).
    Show(PendingShowArgs),
    /// Remove entries older than `--staged-cleanup-after` seconds.
    Cleanup(PendingCleanupArgs),
}
45
46#[derive(Debug, Args)]
47pub struct PendingListArgs {
48    /// Filter by status: validated | embedding_in_progress | embedding_done |
49    /// committed | abandoned | failed. Default: all.
50    #[arg(long, value_enum)]
51    pub status: Option<PendingStatusArg>,
52    /// Maximum number of entries to return. Default: 100.
53    #[arg(long, default_value_t = 100)]
54    pub limit: usize,
55    /// GAP-E2E-010b (v1.0.89): explicit database path override. Defaults to
56    /// the path resolved by `AppPaths::resolve(None)` when omitted. Honors
57    /// env var `SQLITE_GRAPHRAG_DB_PATH`.
58    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
59    pub db: Option<String>,
60    /// JSON output (always on; accepted for CLI consistency).
61    #[arg(long, hide = true)]
62    pub json: bool,
63}
64
// CLI-facing mirror of the storage-layer `PendingStatus`, parsed through
// clap's `ValueEnum`. `rename_all = "snake_case"` selects the spelling used on
// the command line (e.g. `--status validated`). Values are converted to
// `PendingStatus` by the `From<PendingStatusArg>` impl in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
#[value(rename_all = "snake_case")]
pub enum PendingStatusArg {
    Validated,
    EmbeddingInProgress,
    EmbeddingDone,
    Committed,
    Abandoned,
    Failed,
}
75
76impl From<PendingStatusArg> for PendingStatus {
77    fn from(value: PendingStatusArg) -> Self {
78        match value {
79            PendingStatusArg::Validated => Self::Validated,
80            PendingStatusArg::EmbeddingInProgress => Self::EmbeddingInProgress,
81            PendingStatusArg::EmbeddingDone => Self::EmbeddingDone,
82            PendingStatusArg::Committed => Self::Committed,
83            PendingStatusArg::Abandoned => Self::Abandoned,
84            PendingStatusArg::Failed => Self::Failed,
85        }
86    }
87}
88
// Arguments for `pending show`, consumed by `run_show`. `pending_id` carries
// no `#[arg(long)]`, so it is taken as a positional argument
// (`pending show 42`), while `--db` and `--json` are named flags.
#[derive(Debug, Args)]
pub struct PendingShowArgs {
    /// Pending id returned by `remember --stage-only`.
    pub pending_id: i64,
    /// GAP-E2E-010b (v1.0.89): explicit database path override. Defaults to
    /// the path resolved by `AppPaths::resolve(None)` when omitted. Honors
    /// env var `SQLITE_GRAPHRAG_DB_PATH`.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,
    /// JSON output (always on; accepted for CLI consistency).
    #[arg(long, hide = true)]
    pub json: bool,
}
102
// Arguments for `pending cleanup`, consumed by `run_cleanup`.
//
// NOTE(review): unlike `PendingListArgs` and `PendingShowArgs` there is no
// `--db` override here; `run_cleanup` always calls `open_conn(None)`.
// NOTE(review): `run_cleanup` in this file never prompts, so `--yes` is only
// echoed back in the JSON output — confirm whether a prompt exists elsewhere.
#[derive(Debug, Args)]
pub struct PendingCleanupArgs {
    /// Age in seconds after which an entry is eligible for cleanup.
    #[arg(long, default_value_t = 86400)]
    pub staged_cleanup_after: u64,
    /// Skip the interactive confirmation prompt.
    #[arg(long)]
    pub yes: bool,
    /// Dry-run: list what would be removed without touching the database.
    #[arg(long)]
    pub dry_run: bool,
    /// JSON output (always on; accepted for CLI consistency).
    #[arg(long, hide = true)]
    pub json: bool,
}
118
/// One element of the `entries` array printed by `pending list`.
///
/// A summary projection of [`PendingMemory`] built by the
/// `From<&PendingMemory>` impl in this file. It copies only the fields
/// declared here; the complete record is printed by `pending show` instead.
#[derive(Serialize)]
struct PendingListEntry {
    pending_id: i64,
    name: String,
    namespace: String,
    memory_type: String,
    /// Status rendered through `PendingStatus::as_str`.
    status: String,
    attempt_count: i32,
    last_error: Option<String>,
    embedding_dim: Option<i32>,
    // NOTE(review): presumably Unix seconds — `run_cleanup` compares
    // `updated_at` against `chrono::Utc::now().timestamp()`; confirm.
    created_at: i64,
    updated_at: i64,
}
132
133impl From<&PendingMemory> for PendingListEntry {
134    fn from(p: &PendingMemory) -> Self {
135        Self {
136            pending_id: p.pending_id,
137            name: p.name.clone(),
138            namespace: p.namespace.clone(),
139            memory_type: p.memory_type.clone(),
140            status: p.status.as_str().to_string(),
141            attempt_count: p.attempt_count,
142            last_error: p.last_error.clone(),
143            embedding_dim: p.embedding_dim,
144            created_at: p.created_at,
145            updated_at: p.updated_at,
146        }
147    }
148}
149
/// JSON envelope printed by `pending list` (built in `run_list`).
#[derive(Serialize)]
struct PendingListOutput {
    /// Fixed action tag; `run_list` sets it to `"pending_list"`.
    action: &'static str,
    /// The `--status` filter that was applied, or `None` when no filter was given.
    filter_status: Option<String>,
    /// Number of elements in `entries`.
    count: usize,
    entries: Vec<PendingListEntry>,
    /// Wall-clock time spent in the handler, in milliseconds.
    elapsed_ms: u64,
}
158
/// JSON envelope printed by `pending show` (built in `run_show`).
#[derive(Serialize)]
struct PendingShowOutput {
    /// Fixed action tag; `run_show` sets it to `"pending_show"`.
    action: &'static str,
    /// The stored record, serialized using `PendingMemory`'s own `Serialize` impl.
    entry: PendingMemory,
    /// Wall-clock time spent in the handler, in milliseconds.
    elapsed_ms: u64,
}
165
/// JSON envelope printed by `pending cleanup` (built in `run_cleanup`).
#[derive(Serialize)]
struct PendingCleanupOutput {
    /// `"pending_cleanup_dry_run"` for a dry run, `"pending_cleanup"` otherwise.
    action: &'static str,
    /// Echo of the `--dry-run` flag.
    dry_run: bool,
    /// Echo of `--staged-cleanup-after`, in seconds.
    staged_cleanup_after_secs: u64,
    /// Number of `abandoned` entries that `run_cleanup` counted as old enough.
    candidates: usize,
    /// Count reported by `pending_memories::cleanup_older_than`; 0 on a dry run.
    removed: usize,
    /// Wall-clock time spent in the handler, in milliseconds.
    elapsed_ms: u64,
    /// Echo of the `--yes` flag.
    yes: bool,
}
176
177pub fn run(args: PendingArgs) -> Result<(), AppError> {
178    match args.cmd {
179        PendingCmd::List(a) => run_list(a),
180        PendingCmd::Show(a) => run_show(a),
181        PendingCmd::Cleanup(a) => run_cleanup(a),
182    }
183}
184
185fn open_conn(db_override: Option<&str>) -> Result<(AppPaths, rusqlite::Connection), AppError> {
186    // GAP-E2E-010b (v1.0.89): honor `--db <PATH>` for parity with the
187    // rest of the CLI surface. `AppPaths::resolve` accepts the same value
188    // passed by callers of other subcommands, keeping path semantics
189    // consistent across the entire command surface.
190    let paths = AppPaths::resolve(db_override)?;
191    let conn = open_rw(&paths.db)?;
192    Ok((paths, conn))
193}
194
195fn run_list(args: PendingListArgs) -> Result<(), AppError> {
196    let start = std::time::Instant::now();
197    let (_paths, conn) = open_conn(args.db.as_deref())?;
198
199    // If a status filter was provided, query that single status. Otherwise return
200    // all six buckets so the operator can see the full staging landscape.
201    let entries: Vec<PendingMemory> = if let Some(status) = args.status {
202        pending_memories::list_by_status(&conn, status.into(), args.limit)?
203    } else {
204        let mut all = Vec::new();
205        for status in [
206            PendingStatus::EmbeddingInProgress,
207            PendingStatus::EmbeddingDone,
208            PendingStatus::Validated,
209            PendingStatus::Abandoned,
210            PendingStatus::Failed,
211        ] {
212            let mut bucket = pending_memories::list_by_status(&conn, status, args.limit)?;
213            all.append(&mut bucket);
214        }
215        all.truncate(args.limit);
216        all
217    };
218
219    let count = entries.len();
220    let entries_out: Vec<PendingListEntry> = entries.iter().map(PendingListEntry::from).collect();
221    let output = PendingListOutput {
222        action: "pending_list",
223        filter_status: args.status.map(|s| {
224            match s {
225                PendingStatusArg::Validated => "validated",
226                PendingStatusArg::EmbeddingInProgress => "embedding_in_progress",
227                PendingStatusArg::EmbeddingDone => "embedding_done",
228                PendingStatusArg::Committed => "committed",
229                PendingStatusArg::Abandoned => "abandoned",
230                PendingStatusArg::Failed => "failed",
231            }
232            .to_string()
233        }),
234        count,
235        entries: entries_out,
236        elapsed_ms: start.elapsed().as_millis() as u64,
237    };
238    emit_json_compact(&output)
239}
240
241fn run_show(args: PendingShowArgs) -> Result<(), AppError> {
242    let start = std::time::Instant::now();
243    let (_paths, conn) = open_conn(args.db.as_deref())?;
244    let entry = pending_memories::find_by_id(&conn, args.pending_id)?.ok_or_else(|| {
245        AppError::NotFound(format!(
246            "pending_id {} not found in pending_memories",
247            args.pending_id
248        ))
249    })?;
250    let output = PendingShowOutput {
251        action: "pending_show",
252        entry,
253        elapsed_ms: start.elapsed().as_millis() as u64,
254    };
255    emit_json_compact(&output)
256}
257
258fn run_cleanup(args: PendingCleanupArgs) -> Result<(), AppError> {
259    let start = std::time::Instant::now();
260    let (_paths, conn) = open_conn(None)?;
261
262    // Count candidates first so dry-run is non-mutating.
263    let candidates = pending_memories::list_by_status(&conn, PendingStatus::Abandoned, 100_000)?
264        .into_iter()
265        .filter(|p| {
266            let now = chrono::Utc::now().timestamp();
267            now - p.updated_at >= args.staged_cleanup_after as i64
268        })
269        .count();
270
271    let removed = if args.dry_run {
272        0
273    } else {
274        pending_memories::cleanup_older_than(&conn, args.staged_cleanup_after as i64)?
275    };
276
277    let output = PendingCleanupOutput {
278        action: if args.dry_run {
279            "pending_cleanup_dry_run"
280        } else {
281            "pending_cleanup"
282        },
283        dry_run: args.dry_run,
284        staged_cleanup_after_secs: args.staged_cleanup_after,
285        candidates,
286        removed,
287        elapsed_ms: start.elapsed().as_millis() as u64,
288        yes: args.yes,
289    };
290    emit_json_compact(&output)
291}
292
#[cfg(test)]
mod tests {
    use super::*;

    /// Status spelling expected for each CLI status argument.
    fn arg_to_str(arg: PendingStatusArg) -> &'static str {
        match arg {
            PendingStatusArg::Validated => "validated",
            PendingStatusArg::EmbeddingInProgress => "embedding_in_progress",
            PendingStatusArg::EmbeddingDone => "embedding_done",
            PendingStatusArg::Committed => "committed",
            PendingStatusArg::Abandoned => "abandoned",
            PendingStatusArg::Failed => "failed",
        }
    }

    /// Every CLI status must convert to a `PendingStatus` whose `as_str`
    /// matches the expected spelling.
    #[test]
    fn status_arg_round_trip_all_variants() {
        let variants = [
            PendingStatusArg::Validated,
            PendingStatusArg::EmbeddingInProgress,
            PendingStatusArg::EmbeddingDone,
            PendingStatusArg::Committed,
            PendingStatusArg::Abandoned,
            PendingStatusArg::Failed,
        ];

        for arg in variants {
            let expected = arg_to_str(arg);
            let converted = PendingStatus::from(arg);
            assert_eq!(converted.as_str(), expected);
        }
    }
}