Skip to main content

sqlite_graphrag/commands/
pending.rs

1//! GAP-001 (v1.0.82): `pending` subcommand — inspect and manage the
2//! three-stage `remember` checkpoint queue persisted in `pending_memories`.
3//!
4//! ## Subcommands
5//! - `pending list [--status <STATUS>]` — show entries by status
6//! - `pending show <pending_id>` — show one entry in full
//! - `pending cleanup --staged-cleanup-after <SECONDS>` — remove old abandoned entries
8//!
//! The `pending_memories` table is the durable footprint of the v1.0.82 staging pipeline
10//! (Stage A → B → C). When a host crashes between Stage B and Stage C the entry
11//! stays in `embedding_done` (or `embedding_in_progress`) and can be inspected
12//! or cleaned via this subcommand.
13
14use clap::{Args, Subcommand};
15use serde::Serialize;
16
17use crate::errors::AppError;
18use crate::output::emit_json_compact;
19use crate::paths::AppPaths;
20use crate::storage::connection::open_rw;
21use crate::storage::pending_memories::{self, PendingMemory, PendingStatus};
22
23#[derive(Debug, Args)]
24#[command(after_long_help = "EXAMPLES:\n  \
25    # List all entries currently waiting for embedding (Stage A done, Stage B pending)\n  \
26    sqlite-graphrag pending list --status validated --json\n\n  \
27    # Show the full record of pending_id 42\n  \
28    sqlite-graphrag pending show 42 --json\n\n  \
29    # Clean up entries abandoned for >24h (86400 seconds)\n  \
30    sqlite-graphrag pending cleanup --staged-cleanup-after 86400 --yes")]
31pub struct PendingArgs {
32    #[command(subcommand)]
33    pub cmd: PendingCmd,
34}
35
36#[derive(Debug, Subcommand)]
37pub enum PendingCmd {
38    /// List entries by status (defaults to all non-committed).
39    List(PendingListArgs),
40    /// Show one entry in full (includes body, entities_json, embedding_dim).
41    Show(PendingShowArgs),
42    /// Remove entries older than `--staged-cleanup-after` seconds.
43    Cleanup(PendingCleanupArgs),
44}
45
46#[derive(Debug, Args)]
47pub struct PendingListArgs {
48    /// Filter by status: validated | embedding_in_progress | embedding_done |
49    /// committed | abandoned | failed. Default: all.
50    #[arg(long, value_enum)]
51    pub status: Option<PendingStatusArg>,
52    /// Maximum number of entries to return. Default: 100.
53    #[arg(long, default_value_t = 100)]
54    pub limit: usize,
55}
56
57#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
58#[value(rename_all = "snake_case")]
59pub enum PendingStatusArg {
60    Validated,
61    EmbeddingInProgress,
62    EmbeddingDone,
63    Committed,
64    Abandoned,
65    Failed,
66}
67
68impl From<PendingStatusArg> for PendingStatus {
69    fn from(value: PendingStatusArg) -> Self {
70        match value {
71            PendingStatusArg::Validated => Self::Validated,
72            PendingStatusArg::EmbeddingInProgress => Self::EmbeddingInProgress,
73            PendingStatusArg::EmbeddingDone => Self::EmbeddingDone,
74            PendingStatusArg::Committed => Self::Committed,
75            PendingStatusArg::Abandoned => Self::Abandoned,
76            PendingStatusArg::Failed => Self::Failed,
77        }
78    }
79}
80
81#[derive(Debug, Args)]
82pub struct PendingShowArgs {
83    /// Pending id returned by `remember --stage-only`.
84    pub pending_id: i64,
85}
86
87#[derive(Debug, Args)]
88pub struct PendingCleanupArgs {
89    /// Age in seconds after which an entry is eligible for cleanup.
90    #[arg(long, default_value_t = 86400)]
91    pub staged_cleanup_after: u64,
92    /// Skip the interactive confirmation prompt.
93    #[arg(long)]
94    pub yes: bool,
95    /// Dry-run: list what would be removed without touching the database.
96    #[arg(long)]
97    pub dry_run: bool,
98}
99
100#[derive(Serialize)]
101struct PendingListEntry {
102    pending_id: i64,
103    name: String,
104    namespace: String,
105    memory_type: String,
106    status: String,
107    attempt_count: i32,
108    last_error: Option<String>,
109    embedding_dim: Option<i32>,
110    created_at: i64,
111    updated_at: i64,
112}
113
114impl From<&PendingMemory> for PendingListEntry {
115    fn from(p: &PendingMemory) -> Self {
116        Self {
117            pending_id: p.pending_id,
118            name: p.name.clone(),
119            namespace: p.namespace.clone(),
120            memory_type: p.memory_type.clone(),
121            status: p.status.as_str().to_string(),
122            attempt_count: p.attempt_count,
123            last_error: p.last_error.clone(),
124            embedding_dim: p.embedding_dim,
125            created_at: p.created_at,
126            updated_at: p.updated_at,
127        }
128    }
129}
130
131#[derive(Serialize)]
132struct PendingListOutput {
133    action: &'static str,
134    filter_status: Option<String>,
135    count: usize,
136    entries: Vec<PendingListEntry>,
137    elapsed_ms: u64,
138}
139
140#[derive(Serialize)]
141struct PendingShowOutput {
142    action: &'static str,
143    entry: PendingMemory,
144    elapsed_ms: u64,
145}
146
147#[derive(Serialize)]
148struct PendingCleanupOutput {
149    action: &'static str,
150    dry_run: bool,
151    staged_cleanup_after_secs: u64,
152    candidates: usize,
153    removed: usize,
154    elapsed_ms: u64,
155    yes: bool,
156}
157
158pub fn run(args: PendingArgs) -> Result<(), AppError> {
159    match args.cmd {
160        PendingCmd::List(a) => run_list(a),
161        PendingCmd::Show(a) => run_show(a),
162        PendingCmd::Cleanup(a) => run_cleanup(a),
163    }
164}
165
166fn open_conn() -> Result<(AppPaths, rusqlite::Connection), AppError> {
167    let paths = AppPaths::resolve(None)?;
168    let conn = open_rw(&paths.db)?;
169    Ok((paths, conn))
170}
171
172fn run_list(args: PendingListArgs) -> Result<(), AppError> {
173    let start = std::time::Instant::now();
174    let (_paths, conn) = open_conn()?;
175
176    // If a status filter was provided, query that single status. Otherwise return
177    // all six buckets so the operator can see the full staging landscape.
178    let entries: Vec<PendingMemory> = if let Some(status) = args.status {
179        pending_memories::list_by_status(&conn, status.into(), args.limit)?
180    } else {
181        let mut all = Vec::new();
182        for status in [
183            PendingStatus::EmbeddingInProgress,
184            PendingStatus::EmbeddingDone,
185            PendingStatus::Validated,
186            PendingStatus::Abandoned,
187            PendingStatus::Failed,
188        ] {
189            let mut bucket = pending_memories::list_by_status(&conn, status, args.limit)?;
190            all.append(&mut bucket);
191        }
192        all.truncate(args.limit);
193        all
194    };
195
196    let count = entries.len();
197    let entries_out: Vec<PendingListEntry> = entries.iter().map(PendingListEntry::from).collect();
198    let output = PendingListOutput {
199        action: "pending_list",
200        filter_status: args.status.map(|s| {
201            match s {
202                PendingStatusArg::Validated => "validated",
203                PendingStatusArg::EmbeddingInProgress => "embedding_in_progress",
204                PendingStatusArg::EmbeddingDone => "embedding_done",
205                PendingStatusArg::Committed => "committed",
206                PendingStatusArg::Abandoned => "abandoned",
207                PendingStatusArg::Failed => "failed",
208            }
209            .to_string()
210        }),
211        count,
212        entries: entries_out,
213        elapsed_ms: start.elapsed().as_millis() as u64,
214    };
215    emit_json_compact(&output)
216}
217
218fn run_show(args: PendingShowArgs) -> Result<(), AppError> {
219    let start = std::time::Instant::now();
220    let (_paths, conn) = open_conn()?;
221    let entry = pending_memories::find_by_id(&conn, args.pending_id)?.ok_or_else(|| {
222        AppError::NotFound(format!(
223            "pending_id {} not found in pending_memories",
224            args.pending_id
225        ))
226    })?;
227    let output = PendingShowOutput {
228        action: "pending_show",
229        entry,
230        elapsed_ms: start.elapsed().as_millis() as u64,
231    };
232    emit_json_compact(&output)
233}
234
235fn run_cleanup(args: PendingCleanupArgs) -> Result<(), AppError> {
236    let start = std::time::Instant::now();
237    let (_paths, conn) = open_conn()?;
238
239    // Count candidates first so dry-run is non-mutating.
240    let candidates = pending_memories::list_by_status(&conn, PendingStatus::Abandoned, 100_000)?
241        .into_iter()
242        .filter(|p| {
243            let now = chrono::Utc::now().timestamp();
244            now - p.updated_at >= args.staged_cleanup_after as i64
245        })
246        .count();
247
248    let removed = if args.dry_run {
249        0
250    } else {
251        pending_memories::cleanup_older_than(&conn, args.staged_cleanup_after as i64)?
252    };
253
254    let output = PendingCleanupOutput {
255        action: if args.dry_run {
256            "pending_cleanup_dry_run"
257        } else {
258            "pending_cleanup"
259        },
260        dry_run: args.dry_run,
261        staged_cleanup_after_secs: args.staged_cleanup_after,
262        candidates,
263        removed,
264        elapsed_ms: start.elapsed().as_millis() as u64,
265        yes: args.yes,
266    };
267    emit_json_compact(&output)
268}
269
#[cfg(test)]
mod tests {
    use super::*;

    /// Every CLI status flag paired with the string its storage-layer
    /// counterpart is expected to report from `as_str()`.
    const EXPECTED: [(PendingStatusArg, &str); 6] = [
        (PendingStatusArg::Validated, "validated"),
        (PendingStatusArg::EmbeddingInProgress, "embedding_in_progress"),
        (PendingStatusArg::EmbeddingDone, "embedding_done"),
        (PendingStatusArg::Committed, "committed"),
        (PendingStatusArg::Abandoned, "abandoned"),
        (PendingStatusArg::Failed, "failed"),
    ];

    #[test]
    fn status_arg_round_trip_all_variants() {
        for (arg, expected) in EXPECTED {
            let status = PendingStatus::from(arg);
            assert_eq!(status.as_str(), expected);
        }
    }
}
299}