Skip to main content

sqlite_graphrag/commands/
pending.rs

//! GAP-001 (v1.0.82): `pending` subcommand — inspect and manage the
//! three-stage `remember` checkpoint queue persisted in `pending_memories`.
//!
//! ## Subcommands
//! - `pending list [--status <STATUS>]` — show entries by status
//! - `pending show <pending_id>` — show one entry in full
//! - `pending cleanup --staged-cleanup-after <SECONDS>` — remove old abandoned
//!   entries
//!
//! The `pending_memories` table is the durable footprint of the v1.0.82 staging
//! pipeline (Stage A → B → C). When a host crashes between Stage B and Stage C
//! the entry stays in `embedding_done` (or `embedding_in_progress`) and can be
//! inspected or cleaned via this subcommand.
13
14use clap::{Args, Subcommand};
15use serde::Serialize;
16
17use crate::errors::AppError;
18use crate::output::emit_json_compact;
19use crate::paths::AppPaths;
20use crate::storage::connection::open_rw;
21use crate::storage::pending_memories::{self, PendingMemory, PendingStatus};
22
23#[derive(Debug, Args)]
24#[command(after_long_help = "EXAMPLES:\n  \
25    # List all entries currently waiting for embedding (Stage A done, Stage B pending)\n  \
26    sqlite-graphrag pending list --status validated --json\n\n  \
27    # Show the full record of pending_id 42\n  \
28    sqlite-graphrag pending show 42 --json\n\n  \
29    # Clean up entries abandoned for >24h (86400 seconds)\n  \
30    sqlite-graphrag pending cleanup --staged-cleanup-after 86400 --yes")]
31pub struct PendingArgs {
32    #[command(subcommand)]
33    pub cmd: PendingCmd,
34}
35
36#[derive(Debug, Subcommand)]
37pub enum PendingCmd {
38    /// List entries by status (defaults to all non-committed).
39    List(PendingListArgs),
40    /// Show one entry in full (includes body, entities_json, embedding_dim).
41    Show(PendingShowArgs),
42    /// Remove entries older than `--staged-cleanup-after` seconds.
43    Cleanup(PendingCleanupArgs),
44}
45
46#[derive(Debug, Args)]
47pub struct PendingListArgs {
48    /// Filter by status: validated | embedding_in_progress | embedding_done |
49    /// committed | abandoned | failed. Default: all.
50    #[arg(long, value_enum)]
51    pub status: Option<PendingStatusArg>,
52    /// Maximum number of entries to return. Default: 100.
53    #[arg(long, default_value_t = 100)]
54    pub limit: usize,
55    /// GAP-E2E-010b (v1.0.89): explicit database path override. Defaults to
56    /// the path resolved by `AppPaths::resolve(None)` when omitted. Honors
57    /// env var `SQLITE_GRAPHRAG_DB_PATH`.
58    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
59    pub db: Option<String>,
60    /// JSON output (always on; accepted for CLI consistency).
61    #[arg(long, hide = true)]
62    pub json: bool,
63}
64
65#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
66#[value(rename_all = "snake_case")]
67pub enum PendingStatusArg {
68    Validated,
69    EmbeddingInProgress,
70    EmbeddingDone,
71    Committed,
72    Abandoned,
73    Failed,
74}
75
76impl From<PendingStatusArg> for PendingStatus {
77    fn from(value: PendingStatusArg) -> Self {
78        match value {
79            PendingStatusArg::Validated => Self::Validated,
80            PendingStatusArg::EmbeddingInProgress => Self::EmbeddingInProgress,
81            PendingStatusArg::EmbeddingDone => Self::EmbeddingDone,
82            PendingStatusArg::Committed => Self::Committed,
83            PendingStatusArg::Abandoned => Self::Abandoned,
84            PendingStatusArg::Failed => Self::Failed,
85        }
86    }
87}
88
89#[derive(Debug, Args)]
90pub struct PendingShowArgs {
91    /// Pending id returned by `remember --stage-only`.
92    pub pending_id: i64,
93    /// GAP-E2E-010b (v1.0.89): explicit database path override. Defaults to
94    /// the path resolved by `AppPaths::resolve(None)` when omitted. Honors
95    /// env var `SQLITE_GRAPHRAG_DB_PATH`.
96    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
97    pub db: Option<String>,
98    /// JSON output (always on; accepted for CLI consistency).
99    #[arg(long, hide = true)]
100    pub json: bool,
101}
102
103#[derive(Debug, Args)]
104pub struct PendingCleanupArgs {
105    /// Age in seconds after which an entry is eligible for cleanup.
106    #[arg(long, default_value_t = 86400)]
107    pub staged_cleanup_after: u64,
108    /// Skip the interactive confirmation prompt.
109    #[arg(long)]
110    pub yes: bool,
111    /// Dry-run: list what would be removed without touching the database.
112    #[arg(long)]
113    pub dry_run: bool,
114    /// Explicit database path override. Defaults to the path resolved by
115    /// `AppPaths::resolve(None)` when omitted. Honors env var
116    /// `SQLITE_GRAPHRAG_DB_PATH`.
117    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
118    pub db: Option<String>,
119    /// JSON output (always on; accepted for CLI consistency).
120    #[arg(long, hide = true)]
121    pub json: bool,
122}
123
124#[derive(Serialize)]
125struct PendingListEntry {
126    pending_id: i64,
127    name: String,
128    namespace: String,
129    memory_type: String,
130    status: String,
131    attempt_count: i32,
132    last_error: Option<String>,
133    embedding_dim: Option<i32>,
134    created_at: i64,
135    updated_at: i64,
136}
137
138impl From<&PendingMemory> for PendingListEntry {
139    fn from(p: &PendingMemory) -> Self {
140        Self {
141            pending_id: p.pending_id,
142            name: p.name.clone(),
143            namespace: p.namespace.clone(),
144            memory_type: p.memory_type.clone(),
145            status: p.status.as_str().to_string(),
146            attempt_count: p.attempt_count,
147            last_error: p.last_error.clone(),
148            embedding_dim: p.embedding_dim,
149            created_at: p.created_at,
150            updated_at: p.updated_at,
151        }
152    }
153}
154
155#[derive(Serialize)]
156struct PendingListOutput {
157    action: &'static str,
158    filter_status: Option<String>,
159    count: usize,
160    entries: Vec<PendingListEntry>,
161    elapsed_ms: u64,
162}
163
164#[derive(Serialize)]
165struct PendingShowOutput {
166    action: &'static str,
167    entry: PendingMemory,
168    elapsed_ms: u64,
169}
170
171#[derive(Serialize)]
172struct PendingCleanupOutput {
173    action: &'static str,
174    dry_run: bool,
175    staged_cleanup_after_secs: u64,
176    candidates: usize,
177    removed: usize,
178    elapsed_ms: u64,
179    yes: bool,
180}
181
182pub fn run(args: PendingArgs) -> Result<(), AppError> {
183    match args.cmd {
184        PendingCmd::List(a) => run_list(a),
185        PendingCmd::Show(a) => run_show(a),
186        PendingCmd::Cleanup(a) => run_cleanup(a),
187    }
188}
189
190fn open_conn(db_override: Option<&str>) -> Result<(AppPaths, rusqlite::Connection), AppError> {
191    // GAP-E2E-010b (v1.0.89): honor `--db <PATH>` for parity with the
192    // rest of the CLI surface. `AppPaths::resolve` accepts the same value
193    // passed by callers of other subcommands, keeping path semantics
194    // consistent across the entire command surface.
195    let paths = AppPaths::resolve(db_override)?;
196    let conn = open_rw(&paths.db)?;
197    Ok((paths, conn))
198}
199
200fn run_list(args: PendingListArgs) -> Result<(), AppError> {
201    let start = std::time::Instant::now();
202    let (_paths, conn) = open_conn(args.db.as_deref())?;
203
204    // If a status filter was provided, query that single status. Otherwise return
205    // all six buckets so the operator can see the full staging landscape.
206    let entries: Vec<PendingMemory> = if let Some(status) = args.status {
207        pending_memories::list_by_status(&conn, status.into(), args.limit)?
208    } else {
209        let mut all = Vec::new();
210        for status in [
211            PendingStatus::EmbeddingInProgress,
212            PendingStatus::EmbeddingDone,
213            PendingStatus::Validated,
214            PendingStatus::Abandoned,
215            PendingStatus::Failed,
216        ] {
217            let mut bucket = pending_memories::list_by_status(&conn, status, args.limit)?;
218            all.append(&mut bucket);
219        }
220        all.truncate(args.limit);
221        all
222    };
223
224    let count = entries.len();
225    let entries_out: Vec<PendingListEntry> = entries.iter().map(PendingListEntry::from).collect();
226    let output = PendingListOutput {
227        action: "pending_list",
228        filter_status: args.status.map(|s| {
229            match s {
230                PendingStatusArg::Validated => "validated",
231                PendingStatusArg::EmbeddingInProgress => "embedding_in_progress",
232                PendingStatusArg::EmbeddingDone => "embedding_done",
233                PendingStatusArg::Committed => "committed",
234                PendingStatusArg::Abandoned => "abandoned",
235                PendingStatusArg::Failed => "failed",
236            }
237            .to_string()
238        }),
239        count,
240        entries: entries_out,
241        elapsed_ms: start.elapsed().as_millis() as u64,
242    };
243    emit_json_compact(&output)
244}
245
246fn run_show(args: PendingShowArgs) -> Result<(), AppError> {
247    let start = std::time::Instant::now();
248    let (_paths, conn) = open_conn(args.db.as_deref())?;
249    let entry = pending_memories::find_by_id(&conn, args.pending_id)?.ok_or_else(|| {
250        AppError::NotFound(format!(
251            "pending_id {} not found in pending_memories",
252            args.pending_id
253        ))
254    })?;
255    let output = PendingShowOutput {
256        action: "pending_show",
257        entry,
258        elapsed_ms: start.elapsed().as_millis() as u64,
259    };
260    emit_json_compact(&output)
261}
262
263fn run_cleanup(args: PendingCleanupArgs) -> Result<(), AppError> {
264    let start = std::time::Instant::now();
265    let (_paths, conn) = open_conn(args.db.as_deref())?;
266
267    // Count candidates first so dry-run is non-mutating.
268    let candidates = pending_memories::list_by_status(&conn, PendingStatus::Abandoned, 100_000)?
269        .into_iter()
270        .filter(|p| {
271            let now = chrono::Utc::now().timestamp();
272            now - p.updated_at >= args.staged_cleanup_after as i64
273        })
274        .count();
275
276    let removed = if args.dry_run {
277        0
278    } else {
279        pending_memories::cleanup_older_than(&conn, args.staged_cleanup_after as i64)?
280    };
281
282    let output = PendingCleanupOutput {
283        action: if args.dry_run {
284            "pending_cleanup_dry_run"
285        } else {
286            "pending_cleanup"
287        },
288        dry_run: args.dry_run,
289        staged_cleanup_after_secs: args.staged_cleanup_after,
290        candidates,
291        removed,
292        elapsed_ms: start.elapsed().as_millis() as u64,
293        yes: args.yes,
294    };
295    emit_json_compact(&output)
296}
297
#[cfg(test)]
mod tests {
    use super::*;

    /// String each CLI status variant is expected to map to. The match is
    /// exhaustive so a new variant cannot be added without extending it.
    fn expected_name(arg: PendingStatusArg) -> &'static str {
        match arg {
            PendingStatusArg::Validated => "validated",
            PendingStatusArg::EmbeddingInProgress => "embedding_in_progress",
            PendingStatusArg::EmbeddingDone => "embedding_done",
            PendingStatusArg::Committed => "committed",
            PendingStatusArg::Abandoned => "abandoned",
            PendingStatusArg::Failed => "failed",
        }
    }

    /// Converting any CLI variant into `PendingStatus` must yield a status
    /// whose `as_str` equals the expected snake_case name.
    #[test]
    fn status_arg_round_trip_all_variants() {
        let variants = [
            PendingStatusArg::Validated,
            PendingStatusArg::EmbeddingInProgress,
            PendingStatusArg::EmbeddingDone,
            PendingStatusArg::Committed,
            PendingStatusArg::Abandoned,
            PendingStatusArg::Failed,
        ];
        for arg in variants {
            let status = PendingStatus::from(arg);
            assert_eq!(status.as_str(), expected_name(arg));
        }
    }
}
327}