Skip to main content

sqlite_graphrag/commands/enrich/
queue.rs

1//! Enrichment queue — SQLite-backed scan/retry/dead-letter DB.
2
3use super::*;
4
5// ---------------------------------------------------------------------------
6// Queue DB
7// ---------------------------------------------------------------------------
8
9/// Opens or creates the enrichment queue database.
10///
11/// The queue schema mirrors `ingest_claude` for resume/retry parity.
12/// Uses a different filename (`.enrich-queue.sqlite`) to avoid collision.
13///
14/// # DRY note
15///
16/// This is a near-verbatim copy of `open_queue_db` in `ingest_claude.rs`.
17/// Both should be unified in a shared `llm_runner.rs` module by the
18/// Integration stream.
19pub(super) fn open_queue_db<P: AsRef<std::path::Path>>(path: P) -> Result<Connection, AppError> {
20    let conn = Connection::open(path)?;
21    conn.pragma_update(None, "journal_mode", "wal")?;
22    conn.execute_batch(
23        "CREATE TABLE IF NOT EXISTS queue (
24            id          INTEGER PRIMARY KEY AUTOINCREMENT,
25            item_key    TEXT NOT NULL UNIQUE,
26            item_type   TEXT NOT NULL DEFAULT 'memory',
27            status      TEXT NOT NULL DEFAULT 'pending',
28            memory_id   INTEGER,
29            entity_id   INTEGER,
30            entities    INTEGER DEFAULT 0,
31            rels        INTEGER DEFAULT 0,
32            error       TEXT,
33            cost_usd    REAL DEFAULT 0.0,
34            attempt     INTEGER DEFAULT 0,
35            elapsed_ms  INTEGER,
36            created_at  TEXT DEFAULT (datetime('now')),
37            done_at     TEXT
38        );
39        CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
40    )?;
41    // GAP-ENRICH-BACKLOG-CONVERGE (v1.0.96): dead-letter columns. The legacy
42    // `.enrich-queue.sqlite` predates these columns and `CREATE TABLE IF NOT
43    // EXISTS` never alters an existing table, so add them idempotently here.
44    let mut has_error_class = false;
45    let mut has_next_retry_at = false;
46    // GAP-SG-12/42: the `operation` column scopes queue rows to the enrich
47    // operation that enqueued them, so `--status` can segment counts per
48    // operation instead of conflating a shared `item_key` space. Migrated
49    // idempotently here for the same reason as the v1.0.96 columns.
50    let mut has_operation = false;
51    {
52        let mut stmt = conn.prepare("PRAGMA table_info(queue)")?;
53        let names = stmt.query_map([], |r| r.get::<_, String>(1))?;
54        for name in names {
55            match name?.as_str() {
56                "error_class" => has_error_class = true,
57                "next_retry_at" => has_next_retry_at = true,
58                "operation" => has_operation = true,
59                _ => {}
60            }
61        }
62    }
63    if !has_error_class {
64        conn.execute_batch("ALTER TABLE queue ADD COLUMN error_class TEXT")?;
65    }
66    if !has_next_retry_at {
67        conn.execute_batch("ALTER TABLE queue ADD COLUMN next_retry_at TEXT")?;
68    }
69    if !has_operation {
70        conn.execute_batch("ALTER TABLE queue ADD COLUMN operation TEXT")?;
71    }
72    conn.execute_batch(
73        "CREATE INDEX IF NOT EXISTS idx_enrich_queue_eligible ON queue(status, next_retry_at);
74         CREATE INDEX IF NOT EXISTS idx_enrich_queue_operation ON queue(operation, status);
75         CREATE INDEX IF NOT EXISTS idx_enrich_queue_memory ON queue(memory_id)",
76    )?;
77    Ok(conn)
78}
79
80/// GAP-SG-12: enqueue one scan candidate, linking it to its `memory_id` and
81/// tagging it with the originating `operation`. For memory-keyed operations the
82/// id is resolved from `main_conn` so the cascade cleanup (GAP-SG-13) can target
83/// the queue row by `memory_id` even before the item is processed. Entity/id
84/// keyed operations leave `memory_id` NULL (the `item_key` carries the link).
85/// `INSERT OR IGNORE` preserves the v1.0.96 invariant that a dead-letter row is
86/// never resurrected by re-enqueue (item_key is UNIQUE).
87pub(super) fn enqueue_candidate(
88    queue_conn: &Connection,
89    main_conn: &Connection,
90    namespace: &str,
91    key: &str,
92    item_type: &str,
93    operation: &str,
94) {
95    let memory_id: Option<i64> = if item_type == "memory" {
96        main_conn
97            .query_row(
98                "SELECT id FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
99                rusqlite::params![namespace, key],
100                |r| r.get(0),
101            )
102            .ok()
103    } else {
104        None
105    };
106    if let Err(e) = queue_conn.execute(
107        "INSERT OR IGNORE INTO queue (item_key, item_type, status, operation, memory_id) \
108         VALUES (?1, ?2, 'pending', ?3, ?4)",
109        rusqlite::params![key, item_type, operation, memory_id],
110    ) {
111        tracing::warn!(target: "enrich", error = %e, "queue insert failed");
112    }
113}
114
115/// GAP-SG-69: item_keys vetoed `status='skipped'` for an operation. The
116/// body-enrich scan selects candidates purely by `LENGTH(body) <
117/// min_output_chars`, so a short body whose rewrite the preservation guard keeps
118/// rejecting would be re-scanned every pass and `--until-empty` would never
119/// converge. Callers exclude these keys so the scan returns only actionable
120/// items; `cleanup_queue_entry` clears the veto when the body actually changes,
121/// restoring the memory as a candidate.
122pub(super) fn skipped_item_keys(
123    conn: &Connection,
124    operation: &str,
125) -> Result<std::collections::HashSet<String>, AppError> {
126    let mut stmt = conn.prepare(
127        "SELECT item_key FROM queue WHERE status='skipped' AND (operation = ?1 OR operation IS NULL)",
128    )?;
129    let keys = stmt
130        .query_map(rusqlite::params![operation], |r| r.get::<_, String>(0))?
131        .collect::<Result<std::collections::HashSet<String>, _>>()?;
132    Ok(keys)
133}
134
135/// Queue `item_type` for an operation: entity-keyed operations use `"entity"`,
136/// every other (memory/id-keyed) operation uses `"memory"`.
137pub(super) fn item_type_for(operation: &EnrichOperation) -> &'static str {
138    match operation {
139        EnrichOperation::EntityDescriptions => "entity",
140        _ => "memory",
141    }
142}
143
144/// GAP-SG-13: remove a memory's enrich-queue entry when the memory is deleted or
145/// force-merged, so the dead-letter / pending sidecar never references a row
146/// that no longer exists. Best-effort and a no-op when the queue file is absent
147/// (the common case after a clean run, which removes it). Targets BOTH
148/// `memory_id` (populated at enqueue for memory ops, GAP-SG-12) and `item_key`
149/// (the memory name) so pending rows enqueued before id resolution are also
150/// cleared. Errors are logged, never propagated — cleanup must not fail the
151/// caller's delete/upsert.
152pub fn cleanup_queue_entry(db_path: &std::path::Path, memory_id: i64, name: &str) {
153    let queue_path = crate::paths::sidecar_path(db_path, ".enrich-queue.sqlite");
154    if !queue_path.exists() {
155        return;
156    }
157    match open_queue_db(&queue_path) {
158        Ok(conn) => {
159            if let Err(e) = conn.execute(
160                "DELETE FROM queue WHERE memory_id = ?1 OR item_key = ?2",
161                rusqlite::params![memory_id, name],
162            ) {
163                tracing::warn!(target: "enrich", error = %e, memory_id, "enrich-queue cleanup failed");
164            }
165        }
166        Err(e) => {
167            tracing::warn!(target: "enrich", error = %e, "enrich-queue cleanup skipped (open failed)");
168        }
169    }
170}
171
172/// GAP-SG-66: prune ORPHAN dead-letter rows — `status='dead'` memory rows whose
173/// `item_key` (the memory name) no longer exists in the main DB for `namespace`.
174///
175/// These are terminal "not found" failures (the memory was renamed/purged after
176/// being enqueued): re-processing them just re-fails with the same not-found
177/// error, so `--requeue-dead` can never recover them and they inflate
178/// `queue_dead` forever. Read-only on the main DB; deletes only the
179/// confirmed-orphan rows from the queue sidecar. Entity-keyed dead rows
180/// (`item_type='entity'`) are left untouched — their key is an entity name, not
181/// a memory name. Returns the number of rows pruned.
182pub(super) fn prune_dead_orphans(
183    queue_conn: &Connection,
184    main_conn: &Connection,
185    operation: &str,
186    namespace: &str,
187) -> Result<i64, AppError> {
188    let dead: Vec<(i64, String)> = {
189        let mut stmt = queue_conn.prepare(
190            "SELECT id, item_key FROM queue \
191             WHERE status='dead' AND item_type='memory' \
192             AND (operation = ?1 OR operation IS NULL) ORDER BY id",
193        )?;
194        let rows = stmt
195            .query_map(rusqlite::params![operation], |r| Ok((r.get(0)?, r.get(1)?)))?
196            .collect::<Result<Vec<_>, _>>()?;
197        rows
198    };
199    let mut pruned = 0_i64;
200    for (id, name) in dead {
201        let exists = main_conn
202            .query_row(
203                "SELECT 1 FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
204                rusqlite::params![namespace, name],
205                |_| Ok(()),
206            )
207            .is_ok();
208        if !exists {
209            queue_conn.execute("DELETE FROM queue WHERE id=?1", rusqlite::params![id])?;
210            pruned += 1;
211        }
212    }
213    if pruned > 0 {
214        let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
215    }
216    Ok(pruned)
217}
218
219// ---------------------------------------------------------------------------
220// GAP-ENRICH-BACKLOG-CONVERGE — dead-letter classification + queue failure sink
221// ---------------------------------------------------------------------------
222
223/// Read-only `enrich --status` report (no LLM, no singleton).
224///
225/// GAP-SG-42: all queue counts are scoped to the current `--operation` (rows
226/// migrated before the `operation` column, which are NULL, are still counted so
227/// a legacy queue is not silently reported as empty).
228#[derive(Debug, Serialize, schemars::JsonSchema)]
229pub struct EnrichStatus {
230    pub(super) status_report: bool,
231    pub(super) operation: String,
232    pub(super) namespace: String,
233    pub(super) unbound_backlog: usize,
234    pub(super) queue_pending: i64,
235    pub(super) queue_processing: i64,
236    pub(super) queue_done: i64,
237    pub(super) queue_failed: i64,
238    pub(super) queue_skipped: i64,
239    pub(super) queue_dead: i64,
240    pub(super) eligible_now: i64,
241    pub(super) waiting: i64,
242    /// GAP-SG-15/46: coarse backlog state, disambiguating an empty queue from a
243    /// not-yet-scanned backlog and from a cooldown wait.
244    /// `draining` (eligible items now) | `cooldown` (all pending items waiting on
245    /// `next_retry_at`) | `pending-scan` (candidates exist but the queue is not
246    /// populated — run enrich to scan) | `empty` (nothing left to do).
247    pub(super) state: &'static str,
248    /// GAP-SG-16: per-item `next_retry_at` for every pending row currently in
249    /// backoff, so an operator can see exactly when each will become eligible.
250    pub(super) waiting_items: Vec<WaitingItem>,
251}
252
253/// GAP-SG-16: one pending queue row waiting on its backoff cooldown.
254#[derive(Debug, Serialize, schemars::JsonSchema)]
255pub struct WaitingItem {
256    pub(super) item_key: String,
257    pub(super) attempt: i64,
258    pub(super) next_retry_at: Option<String>,
259    pub(super) error_class: Option<String>,
260}
261
262/// GAP-SG-23: one dead-letter row reported by `--list-dead`.
263#[derive(Debug, Serialize, schemars::JsonSchema)]
264pub struct DeadItem {
265    pub(super) dead_item: bool,
266    pub(super) item_key: String,
267    pub(super) item_type: String,
268    pub(super) attempt: i64,
269    pub(super) error_class: Option<String>,
270    pub(super) error: Option<String>,
271}
272
273/// GAP-SG-23/11: summary footer for `--list-dead` and `--requeue-dead`.
274#[derive(Debug, Serialize, schemars::JsonSchema)]
275pub struct DeadSummary {
276    pub(super) summary: bool,
277    pub(super) operation: String,
278    pub(super) namespace: String,
279    /// `list-dead` | `requeue-dead` | `prune-dead-orphans`
280    pub(super) action: &'static str,
281    pub(super) dead_total: i64,
282    pub(super) requeued: i64,
283    /// GAP-SG-66: `prune-dead-orphans` — dead rows removed because their
284    /// referenced memory no longer exists in the main DB for the namespace.
285    /// Zero for `list-dead` / `requeue-dead`.
286    pub(super) pruned: i64,
287}
288
289/// Classifies an enrich item failure into a retry/dead-letter outcome.
290///
291/// Transient errors (rate-limit, timeout, db-busy, or a message that smells
292/// like a recoverable network/5xx hiccup) are rescheduled with backoff.
293/// Everything else — validation, parse, invalid body, unknown — is a permanent
294/// `HardFailure` routed to the dead-letter sink so the backlog can converge.
295pub(super) fn classify_enrich_outcome(e: &AppError) -> crate::retry::AttemptOutcome {
296    use crate::retry::AttemptOutcome;
297    match e {
298        AppError::RateLimited { .. } | AppError::Timeout { .. } | AppError::DbBusy(_) => {
299            AttemptOutcome::Transient
300        }
301        // GAP-SG-09: errors that are genuinely PERMANENT for this item and must
302        // dead-letter immediately (retrying cannot help): a structured provider
303        // rejection (context-length overflow / refusal carried as ProviderError),
304        // or a memory/entity that no longer exists (deleted between scan and
305        // processing).
306        AppError::ProviderError { .. }
307        | AppError::NotFound(_)
308        | AppError::MemoryNotFound { .. }
309        | AppError::MemoryNotFoundById { .. } => AttemptOutcome::HardFailure,
310        _ => {
311            let msg = format!("{e}").to_lowercase();
312            if msg.contains("server error")
313                || msg.contains("timed out")
314                || msg.contains("timeout")
315                || msg.contains("connection")
316                || msg.contains("5xx")
317                || msg.contains("502")
318                || msg.contains("503")
319                || msg.contains("504")
320            {
321                AttemptOutcome::Transient
322            } else if msg.contains("json")
323                || msg.contains("no structured content")
324                || msg.contains("non-object")
325                || msg.contains("missing '")
326            {
327                // GAP-SG-09: malformed / non-JSON / shape-invalid LLM output is a
328                // model HICCUP, not a permanent fault. deepseek-v4-flash:nitro
329                // emits the occasional non-JSON or shape-wrong generation; with
330                // strict-parse + repair (GAP-SG-10) most are recovered, and the
331                // rest must be RESCHEDULED with backoff (bounded by
332                // --max-attempts) instead of dead-lettering on the first try.
333                AttemptOutcome::Transient
334            } else {
335                AttemptOutcome::HardFailure
336            }
337        }
338    }
339}
340
341/// Applies a failure outcome to a single queue row. Shared by the parallel
342/// worker and the serial loop (DRY). A `HardFailure`, or a transient failure
343/// whose attempt count reached `max_attempts`, lands in the dead-letter status
344/// (`status='dead'`) so it is never re-selected. A transient failure below the
345/// cap is rescheduled to `pending` with an exponential-backoff `next_retry_at`.
346/// Returns the [`crate::retry::AttemptOutcome`] so the caller can feed the
347/// existing circuit breaker.
348pub(super) fn record_item_failure(
349    queue_conn: &rusqlite::Connection,
350    queue_id: i64,
351    attempt: i64,
352    max_attempts: u32,
353    err: &AppError,
354) -> crate::retry::AttemptOutcome {
355    use crate::retry::AttemptOutcome;
356    let outcome = classify_enrich_outcome(err);
357    let err_str = format!("{err}");
358    let error_class = match outcome {
359        AttemptOutcome::Transient => "transient",
360        AttemptOutcome::HardFailure => "permanent",
361        AttemptOutcome::Success => "success",
362    };
363
364    let terminal = matches!(outcome, AttemptOutcome::HardFailure) || attempt >= max_attempts as i64;
365    if terminal {
366        let _ = queue_conn.execute(
367            "UPDATE queue SET status='dead', error=?1, error_class=?2, done_at=datetime('now') WHERE id=?3",
368            rusqlite::params![err_str, error_class, queue_id],
369        );
370    } else {
371        let delay = crate::retry::compute_delay(
372            &crate::retry::RetryConfig::llm_rate_limit(),
373            attempt.max(0) as u32,
374        );
375        let secs = delay.as_secs().max(1);
376        let modifier = format!("+{secs} seconds");
377        let _ = queue_conn.execute(
378            "UPDATE queue SET status='pending', error=?1, error_class=?2, next_retry_at=datetime('now', ?3) WHERE id=?4",
379            rusqlite::params![err_str, error_class, modifier, queue_id],
380        );
381    }
382    outcome
383}
384
385// ---------------------------------------------------------------------------
386// Tests
387// ---------------------------------------------------------------------------
388
389#[cfg(test)]
390mod tests {
391    use super::*;
392
393    fn open_test_db() -> Connection {
394        let conn = Connection::open_in_memory().expect("in-memory db");
395        conn.execute_batch(
396            "CREATE TABLE memories (
397                id          INTEGER PRIMARY KEY AUTOINCREMENT,
398                namespace   TEXT NOT NULL DEFAULT 'global',
399                name        TEXT NOT NULL,
400                type        TEXT NOT NULL DEFAULT 'note',
401                description TEXT NOT NULL DEFAULT '',
402                body        TEXT NOT NULL DEFAULT '',
403                body_hash   TEXT NOT NULL DEFAULT '',
404                session_id  TEXT,
405                source      TEXT NOT NULL DEFAULT 'agent',
406                metadata    TEXT NOT NULL DEFAULT '{}',
407                created_at  INTEGER NOT NULL DEFAULT (unixepoch()),
408                updated_at  INTEGER NOT NULL DEFAULT (unixepoch()),
409                deleted_at  INTEGER,
410                UNIQUE(namespace, name)
411            );",
412        )
413        .expect("schema creation must succeed");
414        conn
415    }
416
417    fn open_temp_queue() -> (Connection, String) {
418        let path = format!(
419            "/tmp/test-enrich-dl-{}-{}.sqlite",
420            std::process::id(),
421            fastrand::u64(..)
422        );
423        let conn = open_queue_db(&path).expect("queue db must open");
424        (conn, path)
425    }
426
427    fn insert_pending(conn: &Connection, key: &str) -> i64 {
428        conn.execute(
429            "INSERT INTO queue (item_key, item_type, status) VALUES (?1, 'memory', 'pending')",
430            rusqlite::params![key],
431        )
432        .unwrap();
433        conn.last_insert_rowid()
434    }
435
436    #[test]
437    fn queue_db_schema_creates_correctly() {
438        let tmp_path = format!("/tmp/test-enrich-queue-{}.sqlite", std::process::id());
439        let conn = open_queue_db(&tmp_path).expect("queue db must open");
440        let count: i64 = conn
441            .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
442            .unwrap();
443        assert_eq!(count, 0);
444        let _ = std::fs::remove_file(&tmp_path);
445    }
446
447    #[test]
448    fn classify_rate_limit_is_transient() {
449        let e = AppError::RateLimited {
450            detail: "429".into(),
451        };
452        assert_eq!(
453            classify_enrich_outcome(&e),
454            crate::retry::AttemptOutcome::Transient
455        );
456    }
457
458    #[test]
459    fn classify_timeout_and_dbbusy_are_transient() {
460        let t = AppError::Timeout {
461            operation: "judge".into(),
462            duration_secs: 30,
463        };
464        let b = AppError::DbBusy("locked".into());
465        assert_eq!(
466            classify_enrich_outcome(&t),
467            crate::retry::AttemptOutcome::Transient
468        );
469        assert_eq!(
470            classify_enrich_outcome(&b),
471            crate::retry::AttemptOutcome::Transient
472        );
473    }
474
475    #[test]
476    fn classify_validation_and_parse_are_hard_failure() {
477        let v = AppError::Validation("failed to parse entities array: bad".into());
478        assert_eq!(
479            classify_enrich_outcome(&v),
480            crate::retry::AttemptOutcome::HardFailure
481        );
482    }
483
484    #[test]
485    fn open_queue_db_alter_is_idempotent() {
486        let path = format!(
487            "/tmp/test-enrich-idem-{}-{}.sqlite",
488            std::process::id(),
489            fastrand::u64(..)
490        );
491        let _ = open_queue_db(&path).expect("first open");
492        let conn = open_queue_db(&path).expect("second open is idempotent");
493        let cols: Vec<String> = {
494            let mut stmt = conn.prepare("PRAGMA table_info(queue)").unwrap();
495            stmt.query_map([], |r| r.get::<_, String>(1))
496                .unwrap()
497                .collect::<Result<Vec<_>, _>>()
498                .unwrap()
499        };
500        assert!(cols.iter().any(|c| c == "error_class"));
501        assert!(cols.iter().any(|c| c == "next_retry_at"));
502        let _ = std::fs::remove_file(&path);
503    }
504
505    #[test]
506    fn record_item_failure_hard_marks_dead() {
507        let (conn, path) = open_temp_queue();
508        let id = insert_pending(&conn, "mem-hard");
509        let outcome = record_item_failure(
510            &conn,
511            id,
512            1,
513            5,
514            &AppError::Validation("invalid body".into()),
515        );
516        assert_eq!(outcome, crate::retry::AttemptOutcome::HardFailure);
517        let status: String = conn
518            .query_row(
519                "SELECT status FROM queue WHERE id=?1",
520                rusqlite::params![id],
521                |r| r.get(0),
522            )
523            .unwrap();
524        assert_eq!(status, "dead");
525        let _ = std::fs::remove_file(&path);
526    }
527
528    #[test]
529    fn record_item_failure_transient_reschedules_pending() {
530        let (conn, path) = open_temp_queue();
531        let id = insert_pending(&conn, "mem-transient");
532        let outcome = record_item_failure(
533            &conn,
534            id,
535            1,
536            5,
537            &AppError::RateLimited {
538                detail: "429".into(),
539            },
540        );
541        assert_eq!(outcome, crate::retry::AttemptOutcome::Transient);
542        let (status, future): (String, i64) = conn
543            .query_row(
544                "SELECT status, (next_retry_at > datetime('now')) FROM queue WHERE id=?1",
545                rusqlite::params![id],
546                |r| Ok((r.get(0)?, r.get(1)?)),
547            )
548            .unwrap();
549        assert_eq!(status, "pending");
550        assert_eq!(future, 1, "next_retry_at must be in the future");
551        let _ = std::fs::remove_file(&path);
552    }
553
554    #[test]
555    fn record_item_failure_transient_at_cap_marks_dead() {
556        let (conn, path) = open_temp_queue();
557        let id = insert_pending(&conn, "mem-cap");
558        let outcome = record_item_failure(
559            &conn,
560            id,
561            5,
562            5,
563            &AppError::RateLimited {
564                detail: "429".into(),
565            },
566        );
567        assert_eq!(outcome, crate::retry::AttemptOutcome::Transient);
568        let status: String = conn
569            .query_row(
570                "SELECT status FROM queue WHERE id=?1",
571                rusqlite::params![id],
572                |r| r.get(0),
573            )
574            .unwrap();
575        assert_eq!(status, "dead");
576        let _ = std::fs::remove_file(&path);
577    }
578
579    #[test]
580    fn dequeue_skips_future_retry_and_dead() {
581        let (conn, path) = open_temp_queue();
582        let eligible = insert_pending(&conn, "mem-eligible");
583        let waiting = insert_pending(&conn, "mem-waiting");
584        conn.execute(
585            "UPDATE queue SET next_retry_at=datetime('now', '+3600 seconds') WHERE id=?1",
586            rusqlite::params![waiting],
587        )
588        .unwrap();
589        let dead = insert_pending(&conn, "mem-dead");
590        conn.execute(
591            "UPDATE queue SET status='dead' WHERE id=?1",
592            rusqlite::params![dead],
593        )
594        .unwrap();
595
596        let claimed: Option<i64> = conn
597            .query_row(
598                "UPDATE queue SET status='processing', attempt=attempt+1 \
599                 WHERE id = (SELECT id FROM queue WHERE status='pending' \
600                               AND (next_retry_at IS NULL OR next_retry_at <= datetime('now')) \
601                             ORDER BY id LIMIT 1) \
602                 RETURNING id",
603                [],
604                |r| r.get(0),
605            )
606            .ok();
607        assert_eq!(claimed, Some(eligible));
608
609        let second: Option<i64> = conn
610            .query_row(
611                "UPDATE queue SET status='processing', attempt=attempt+1 \
612                 WHERE id = (SELECT id FROM queue WHERE status='pending' \
613                               AND (next_retry_at IS NULL OR next_retry_at <= datetime('now')) \
614                             ORDER BY id LIMIT 1) \
615                 RETURNING id",
616                [],
617                |r| r.get(0),
618            )
619            .ok();
620        assert_eq!(second, None);
621        let _ = std::fs::remove_file(&path);
622    }
623
624    #[test]
625    fn classify_non_json_and_shape_errors_are_transient() {
626        for msg in [
627            "model 'x' returned non-object JSON after repair (got string)",
628            "model 'x' returned content that could not be parsed even after JSON repair",
629            "model 'x' returned no structured content",
630            "LLM result missing 'description' field",
631            "LLM result missing 'enriched_body' field",
632        ] {
633            assert_eq!(
634                classify_enrich_outcome(&AppError::Validation(msg.into())),
635                crate::retry::AttemptOutcome::Transient,
636                "expected transient for: {msg}"
637            );
638        }
639    }
640
641    #[test]
642    fn classify_provider_error_and_not_found_are_hard() {
643        assert_eq!(
644            classify_enrich_outcome(&AppError::ProviderError {
645                code: "400".into(),
646                message: "context length exceeded".into(),
647            }),
648            crate::retry::AttemptOutcome::HardFailure
649        );
650        assert_eq!(
651            classify_enrich_outcome(&AppError::NotFound("memory 'gone' not found".into())),
652            crate::retry::AttemptOutcome::HardFailure
653        );
654    }
655
656    #[test]
657    fn open_queue_db_migrates_operation_column() {
658        let (conn, path) = open_temp_queue();
659        drop(conn);
660        let conn = open_queue_db(&path).expect("second open is idempotent");
661        let cols: Vec<String> = {
662            let mut stmt = conn.prepare("PRAGMA table_info(queue)").unwrap();
663            stmt.query_map([], |r| r.get::<_, String>(1))
664                .unwrap()
665                .collect::<Result<Vec<_>, _>>()
666                .unwrap()
667        };
668        assert!(cols.iter().any(|c| c == "operation"));
669        assert!(cols.iter().any(|c| c == "memory_id"));
670        let _ = std::fs::remove_file(&path);
671    }
672
673    #[test]
674    fn enqueue_candidate_tags_operation_and_memory_id() {
675        let main = open_test_db();
676        main.execute(
677            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'mem-x', 'body')",
678            [],
679        )
680        .unwrap();
681        let mem_id: i64 = main
682            .query_row("SELECT id FROM memories WHERE name='mem-x'", [], |r| {
683                r.get(0)
684            })
685            .unwrap();
686        let (queue, path) = open_temp_queue();
687        enqueue_candidate(&queue, &main, "global", "mem-x", "memory", "MemoryBindings");
688        let (op, mid): (String, i64) = queue
689            .query_row(
690                "SELECT operation, memory_id FROM queue WHERE item_key='mem-x'",
691                [],
692                |r| Ok((r.get(0)?, r.get(1)?)),
693            )
694            .unwrap();
695        assert_eq!(op, "MemoryBindings");
696        assert_eq!(mid, mem_id);
697        let _ = std::fs::remove_file(&path);
698    }
699
700    #[test]
701    fn requeue_dead_resurrects_dead_rows() {
702        let (conn, path) = open_temp_queue();
703        conn.execute(
704            "INSERT INTO queue (item_key, item_type, status, operation, attempt, error, error_class, next_retry_at) \
705             VALUES ('mem-dead', 'memory', 'dead', 'MemoryBindings', 8, 'boom', 'permanent', datetime('now'))",
706            [],
707        )
708        .unwrap();
709        let n = conn
710            .execute(
711                "UPDATE queue SET status='pending', attempt=0, next_retry_at=NULL, \
712                 error=NULL, error_class=NULL \
713                 WHERE status='dead' AND (operation = ?1 OR operation IS NULL)",
714                rusqlite::params!["MemoryBindings"],
715            )
716            .unwrap();
717        assert_eq!(n, 1);
718        let (status, attempt, nra): (String, i64, Option<String>) = conn
719            .query_row(
720                "SELECT status, attempt, next_retry_at FROM queue WHERE item_key='mem-dead'",
721                [],
722                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
723            )
724            .unwrap();
725        assert_eq!(status, "pending");
726        assert_eq!(attempt, 0);
727        assert!(nra.is_none());
728        let _ = std::fs::remove_file(&path);
729    }
730
731    #[test]
732    fn skipped_item_keys_excludes_only_skipped_for_operation() {
733        // GAP-SG-69: the body-enrich scan must drop memories already vetoed
734        // `status='skipped'` so `--until-empty` converges instead of re-scanning a
735        // non-expandable short body forever (the detached worker reported a
736        // stuck backlog for 30+ min).
737        let (conn, path) = open_temp_queue();
738        conn.execute(
739            "INSERT INTO queue (item_key, item_type, status, operation) VALUES ('mem-vetoed', 'memory', 'skipped', 'BodyEnrich')",
740            [],
741        )
742        .unwrap();
743        conn.execute(
744            "INSERT INTO queue (item_key, item_type, status, operation) VALUES ('mem-pending', 'memory', 'pending', 'BodyEnrich')",
745            [],
746        )
747        .unwrap();
748        conn.execute(
749            "INSERT INTO queue (item_key, item_type, status, operation) VALUES ('mem-other-op', 'memory', 'skipped', 'MemoryBindings')",
750            [],
751        )
752        .unwrap();
753        let keys = skipped_item_keys(&conn, "BodyEnrich").unwrap();
754        assert!(
755            keys.contains("mem-vetoed"),
756            "vetoed BodyEnrich item must be excluded from scan"
757        );
758        assert!(
759            !keys.contains("mem-pending"),
760            "pending item is still actionable"
761        );
762        assert!(
763            !keys.contains("mem-other-op"),
764            "skipped item from another operation must not leak"
765        );
766        assert_eq!(keys.len(), 1);
767        let _ = std::fs::remove_file(&path);
768    }
769
770    #[test]
771    fn cascade_cleanup_delete_targets_memory_id_and_name() {
772        let (conn, path) = open_temp_queue();
773        conn.execute(
774            "INSERT INTO queue (item_key, item_type, status, memory_id) VALUES ('by-id', 'memory', 'done', 42)",
775            [],
776        )
777        .unwrap();
778        conn.execute(
779            "INSERT INTO queue (item_key, item_type, status) VALUES ('by-name', 'memory', 'pending')",
780            [],
781        )
782        .unwrap();
783        let removed = conn
784            .execute(
785                "DELETE FROM queue WHERE memory_id = ?1 OR item_key = ?2",
786                rusqlite::params![42_i64, "by-name"],
787            )
788            .unwrap();
789        assert_eq!(removed, 2);
790        let remaining: i64 = conn
791            .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
792            .unwrap();
793        assert_eq!(remaining, 0);
794        let _ = std::fs::remove_file(&path);
795    }
796
797    #[test]
798    fn item_type_for_maps_entity_and_memory() {
799        assert_eq!(
800            item_type_for(&EnrichOperation::EntityDescriptions),
801            "entity"
802        );
803        assert_eq!(item_type_for(&EnrichOperation::MemoryBindings), "memory");
804        assert_eq!(item_type_for(&EnrichOperation::AugmentBindings), "memory");
805        assert_eq!(item_type_for(&EnrichOperation::BodyExtract), "memory");
806    }
807
808    #[test]
809    fn prune_dead_orphans_removes_only_orphan_memory_rows() {
810        let main = open_test_db();
811        // One live memory whose dead row must be KEPT (it still exists).
812        main.execute(
813            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'alive', 'b')",
814            [],
815        )
816        .unwrap();
817        let (queue, path) = open_temp_queue();
818        // Orphan dead memory row (no matching memory) -> pruned.
819        queue
820            .execute(
821                "INSERT INTO queue (item_key, item_type, status, operation, error_class) \
822                 VALUES ('gone', 'memory', 'dead', 'MemoryBindings', 'permanent')",
823                [],
824            )
825            .unwrap();
826        // Live dead memory row (memory exists) -> kept.
827        queue
828            .execute(
829                "INSERT INTO queue (item_key, item_type, status, operation, error_class) \
830                 VALUES ('alive', 'memory', 'dead', 'MemoryBindings', 'permanent')",
831                [],
832            )
833            .unwrap();
834        // Entity dead row -> never touched (key is not a memory name).
835        queue
836            .execute(
837                "INSERT INTO queue (item_key, item_type, status, operation) \
838                 VALUES ('some-entity', 'entity', 'dead', 'EntityDescriptions')",
839                [],
840            )
841            .unwrap();
842
843        let pruned = prune_dead_orphans(&queue, &main, "MemoryBindings", "global").unwrap();
844        assert_eq!(pruned, 1, "only the orphan memory row is pruned");
845
846        let remaining: Vec<String> = {
847            let mut stmt = queue
848                .prepare("SELECT item_key FROM queue ORDER BY item_key")
849                .unwrap();
850            stmt.query_map([], |r| r.get::<_, String>(0))
851                .unwrap()
852                .collect::<Result<Vec<_>, _>>()
853                .unwrap()
854        };
855        assert_eq!(remaining, vec!["alive", "some-entity"]);
856        let _ = std::fs::remove_file(&path);
857    }
858}