Skip to main content

sqlite_graphrag/commands/
pending.rs

//! GAP-001 (v1.0.82): `pending` subcommand — inspect and manage the
//! three-stage `remember` checkpoint queue persisted in `pending_memories`.
//!
//! ## Subcommands
//! - `pending list [--status <STATUS>] [--limit <N>]` — show entries, optionally
//!   filtered by status
//! - `pending show <pending_id>` — show one entry in full
//! - `pending cleanup --staged-cleanup-after <SECONDS> [--dry-run] [--yes]` —
//!   remove old abandoned entries
//!
//! The `pending_memories` table is the durable footprint of the v1.0.82 staging
//! pipeline (Stage A → B → C). When a host crashes between Stage B and Stage C,
//! the entry stays in `embedding_done` (or `embedding_in_progress`) and can be
//! inspected or cleaned up via this subcommand.
13
14use clap::{Args, Subcommand};
15use serde::Serialize;
16
17use crate::errors::AppError;
18use crate::output::emit_json_compact;
19use crate::paths::AppPaths;
20use crate::storage::connection::open_rw;
21use crate::storage::pending_memories::{self, PendingMemory, PendingStatus};
22
23#[derive(Debug, Args)]
24#[command(after_long_help = "EXAMPLES:\n  \
25    # List all entries currently waiting for embedding (Stage A done, Stage B pending)\n  \
26    sqlite-graphrag pending list --status validated --json\n\n  \
27    # Show the full record of pending_id 42\n  \
28    sqlite-graphrag pending show 42 --json\n\n  \
29    # Clean up entries abandoned for >24h (86400 seconds)\n  \
30    sqlite-graphrag pending cleanup --staged-cleanup-after 86400 --yes")]
31pub struct PendingArgs {
32    #[command(subcommand)]
33    pub cmd: PendingCmd,
34}
35
36#[derive(Debug, Subcommand)]
37pub enum PendingCmd {
38    /// List entries by status (defaults to all non-committed).
39    List(PendingListArgs),
40    /// Show one entry in full (includes body, entities_json, embedding_dim).
41    Show(PendingShowArgs),
42    /// Remove entries older than `--staged-cleanup-after` seconds.
43    Cleanup(PendingCleanupArgs),
44}
45
46#[derive(Debug, Args)]
47pub struct PendingListArgs {
48    /// Filter by status: validated | embedding_in_progress | embedding_done |
49    /// committed | abandoned | failed. Default: all.
50    #[arg(long, value_enum)]
51    pub status: Option<PendingStatusArg>,
52    /// Maximum number of entries to return. Default: 100.
53    #[arg(long, default_value_t = 100)]
54    pub limit: usize,
55    /// GAP-E2E-010b (v1.0.89): explicit database path override. Defaults to
56    /// the path resolved by `AppPaths::resolve(None)` when omitted. Honors
57    /// env var `SQLITE_GRAPHRAG_DB_PATH`.
58    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
59    pub db: Option<String>,
60}
61
62#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
63#[value(rename_all = "snake_case")]
64pub enum PendingStatusArg {
65    Validated,
66    EmbeddingInProgress,
67    EmbeddingDone,
68    Committed,
69    Abandoned,
70    Failed,
71}
72
73impl From<PendingStatusArg> for PendingStatus {
74    fn from(value: PendingStatusArg) -> Self {
75        match value {
76            PendingStatusArg::Validated => Self::Validated,
77            PendingStatusArg::EmbeddingInProgress => Self::EmbeddingInProgress,
78            PendingStatusArg::EmbeddingDone => Self::EmbeddingDone,
79            PendingStatusArg::Committed => Self::Committed,
80            PendingStatusArg::Abandoned => Self::Abandoned,
81            PendingStatusArg::Failed => Self::Failed,
82        }
83    }
84}
85
86#[derive(Debug, Args)]
87pub struct PendingShowArgs {
88    /// Pending id returned by `remember --stage-only`.
89    pub pending_id: i64,
90    /// GAP-E2E-010b (v1.0.89): explicit database path override. Defaults to
91    /// the path resolved by `AppPaths::resolve(None)` when omitted. Honors
92    /// env var `SQLITE_GRAPHRAG_DB_PATH`.
93    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
94    pub db: Option<String>,
95}
96
97#[derive(Debug, Args)]
98pub struct PendingCleanupArgs {
99    /// Age in seconds after which an entry is eligible for cleanup.
100    #[arg(long, default_value_t = 86400)]
101    pub staged_cleanup_after: u64,
102    /// Skip the interactive confirmation prompt.
103    #[arg(long)]
104    pub yes: bool,
105    /// Dry-run: list what would be removed without touching the database.
106    #[arg(long)]
107    pub dry_run: bool,
108}
109
110#[derive(Serialize)]
111struct PendingListEntry {
112    pending_id: i64,
113    name: String,
114    namespace: String,
115    memory_type: String,
116    status: String,
117    attempt_count: i32,
118    last_error: Option<String>,
119    embedding_dim: Option<i32>,
120    created_at: i64,
121    updated_at: i64,
122}
123
124impl From<&PendingMemory> for PendingListEntry {
125    fn from(p: &PendingMemory) -> Self {
126        Self {
127            pending_id: p.pending_id,
128            name: p.name.clone(),
129            namespace: p.namespace.clone(),
130            memory_type: p.memory_type.clone(),
131            status: p.status.as_str().to_string(),
132            attempt_count: p.attempt_count,
133            last_error: p.last_error.clone(),
134            embedding_dim: p.embedding_dim,
135            created_at: p.created_at,
136            updated_at: p.updated_at,
137        }
138    }
139}
140
141#[derive(Serialize)]
142struct PendingListOutput {
143    action: &'static str,
144    filter_status: Option<String>,
145    count: usize,
146    entries: Vec<PendingListEntry>,
147    elapsed_ms: u64,
148}
149
150#[derive(Serialize)]
151struct PendingShowOutput {
152    action: &'static str,
153    entry: PendingMemory,
154    elapsed_ms: u64,
155}
156
157#[derive(Serialize)]
158struct PendingCleanupOutput {
159    action: &'static str,
160    dry_run: bool,
161    staged_cleanup_after_secs: u64,
162    candidates: usize,
163    removed: usize,
164    elapsed_ms: u64,
165    yes: bool,
166}
167
168pub fn run(args: PendingArgs) -> Result<(), AppError> {
169    match args.cmd {
170        PendingCmd::List(a) => run_list(a),
171        PendingCmd::Show(a) => run_show(a),
172        PendingCmd::Cleanup(a) => run_cleanup(a),
173    }
174}
175
176fn open_conn(db_override: Option<&str>) -> Result<(AppPaths, rusqlite::Connection), AppError> {
177    // GAP-E2E-010b (v1.0.89): honor `--db <PATH>` for parity with the
178    // rest of the CLI surface. `AppPaths::resolve` accepts the same value
179    // passed by callers of other subcommands, keeping path semantics
180    // consistent across the entire command surface.
181    let paths = AppPaths::resolve(db_override)?;
182    let conn = open_rw(&paths.db)?;
183    Ok((paths, conn))
184}
185
186fn run_list(args: PendingListArgs) -> Result<(), AppError> {
187    let start = std::time::Instant::now();
188    let (_paths, conn) = open_conn(args.db.as_deref())?;
189
190    // If a status filter was provided, query that single status. Otherwise return
191    // all six buckets so the operator can see the full staging landscape.
192    let entries: Vec<PendingMemory> = if let Some(status) = args.status {
193        pending_memories::list_by_status(&conn, status.into(), args.limit)?
194    } else {
195        let mut all = Vec::new();
196        for status in [
197            PendingStatus::EmbeddingInProgress,
198            PendingStatus::EmbeddingDone,
199            PendingStatus::Validated,
200            PendingStatus::Abandoned,
201            PendingStatus::Failed,
202        ] {
203            let mut bucket = pending_memories::list_by_status(&conn, status, args.limit)?;
204            all.append(&mut bucket);
205        }
206        all.truncate(args.limit);
207        all
208    };
209
210    let count = entries.len();
211    let entries_out: Vec<PendingListEntry> = entries.iter().map(PendingListEntry::from).collect();
212    let output = PendingListOutput {
213        action: "pending_list",
214        filter_status: args.status.map(|s| {
215            match s {
216                PendingStatusArg::Validated => "validated",
217                PendingStatusArg::EmbeddingInProgress => "embedding_in_progress",
218                PendingStatusArg::EmbeddingDone => "embedding_done",
219                PendingStatusArg::Committed => "committed",
220                PendingStatusArg::Abandoned => "abandoned",
221                PendingStatusArg::Failed => "failed",
222            }
223            .to_string()
224        }),
225        count,
226        entries: entries_out,
227        elapsed_ms: start.elapsed().as_millis() as u64,
228    };
229    emit_json_compact(&output)
230}
231
232fn run_show(args: PendingShowArgs) -> Result<(), AppError> {
233    let start = std::time::Instant::now();
234    let (_paths, conn) = open_conn(args.db.as_deref())?;
235    let entry = pending_memories::find_by_id(&conn, args.pending_id)?.ok_or_else(|| {
236        AppError::NotFound(format!(
237            "pending_id {} not found in pending_memories",
238            args.pending_id
239        ))
240    })?;
241    let output = PendingShowOutput {
242        action: "pending_show",
243        entry,
244        elapsed_ms: start.elapsed().as_millis() as u64,
245    };
246    emit_json_compact(&output)
247}
248
249fn run_cleanup(args: PendingCleanupArgs) -> Result<(), AppError> {
250    let start = std::time::Instant::now();
251    let (_paths, conn) = open_conn(None)?;
252
253    // Count candidates first so dry-run is non-mutating.
254    let candidates = pending_memories::list_by_status(&conn, PendingStatus::Abandoned, 100_000)?
255        .into_iter()
256        .filter(|p| {
257            let now = chrono::Utc::now().timestamp();
258            now - p.updated_at >= args.staged_cleanup_after as i64
259        })
260        .count();
261
262    let removed = if args.dry_run {
263        0
264    } else {
265        pending_memories::cleanup_older_than(&conn, args.staged_cleanup_after as i64)?
266    };
267
268    let output = PendingCleanupOutput {
269        action: if args.dry_run {
270            "pending_cleanup_dry_run"
271        } else {
272            "pending_cleanup"
273        },
274        dry_run: args.dry_run,
275        staged_cleanup_after_secs: args.staged_cleanup_after,
276        candidates,
277        removed,
278        elapsed_ms: start.elapsed().as_millis() as u64,
279        yes: args.yes,
280    };
281    emit_json_compact(&output)
282}
283
#[cfg(test)]
mod tests {
    use super::*;

    // Canonical snake_case spelling expected for each CLI status. The
    // exhaustive match forces this to be updated whenever a variant is added.
    fn expected_name(arg: PendingStatusArg) -> &'static str {
        use PendingStatusArg as A;
        match arg {
            A::Validated => "validated",
            A::EmbeddingInProgress => "embedding_in_progress",
            A::EmbeddingDone => "embedding_done",
            A::Committed => "committed",
            A::Abandoned => "abandoned",
            A::Failed => "failed",
        }
    }

    // Every CLI status must convert into a storage status whose `as_str()`
    // matches the expected snake_case name.
    #[test]
    fn status_arg_round_trip_all_variants() {
        let variants = [
            PendingStatusArg::Validated,
            PendingStatusArg::EmbeddingInProgress,
            PendingStatusArg::EmbeddingDone,
            PendingStatusArg::Committed,
            PendingStatusArg::Abandoned,
            PendingStatusArg::Failed,
        ];
        variants.into_iter().for_each(|arg| {
            let converted = PendingStatus::from(arg);
            assert_eq!(converted.as_str(), expected_name(arg));
        });
    }
}