Skip to main content

sqlite_graphrag/commands/enrich/
queue.rs

1//! Enrichment queue — SQLite-backed scan/retry/dead-letter DB.
2
3use super::*;
4
5// ---------------------------------------------------------------------------
6// Queue DB
7// ---------------------------------------------------------------------------
8
9/// Opens or creates the enrichment queue database.
10///
11/// The queue schema mirrors `ingest_claude` for resume/retry parity.
12/// Uses a different filename (`.enrich-queue.sqlite`) to avoid collision.
13///
14/// # DRY note
15///
16/// This is a near-verbatim copy of `open_queue_db` in `ingest_claude.rs`.
17/// Both should be unified in a shared `llm_runner.rs` module by the
18/// Integration stream.
19pub(super) fn open_queue_db<P: AsRef<std::path::Path>>(path: P) -> Result<Connection, AppError> {
20    let conn = Connection::open(path)?;
21    conn.pragma_update(None, "journal_mode", "wal")?;
22    // GAP-SG-76: without an explicit busy_timeout, a lock contention window
23    // between the dequeue claim and a concurrent worker/main-DB writer
24    // surfaces as SQLITE_BUSY immediately instead of retrying briefly.
25    // Reuses the project-wide canonical value (see rules_rust_sqlite.md —
26    // "DEFINIR busy_timeout em milissegundos explícitos por conexão").
27    conn.pragma_update(None, "busy_timeout", crate::constants::BUSY_TIMEOUT_MILLIS)?;
28    conn.execute_batch(
29        "CREATE TABLE IF NOT EXISTS queue (
30            id          INTEGER PRIMARY KEY AUTOINCREMENT,
31            item_key    TEXT NOT NULL UNIQUE,
32            item_type   TEXT NOT NULL DEFAULT 'memory',
33            status      TEXT NOT NULL DEFAULT 'pending',
34            memory_id   INTEGER,
35            entity_id   INTEGER,
36            entities    INTEGER DEFAULT 0,
37            rels        INTEGER DEFAULT 0,
38            error       TEXT,
39            cost_usd    REAL DEFAULT 0.0,
40            attempt     INTEGER DEFAULT 0,
41            elapsed_ms  INTEGER,
42            created_at  TEXT DEFAULT (datetime('now')),
43            done_at     TEXT
44        );
45        CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
46    )?;
47    // GAP-ENRICH-BACKLOG-CONVERGE (v1.0.96): dead-letter columns. The legacy
48    // `.enrich-queue.sqlite` predates these columns and `CREATE TABLE IF NOT
49    // EXISTS` never alters an existing table, so add them idempotently here.
50    let mut has_error_class = false;
51    let mut has_next_retry_at = false;
52    // GAP-SG-12/42: the `operation` column scopes queue rows to the enrich
53    // operation that enqueued them, so `--status` can segment counts per
54    // operation instead of conflating a shared `item_key` space. Migrated
55    // idempotently here for the same reason as the v1.0.96 columns.
56    let mut has_operation = false;
57    // GAP-SG-72: dead-letter diagnostics carried from a typed OpenRouter
58    // `ChatError` (finish_reason + token counts) so `--list-dead` can show
59    // WHY an item died (e.g. truncated by max_tokens) instead of only the
60    // formatted error string. Migrated idempotently for the same reason as
61    // the columns above.
62    let mut has_finish_reason = false;
63    let mut has_input_tokens = false;
64    let mut has_output_tokens = false;
65    {
66        let mut stmt = conn.prepare("PRAGMA table_info(queue)")?;
67        let names = stmt.query_map([], |r| r.get::<_, String>(1))?;
68        for name in names {
69            match name?.as_str() {
70                "error_class" => has_error_class = true,
71                "next_retry_at" => has_next_retry_at = true,
72                "operation" => has_operation = true,
73                "finish_reason" => has_finish_reason = true,
74                "input_tokens" => has_input_tokens = true,
75                "output_tokens" => has_output_tokens = true,
76                _ => {}
77            }
78        }
79    }
80    if !has_error_class {
81        conn.execute_batch("ALTER TABLE queue ADD COLUMN error_class TEXT")?;
82    }
83    if !has_next_retry_at {
84        conn.execute_batch("ALTER TABLE queue ADD COLUMN next_retry_at TEXT")?;
85    }
86    if !has_operation {
87        conn.execute_batch("ALTER TABLE queue ADD COLUMN operation TEXT")?;
88    }
89    if !has_finish_reason {
90        conn.execute_batch("ALTER TABLE queue ADD COLUMN finish_reason TEXT")?;
91    }
92    if !has_input_tokens {
93        conn.execute_batch("ALTER TABLE queue ADD COLUMN input_tokens INTEGER")?;
94    }
95    if !has_output_tokens {
96        conn.execute_batch("ALTER TABLE queue ADD COLUMN output_tokens INTEGER")?;
97    }
98    conn.execute_batch(
99        "CREATE INDEX IF NOT EXISTS idx_enrich_queue_eligible ON queue(status, next_retry_at);
100         CREATE INDEX IF NOT EXISTS idx_enrich_queue_operation ON queue(operation, status);
101         CREATE INDEX IF NOT EXISTS idx_enrich_queue_memory ON queue(memory_id)",
102    )?;
103    Ok(conn)
104}
105
106/// GAP-SG-12: enqueue one scan candidate, linking it to its `memory_id` and
107/// tagging it with the originating `operation`. For memory-keyed operations the
108/// id is resolved from `main_conn` so the cascade cleanup (GAP-SG-13) can target
109/// the queue row by `memory_id` even before the item is processed. Entity/id
110/// keyed operations leave `memory_id` NULL (the `item_key` carries the link).
111/// `INSERT OR IGNORE` preserves the v1.0.96 invariant that a dead-letter row is
112/// never resurrected by re-enqueue (item_key is UNIQUE).
113pub(super) fn enqueue_candidate(
114    queue_conn: &Connection,
115    main_conn: &Connection,
116    namespace: &str,
117    key: &str,
118    item_type: &str,
119    operation: &str,
120) {
121    let memory_id: Option<i64> = if item_type == "memory" {
122        main_conn
123            .query_row(
124                "SELECT id FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
125                rusqlite::params![namespace, key],
126                |r| r.get(0),
127            )
128            .ok()
129    } else {
130        None
131    };
132    if let Err(e) = queue_conn.execute(
133        "INSERT OR IGNORE INTO queue (item_key, item_type, status, operation, memory_id) \
134         VALUES (?1, ?2, 'pending', ?3, ?4)",
135        rusqlite::params![key, item_type, operation, memory_id],
136    ) {
137        tracing::warn!(target: "enrich", error = %e, "queue insert failed");
138    }
139}
140
141/// GAP-SG-69: item_keys vetoed `status='skipped'` for an operation. The
142/// body-enrich scan selects candidates purely by `LENGTH(body) <
143/// min_output_chars`, so a short body whose rewrite the preservation guard keeps
144/// rejecting would be re-scanned every pass and `--until-empty` would never
145/// converge. Callers exclude these keys so the scan returns only actionable
146/// items; `cleanup_queue_entry` clears the veto when the body actually changes,
147/// restoring the memory as a candidate.
148pub(super) fn skipped_item_keys(
149    conn: &Connection,
150    operation: &str,
151) -> Result<std::collections::HashSet<String>, AppError> {
152    let mut stmt = conn.prepare(
153        "SELECT item_key FROM queue WHERE status='skipped' AND (operation = ?1 OR operation IS NULL)",
154    )?;
155    let keys = stmt
156        .query_map(rusqlite::params![operation], |r| r.get::<_, String>(0))?
157        .collect::<Result<std::collections::HashSet<String>, _>>()?;
158    Ok(keys)
159}
160
161/// Queue `item_type` for an operation: entity-keyed operations use `"entity"`,
162/// every other (memory/id-keyed) operation uses `"memory"`.
163pub(super) fn item_type_for(operation: &EnrichOperation) -> &'static str {
164    match operation {
165        EnrichOperation::EntityDescriptions => "entity",
166        _ => "memory",
167    }
168}
169
170/// GAP-SG-13: remove a memory's enrich-queue entry when the memory is deleted or
171/// force-merged, so the dead-letter / pending sidecar never references a row
172/// that no longer exists. Best-effort and a no-op when the queue file is absent
173/// (the common case after a clean run, which removes it). Targets BOTH
174/// `memory_id` (populated at enqueue for memory ops, GAP-SG-12) and `item_key`
175/// (the memory name) so pending rows enqueued before id resolution are also
176/// cleared. Errors are logged, never propagated — cleanup must not fail the
177/// caller's delete/upsert.
178pub fn cleanup_queue_entry(db_path: &std::path::Path, memory_id: i64, name: &str) {
179    let queue_path = crate::paths::sidecar_path(db_path, ".enrich-queue.sqlite");
180    if !queue_path.exists() {
181        return;
182    }
183    match open_queue_db(&queue_path) {
184        Ok(conn) => {
185            if let Err(e) = conn.execute(
186                "DELETE FROM queue WHERE memory_id = ?1 OR item_key = ?2",
187                rusqlite::params![memory_id, name],
188            ) {
189                tracing::warn!(target: "enrich", error = %e, memory_id, "enrich-queue cleanup failed");
190            }
191        }
192        Err(e) => {
193            tracing::warn!(target: "enrich", error = %e, "enrich-queue cleanup skipped (open failed)");
194        }
195    }
196}
197
198/// GAP-SG-66: prune ORPHAN dead-letter rows — `status='dead'` memory rows whose
199/// `item_key` (the memory name) no longer exists in the main DB for `namespace`.
200///
201/// These are terminal "not found" failures (the memory was renamed/purged after
202/// being enqueued): re-processing them just re-fails with the same not-found
203/// error, so `--requeue-dead` can never recover them and they inflate
204/// `queue_dead` forever. Read-only on the main DB; deletes only the
205/// confirmed-orphan rows from the queue sidecar. Entity-keyed dead rows
206/// (`item_type='entity'`) are left untouched — their key is an entity name, not
207/// a memory name. Returns the number of rows pruned.
208pub(super) fn prune_dead_orphans(
209    queue_conn: &Connection,
210    main_conn: &Connection,
211    operation: &str,
212    namespace: &str,
213) -> Result<i64, AppError> {
214    let dead: Vec<(i64, String)> = {
215        let mut stmt = queue_conn.prepare(
216            "SELECT id, item_key FROM queue \
217             WHERE status='dead' AND item_type='memory' \
218             AND (operation = ?1 OR operation IS NULL) ORDER BY id",
219        )?;
220        let rows = stmt
221            .query_map(rusqlite::params![operation], |r| Ok((r.get(0)?, r.get(1)?)))?
222            .collect::<Result<Vec<_>, _>>()?;
223        rows
224    };
225    let mut pruned = 0_i64;
226    for (id, name) in dead {
227        let exists = main_conn
228            .query_row(
229                "SELECT 1 FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
230                rusqlite::params![namespace, name],
231                |_| Ok(()),
232            )
233            .is_ok();
234        if !exists {
235            queue_conn.execute("DELETE FROM queue WHERE id=?1", rusqlite::params![id])?;
236            pruned += 1;
237        }
238    }
239    if pruned > 0 {
240        let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
241    }
242    Ok(pruned)
243}
244
245// ---------------------------------------------------------------------------
246// GAP-ENRICH-BACKLOG-CONVERGE — dead-letter classification + queue failure sink
247// ---------------------------------------------------------------------------
248
/// Read-only `enrich --status` report (no LLM, no singleton).
///
/// GAP-SG-42: all queue counts are scoped to the current `--operation` (rows
/// migrated before the `operation` column, which are NULL, are still counted so
/// a legacy queue is not silently reported as empty).
// NOTE(review): this type derives `schemars::JsonSchema`; schemars typically
// copies `///` text into the generated schema descriptions, so the notes added
// below are plain `//` comments to keep the emitted schema unchanged.
#[derive(Debug, Serialize, schemars::JsonSchema)]
pub struct EnrichStatus {
    // NOTE(review): presumably a constant marker identifying this JSON object
    // as a status report — confirm against the code that builds it.
    pub(super) status_report: bool,
    // Presumably the `--operation` and namespace the report was computed for.
    pub(super) operation: String,
    pub(super) namespace: String,
    // Legacy backlog figure; per the `scan_backlog` note below it covers
    // memory-bindings only.
    pub(super) unbound_backlog: usize,
    /// GAP-SG-77: DATABASE-semantics backlog for the queried operation, computed
    /// by `scan::count_operation_backlog` via a `SELECT COUNT(*)` over the real
    /// store. This is distinct from `queue_pending`/`queue_dead` (FILE/sidecar
    /// queue semantics) and from the legacy `unbound_backlog` (memory-bindings
    /// only). It fixes the false `pending=0` that db-backed operations
    /// (entity-descriptions/body-enrich/re-embed) previously reported.
    pub(super) scan_backlog: i64,
    // Sidecar-queue counters. Presumably one per value of the queue `status`
    // column — TODO confirm against the query that fills them.
    pub(super) queue_pending: i64,
    pub(super) queue_processing: i64,
    pub(super) queue_done: i64,
    pub(super) queue_failed: i64,
    pub(super) queue_skipped: i64,
    pub(super) queue_dead: i64,
    // Presumably the split of pending rows into those claimable right now and
    // those still waiting on `next_retry_at` (see `state` below) — confirm.
    pub(super) eligible_now: i64,
    pub(super) waiting: i64,
    /// GAP-SG-15/46: coarse backlog state, disambiguating an empty queue from a
    /// not-yet-scanned backlog and from a cooldown wait.
    /// `draining` (eligible items now) | `cooldown` (all pending items waiting on
    /// `next_retry_at`) | `pending-scan` (candidates exist but the queue is not
    /// populated — run enrich to scan) | `empty` (nothing left to do).
    pub(super) state: &'static str,
    /// GAP-SG-16: per-item `next_retry_at` for every pending row currently in
    /// backoff, so an operator can see exactly when each will become eligible.
    pub(super) waiting_items: Vec<WaitingItem>,
}
285
/// GAP-SG-16: one pending queue row waiting on its backoff cooldown.
// Field names match columns of the `queue` table created in `open_queue_db`;
// presumably each value is read straight from the waiting row — confirm in the
// `--status` handler. Notes here are `//` comments so the schemars-generated
// schema text is not affected.
#[derive(Debug, Serialize, schemars::JsonSchema)]
pub struct WaitingItem {
    pub(super) item_key: String,
    pub(super) attempt: i64,
    // Optional because the `next_retry_at` and `error_class` columns are
    // nullable in the queue schema.
    pub(super) next_retry_at: Option<String>,
    pub(super) error_class: Option<String>,
}
294
/// GAP-SG-23: one dead-letter row reported by `--list-dead`.
// Apart from `dead_item`, field names match columns of the `queue` table
// created in `open_queue_db`; presumably each value is read straight from the
// dead row — confirm in the `--list-dead` handler. Notes here are `//`
// comments so the schemars-generated schema text is not affected.
#[derive(Debug, Serialize, schemars::JsonSchema)]
pub struct DeadItem {
    // NOTE(review): presumably a constant marker identifying this JSON object
    // as a dead-letter item — confirm against the code that builds it.
    pub(super) dead_item: bool,
    pub(super) item_key: String,
    pub(super) item_type: String,
    pub(super) attempt: i64,
    // `record_item_failure_typed` writes "transient", "permanent" or "success"
    // into the `error_class` column and the formatted error text into `error`.
    pub(super) error_class: Option<String>,
    pub(super) error: Option<String>,
    /// GAP-SG-72: `choices[0].finish_reason` from the OpenRouter response
    /// that produced this failure, when one was decoded (e.g. `"length"`
    /// for a max_tokens truncation). `None` for subprocess-provider modes
    /// or failures that never reached a decoded response.
    pub(super) finish_reason: Option<String>,
    /// GAP-SG-72: `usage.prompt_tokens` from the same response, when known.
    pub(super) input_tokens: Option<i64>,
    /// GAP-SG-72: `usage.completion_tokens` from the same response, when known.
    pub(super) output_tokens: Option<i64>,
}
314
/// GAP-SG-23/11: summary footer for `--list-dead` and `--requeue-dead`.
// Notes added here are `//` comments so the schemars-generated schema text is
// not affected.
#[derive(Debug, Serialize, schemars::JsonSchema)]
pub struct DeadSummary {
    // NOTE(review): presumably a constant marker identifying this JSON object
    // as the summary line — confirm against the code that builds it.
    pub(super) summary: bool,
    // Presumably the `--operation` and namespace the action ran against.
    pub(super) operation: String,
    pub(super) namespace: String,
    /// `list-dead` | `requeue-dead` | `prune-dead-orphans`
    pub(super) action: &'static str,
    // Presumably the number of dead rows seen and the number moved back to
    // pending by `requeue-dead` — TODO confirm in the handlers.
    pub(super) dead_total: i64,
    pub(super) requeued: i64,
    /// GAP-SG-66: `prune-dead-orphans` — dead rows removed because their
    /// referenced memory no longer exists in the main DB for the namespace.
    /// Zero for `list-dead` / `requeue-dead`.
    pub(super) pruned: i64,
}
330
331/// Classifies an enrich item failure into a retry/dead-letter outcome.
332///
333/// This is the FALLBACK classifier: it is only consulted when the failure
334/// did not already carry a typed [`crate::retry::AttemptOutcome`] computed at
335/// its origin (see [`record_item_failure_typed`], fed by
336/// [`crate::commands::enrich::extraction::take_last_openrouter_failure`] for
337/// OpenRouter chat/embedding calls). Classification is TYPED by `AppError`
338/// variant only — NEVER by matching the formatted message — per
339/// `rules_rust_retry_com_backoff.md` ("NUNCA usar string matching em
340/// mensagens de erro").
341pub(super) fn classify_enrich_outcome(e: &AppError) -> crate::retry::AttemptOutcome {
342    use crate::retry::AttemptOutcome;
343    match e {
344        AppError::RateLimited { .. } | AppError::Timeout { .. } | AppError::DbBusy(_) => {
345            AttemptOutcome::Transient
346        }
347        // GAP-SG-78: a referenced entity that is not yet materialized is a
348        // TRANSITORY absence — a later enrich pass creates the entity — so the
349        // item is rescheduled, not dead-lettered on the first miss. Matched on
350        // the typed variant, never a message substring (rules_rust_retry: NUNCA
351        // string matching). The `--max-attempts` floor (default 8) still ends
352        // the item if the entity never materializes, mirroring the `Embedding`
353        // floor below.
354        AppError::EntityNotYetMaterialized { .. } => AttemptOutcome::Transient,
355        // GAP-SG-09: errors that are genuinely PERMANENT for this item and must
356        // dead-letter immediately (retrying cannot help): a structured provider
357        // rejection (context-length overflow / refusal carried as ProviderError),
358        // or a MEMORY that no longer exists (deleted or renamed between scan and
359        // processing). Entity absence is handled above as transitory, NOT here.
360        AppError::ProviderError { .. }
361        | AppError::NotFound(_)
362        | AppError::MemoryNotFound { .. }
363        | AppError::MemoryNotFoundById { .. } => AttemptOutcome::HardFailure,
364        // GAP-SG-76: SQLITE_BUSY/LOCKED is a lock-contention hiccup between the
365        // queue writer and a concurrent claim — retry it; any other database
366        // error (constraint violation, corruption, I/O) is permanent.
367        AppError::Database(_) => {
368            if crate::storage::utils::is_sqlite_busy(e) {
369                AttemptOutcome::Transient
370            } else {
371                AttemptOutcome::HardFailure
372            }
373        }
374        // GAP-SG-73: safe floor for the `re-embed` operation. `AppError::Embedding`
375        // reaches here only via `embed_with_fallback`'s backend-chain resolution
376        // (`crate::embedder`), which discards the origin-typed
377        // `EmbedError::retry_class` through `From<EmbedError> for AppError` before
378        // the error surfaces to the queue. Extracting the precise verdict would
379        // require bypassing the fallback chain to call the OpenRouter embedding
380        // client directly — out of scope here (touches `embedder.rs`, which is
381        // off-limits, and removes the multi-backend fallback safety net).
382        // Transient is the conservative choice: a persistently permanent failure
383        // still terminates via `--max-attempts` instead of retrying forever.
384        AppError::Embedding(_) => AttemptOutcome::Transient,
385        // Every other variant — including `Validation` without an
386        // origin-typed retry verdict attached — is treated as permanent.
387        // Previously this branch inspected the formatted message for
388        // substrings like "json" / "missing '" to guess at transience; that
389        // guesswork is now unnecessary because the OpenRouter chat path
390        // (the project's only supported enrich mode) attaches its retry
391        // verdict directly via `ChatError::retry_class`, computed at the
392        // exact HTTP status / provider code in `chat_api.rs`, and
393        // `record_item_failure_typed` consumes it BEFORE ever falling back
394        // to this classifier.
395        _ => AttemptOutcome::HardFailure,
396    }
397}
398
399/// Applies a failure outcome to a single queue row. Shared by the parallel
400/// worker and the serial loop (DRY). A `HardFailure`, or a transient failure
401/// whose attempt count reached `max_attempts`, lands in the dead-letter status
402/// (`status='dead'`) so it is never re-selected. A transient failure below the
403/// cap is rescheduled to `pending` with an exponential-backoff `next_retry_at`.
404/// Returns the [`crate::retry::AttemptOutcome`] so the caller can feed the
405/// existing circuit breaker.
406///
407/// GAP-SG-73: delegates to [`record_item_failure_typed`] with the outcome
408/// computed by the untyped fallback classifier and no diagnostics — the
409/// entry point for callers that only have a bare `&AppError` (subprocess
410/// providers, persistence failures).
411pub(super) fn record_item_failure(
412    queue_conn: &rusqlite::Connection,
413    queue_id: i64,
414    attempt: i64,
415    max_attempts: u32,
416    err: &AppError,
417) -> crate::retry::AttemptOutcome {
418    let outcome = classify_enrich_outcome(err);
419    let err_str = format!("{err}");
420    record_item_failure_typed(
421        queue_conn,
422        queue_id,
423        attempt,
424        max_attempts,
425        outcome,
426        &err_str,
427        None,
428        None,
429        None,
430    )
431}
432
433/// GAP-SG-72/73: applies a failure outcome to a single queue row using an
434/// [`crate::retry::AttemptOutcome`] the caller ALREADY computed at the
435/// failure's origin (e.g. `ChatError::retry_class` from an OpenRouter chat
436/// call), plus whatever truncation diagnostics (`finish_reason` and token
437/// counts) were available. This is the precise counterpart to
438/// [`record_item_failure`], which falls back to the untyped
439/// [`classify_enrich_outcome`] classifier when no origin-typed verdict
440/// exists. Both share this single write path (DRY).
441#[allow(clippy::too_many_arguments)]
442pub(super) fn record_item_failure_typed(
443    queue_conn: &rusqlite::Connection,
444    queue_id: i64,
445    attempt: i64,
446    max_attempts: u32,
447    outcome: crate::retry::AttemptOutcome,
448    err_str: &str,
449    finish_reason: Option<&str>,
450    input_tokens: Option<i64>,
451    output_tokens: Option<i64>,
452) -> crate::retry::AttemptOutcome {
453    use crate::retry::AttemptOutcome;
454    let error_class = match outcome {
455        AttemptOutcome::Transient => "transient",
456        AttemptOutcome::HardFailure => "permanent",
457        AttemptOutcome::Success => "success",
458    };
459
460    let terminal = matches!(outcome, AttemptOutcome::HardFailure) || attempt >= max_attempts as i64;
461    if terminal {
462        let _ = queue_conn.execute(
463            "UPDATE queue SET status='dead', error=?1, error_class=?2, done_at=datetime('now'), \
464             finish_reason=?3, input_tokens=?4, output_tokens=?5 WHERE id=?6",
465            rusqlite::params![
466                err_str,
467                error_class,
468                finish_reason,
469                input_tokens,
470                output_tokens,
471                queue_id
472            ],
473        );
474    } else {
475        let delay = crate::retry::compute_delay(
476            &crate::retry::RetryConfig::llm_rate_limit(),
477            attempt.max(0) as u32,
478        );
479        let secs = delay.as_secs().max(1);
480        let modifier = format!("+{secs} seconds");
481        let _ = queue_conn.execute(
482            "UPDATE queue SET status='pending', error=?1, error_class=?2, next_retry_at=datetime('now', ?3), \
483             finish_reason=?4, input_tokens=?5, output_tokens=?6 WHERE id=?7",
484            rusqlite::params![
485                err_str,
486                error_class,
487                modifier,
488                finish_reason,
489                input_tokens,
490                output_tokens,
491                queue_id
492            ],
493        );
494    }
495    outcome
496}
497
/// GAP-SG-76: outcome of claiming the next pending queue row. Distinguishes
/// a genuinely empty backlog (`QueryReturnedNoRows`) from lock contention
/// (`SQLITE_BUSY`/`SQLITE_LOCKED`) so the caller retries briefly on the
/// latter instead of breaking out of the drain loop early. Both the serial
/// loop and the parallel worker loop share this (DRY) — previously each
/// collapsed every `query_row` error into `.ok()`, silently treating a busy
/// database the same as an empty queue.
///
/// Lock contention is not a variant of this enum: `dequeue_next_pending`
/// reports it (and every other database error) through the `Err` side of its
/// `Result`.
pub(super) enum DequeueOutcome {
    /// A row was claimed. The tuple is `(id, item_key, item_type, attempt)`,
    /// in the order of the `RETURNING` clause in `dequeue_next_pending`;
    /// `attempt` is the value after the claim incremented it.
    Claimed((i64, String, String, i64)),
    /// The claim statement returned no row: nothing matched the pending filter.
    Empty,
}
509
510pub(super) fn dequeue_next_pending(
511    queue_conn: &rusqlite::Connection,
512    backoff_clause: &str,
513) -> Result<DequeueOutcome, AppError> {
514    let dequeue_sql = format!(
515        "UPDATE queue SET status='processing', attempt=attempt+1 \
516         WHERE id = (SELECT id FROM queue WHERE status='pending' {backoff_clause} \
517                     ORDER BY id LIMIT 1) \
518         RETURNING id, item_key, item_type, attempt"
519    );
520    match queue_conn.query_row(&dequeue_sql, [], |row| {
521        Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
522    }) {
523        Ok(claimed) => Ok(DequeueOutcome::Claimed(claimed)),
524        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(DequeueOutcome::Empty),
525        Err(e) => Err(AppError::Database(e)),
526    }
527}
528
529// ---------------------------------------------------------------------------
530// Tests
531// ---------------------------------------------------------------------------
532
533#[cfg(test)]
534mod tests {
535    use super::*;
536
537    fn open_test_db() -> Connection {
538        let conn = Connection::open_in_memory().expect("in-memory db");
539        conn.execute_batch(
540            "CREATE TABLE memories (
541                id          INTEGER PRIMARY KEY AUTOINCREMENT,
542                namespace   TEXT NOT NULL DEFAULT 'global',
543                name        TEXT NOT NULL,
544                type        TEXT NOT NULL DEFAULT 'note',
545                description TEXT NOT NULL DEFAULT '',
546                body        TEXT NOT NULL DEFAULT '',
547                body_hash   TEXT NOT NULL DEFAULT '',
548                session_id  TEXT,
549                source      TEXT NOT NULL DEFAULT 'agent',
550                metadata    TEXT NOT NULL DEFAULT '{}',
551                created_at  INTEGER NOT NULL DEFAULT (unixepoch()),
552                updated_at  INTEGER NOT NULL DEFAULT (unixepoch()),
553                deleted_at  INTEGER,
554                UNIQUE(namespace, name)
555            );",
556        )
557        .expect("schema creation must succeed");
558        conn
559    }
560
561    fn open_temp_queue() -> (Connection, String) {
562        let path = format!(
563            "/tmp/test-enrich-dl-{}-{}.sqlite",
564            std::process::id(),
565            fastrand::u64(..)
566        );
567        let conn = open_queue_db(&path).expect("queue db must open");
568        (conn, path)
569    }
570
571    fn insert_pending(conn: &Connection, key: &str) -> i64 {
572        conn.execute(
573            "INSERT INTO queue (item_key, item_type, status) VALUES (?1, 'memory', 'pending')",
574            rusqlite::params![key],
575        )
576        .unwrap();
577        conn.last_insert_rowid()
578    }
579
580    #[test]
581    fn queue_db_schema_creates_correctly() {
582        let tmp_path = format!("/tmp/test-enrich-queue-{}.sqlite", std::process::id());
583        let conn = open_queue_db(&tmp_path).expect("queue db must open");
584        let count: i64 = conn
585            .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
586            .unwrap();
587        assert_eq!(count, 0);
588        let _ = std::fs::remove_file(&tmp_path);
589    }
590
591    #[test]
592    fn classify_rate_limit_is_transient() {
593        let e = AppError::RateLimited {
594            detail: "429".into(),
595        };
596        assert_eq!(
597            classify_enrich_outcome(&e),
598            crate::retry::AttemptOutcome::Transient
599        );
600    }
601
602    #[test]
603    fn classify_timeout_and_dbbusy_are_transient() {
604        let t = AppError::Timeout {
605            operation: "judge".into(),
606            duration_secs: 30,
607        };
608        let b = AppError::DbBusy("locked".into());
609        assert_eq!(
610            classify_enrich_outcome(&t),
611            crate::retry::AttemptOutcome::Transient
612        );
613        assert_eq!(
614            classify_enrich_outcome(&b),
615            crate::retry::AttemptOutcome::Transient
616        );
617    }
618
619    #[test]
620    fn classify_validation_and_parse_are_hard_failure() {
621        let v = AppError::Validation("failed to parse entities array: bad".into());
622        assert_eq!(
623            classify_enrich_outcome(&v),
624            crate::retry::AttemptOutcome::HardFailure
625        );
626    }
627
628    #[test]
629    fn open_queue_db_alter_is_idempotent() {
630        let path = format!(
631            "/tmp/test-enrich-idem-{}-{}.sqlite",
632            std::process::id(),
633            fastrand::u64(..)
634        );
635        let _ = open_queue_db(&path).expect("first open");
636        let conn = open_queue_db(&path).expect("second open is idempotent");
637        let cols: Vec<String> = {
638            let mut stmt = conn.prepare("PRAGMA table_info(queue)").unwrap();
639            stmt.query_map([], |r| r.get::<_, String>(1))
640                .unwrap()
641                .collect::<Result<Vec<_>, _>>()
642                .unwrap()
643        };
644        assert!(cols.iter().any(|c| c == "error_class"));
645        assert!(cols.iter().any(|c| c == "next_retry_at"));
646        let _ = std::fs::remove_file(&path);
647    }
648
649    #[test]
650    fn record_item_failure_hard_marks_dead() {
651        let (conn, path) = open_temp_queue();
652        let id = insert_pending(&conn, "mem-hard");
653        let outcome = record_item_failure(
654            &conn,
655            id,
656            1,
657            5,
658            &AppError::Validation("invalid body".into()),
659        );
660        assert_eq!(outcome, crate::retry::AttemptOutcome::HardFailure);
661        let status: String = conn
662            .query_row(
663                "SELECT status FROM queue WHERE id=?1",
664                rusqlite::params![id],
665                |r| r.get(0),
666            )
667            .unwrap();
668        assert_eq!(status, "dead");
669        let _ = std::fs::remove_file(&path);
670    }
671
672    #[test]
673    fn record_item_failure_transient_reschedules_pending() {
674        let (conn, path) = open_temp_queue();
675        let id = insert_pending(&conn, "mem-transient");
676        let outcome = record_item_failure(
677            &conn,
678            id,
679            1,
680            5,
681            &AppError::RateLimited {
682                detail: "429".into(),
683            },
684        );
685        assert_eq!(outcome, crate::retry::AttemptOutcome::Transient);
686        let (status, future): (String, i64) = conn
687            .query_row(
688                "SELECT status, (next_retry_at > datetime('now')) FROM queue WHERE id=?1",
689                rusqlite::params![id],
690                |r| Ok((r.get(0)?, r.get(1)?)),
691            )
692            .unwrap();
693        assert_eq!(status, "pending");
694        assert_eq!(future, 1, "next_retry_at must be in the future");
695        let _ = std::fs::remove_file(&path);
696    }
697
#[test]
fn record_item_failure_transient_at_cap_marks_dead() {
    // Same retryable error as the reschedule case, but reported with the
    // numeric pair (5, 5): the returned classification is still
    // `Transient`, yet the stored row must end up `dead`.
    let (conn, path) = open_temp_queue();
    let id = insert_pending(&conn, "mem-cap");
    let rate_limited = AppError::RateLimited {
        detail: "429".into(),
    };

    let outcome = record_item_failure(&conn, id, 5, 5, &rate_limited);
    assert_eq!(outcome, crate::retry::AttemptOutcome::Transient);

    let status = conn
        .query_row(
            "SELECT status FROM queue WHERE id=?1",
            rusqlite::params![id],
            |r| r.get::<_, String>(0),
        )
        .unwrap();
    assert_eq!(status, "dead");

    let _ = std::fs::remove_file(&path);
}
722
#[test]
fn dequeue_skips_future_retry_and_dead() {
    // Claim statement exercised by this test: flips the oldest eligible
    // `pending` row (no `next_retry_at`, or one that is already due) to
    // `processing` and returns its id.
    const CLAIM_SQL: &str = "UPDATE queue SET status='processing', attempt=attempt+1 \
         WHERE id = (SELECT id FROM queue WHERE status='pending' \
                       AND (next_retry_at IS NULL OR next_retry_at <= datetime('now')) \
                     ORDER BY id LIMIT 1) \
         RETURNING id";

    let (conn, path) = open_temp_queue();

    // Three rows: one claimable, one pending but scheduled an hour ahead,
    // and one dead-lettered.
    let eligible = insert_pending(&conn, "mem-eligible");
    let waiting = insert_pending(&conn, "mem-waiting");
    conn.execute(
        "UPDATE queue SET next_retry_at=datetime('now', '+3600 seconds') WHERE id=?1",
        rusqlite::params![waiting],
    )
    .unwrap();
    let dead = insert_pending(&conn, "mem-dead");
    conn.execute(
        "UPDATE queue SET status='dead' WHERE id=?1",
        rusqlite::params![dead],
    )
    .unwrap();

    // Only "no row returned" means "nothing to claim". Any other error is a
    // broken statement or schema and must fail the test loudly; collapsing
    // every error to `None` (as `.ok()` would) lets the final
    // `assert_eq!(.., None)` pass for the wrong reason.
    let claim_next = || -> Option<i64> {
        match conn.query_row(CLAIM_SQL, [], |r| r.get(0)) {
            Ok(id) => Some(id),
            Err(rusqlite::Error::QueryReturnedNoRows) => None,
            Err(e) => panic!("claim statement failed: {e}"),
        }
    };

    // First claim picks the only eligible row ...
    assert_eq!(claim_next(), Some(eligible));
    // ... and the second finds nothing: the remaining rows are a future
    // retry and a dead letter.
    assert_eq!(claim_next(), None);

    let _ = std::fs::remove_file(&path);
}
767
#[test]
fn classify_validation_never_infers_transience_from_message() {
    // GAP-SG-73: the fallback classifier decides on the error TYPE only.
    // These messages used to be substring-sniffed ("json", "missing '") and
    // treated as Transient; as `AppError::Validation` they must now all be
    // HardFailure. Per the original note, the OpenRouter chat path attaches
    // its own typed `ChatError::retry_class` for these shape failures before
    // `record_item_failure_typed` falls back to this classifier, so no
    // message-based guessing is left in the fallback.
    const SHAPE_FAILURE_MESSAGES: [&str; 5] = [
        "model 'x' returned non-object JSON after repair (got string)",
        "model 'x' returned content that could not be parsed even after JSON repair",
        "model 'x' returned no structured content",
        "LLM result missing 'description' field",
        "LLM result missing 'enriched_body' field",
    ];

    for msg in SHAPE_FAILURE_MESSAGES {
        let outcome = classify_enrich_outcome(&AppError::Validation(msg.into()));
        assert_eq!(
            outcome,
            crate::retry::AttemptOutcome::HardFailure,
            "expected hard failure for: {msg}"
        );
    }
}
791
#[test]
fn classify_embedding_error_is_transient_floor() {
    // An `AppError::Embedding` is classified as retryable, whatever its text.
    let err = AppError::Embedding("dimension mismatch".into());
    let outcome = classify_enrich_outcome(&err);
    assert_eq!(outcome, crate::retry::AttemptOutcome::Transient);
}
799
// GAP-SG-78: a missing entity has its own typed variant and is classified
// Transient; missing memories and the untyped `NotFound` string remain
// HardFailure. The decision never inspects message substrings.
#[test]
fn classify_entity_not_yet_materialized_is_transient() {
    let err = AppError::EntityNotYetMaterialized {
        name: "acme".into(),
        namespace: "global".into(),
    };
    let outcome = classify_enrich_outcome(&err);
    assert_eq!(outcome, crate::retry::AttemptOutcome::Transient);
}
812
#[test]
fn classify_memory_absence_stays_hard_failure() {
    // Every "memory is gone" flavour (by name, by id, untyped NotFound)
    // must be classified as a hard failure.
    let absent = [
        AppError::MemoryNotFound {
            name: "mem-x".into(),
            namespace: "global".into(),
        },
        AppError::MemoryNotFoundById { id: 42 },
        AppError::NotFound("gone".into()),
    ];

    for err in &absent {
        assert_eq!(
            classify_enrich_outcome(err),
            crate::retry::AttemptOutcome::HardFailure
        );
    }
}
831
#[test]
fn classify_database_busy_is_transient_non_busy_is_hard() {
    use crate::retry::AttemptOutcome;

    // (SQLite result code, message, expected classification): a lock
    // conflict is retryable, a constraint violation is not.
    let cases = [
        (
            rusqlite::ffi::SQLITE_BUSY,
            "database is locked",
            AttemptOutcome::Transient,
        ),
        (
            rusqlite::ffi::SQLITE_CONSTRAINT,
            "UNIQUE constraint failed",
            AttemptOutcome::HardFailure,
        ),
    ];

    for (code, text, expected) in cases {
        let err = AppError::Database(rusqlite::Error::SqliteFailure(
            rusqlite::ffi::Error::new(code),
            Some(text.into()),
        ));
        assert_eq!(classify_enrich_outcome(&err), expected);
    }
}
851
#[test]
fn record_item_failure_typed_persists_diagnostics_on_dead_letter() {
    // A caller-supplied HardFailure must dead-letter the row while still
    // storing the finish reason and both token counts that were passed in.
    const DIAG_SQL: &str =
        "SELECT status, finish_reason, input_tokens, output_tokens FROM queue WHERE id=?1";

    let (conn, path) = open_temp_queue();
    let id = insert_pending(&conn, "mem-diag");

    let outcome = record_item_failure_typed(
        &conn,
        id,
        1,
        5,
        crate::retry::AttemptOutcome::HardFailure,
        "truncated response",
        Some("length"),
        Some(120),
        Some(4096),
    );
    assert_eq!(outcome, crate::retry::AttemptOutcome::HardFailure);

    let row: (String, Option<String>, Option<i64>, Option<i64>) = conn
        .query_row(DIAG_SQL, rusqlite::params![id], |r| {
            Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?))
        })
        .unwrap();
    let (status, finish_reason, input_tokens, output_tokens) = row;

    assert_eq!(status, "dead");
    assert_eq!(finish_reason.as_deref(), Some("length"));
    assert_eq!(input_tokens, Some(120));
    assert_eq!(output_tokens, Some(4096));

    let _ = std::fs::remove_file(&path);
}
886
#[test]
fn record_item_failure_typed_reschedules_transient_below_max_attempts() {
    // GAP-SG-72-chat: a transient failure below max_attempts (for example a
    // truncated OpenRouter response) must keep the row `pending` with a
    // `next_retry_at` set rather than dead-lettering it, and must still
    // record the finish_reason/token diagnostics so they can be inspected
    // later through `--list-dead` / `--status`.
    const RETRY_SQL: &str =
        "SELECT status, error_class, finish_reason, next_retry_at FROM queue WHERE id=?1";

    let (conn, path) = open_temp_queue();
    let id = insert_pending(&conn, "mem-retry");

    let outcome = record_item_failure_typed(
        &conn,
        id,
        1,
        5,
        crate::retry::AttemptOutcome::Transient,
        "truncated response",
        Some("length"),
        Some(120),
        Some(64),
    );
    assert_eq!(outcome, crate::retry::AttemptOutcome::Transient);

    let row: (String, String, Option<String>, Option<String>) = conn
        .query_row(RETRY_SQL, rusqlite::params![id], |r| {
            Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?))
        })
        .unwrap();
    let (status, error_class, finish_reason, next_retry_at) = row;

    assert_eq!(status, "pending");
    assert_eq!(error_class, "transient");
    assert_eq!(finish_reason.as_deref(), Some("length"));
    assert!(
        next_retry_at.is_some(),
        "a rescheduled item must carry a next_retry_at"
    );

    let _ = std::fs::remove_file(&path);
}
929
/// GAP-SG-76/v1.1.00 regression guard: the enrich drain loops compose
/// `with_busy_retry(|| dequeue_next_pending(...))`, and that composition
/// must be BOUNDED under sustained lock contention. The earlier
/// `loop { ... continue; }` shape retried `SQLITE_BUSY` forever.
///
/// Setup: a second connection keeps an exclusive write lock for the whole
/// test, and the connection under test runs with `busy_timeout=0` so SQLite
/// reports `SQLITE_BUSY` at once instead of waiting internally. The only
/// retrying left is `with_busy_retry`'s own, which must stop after
/// `MAX_SQLITE_BUSY_RETRIES` calls.
#[test]
fn with_busy_retry_bounds_dequeue_under_sustained_contention() {
    use std::sync::atomic::{AtomicU32, Ordering};

    let (conn, path) = open_temp_queue();
    insert_pending(&conn, "mem-busy");
    conn.pragma_update(None, "busy_timeout", 0i64)
        .expect("busy_timeout override must succeed");

    // The blocker owns the write lock until the explicit ROLLBACK below, so
    // every dequeue attempt on `conn` sees SQLITE_BUSY.
    let blocker = Connection::open(&path).expect("blocker connection must open");
    blocker
        .execute_batch("BEGIN EXCLUSIVE;")
        .expect("exclusive lock must be acquired");

    // Counts how many times the retry helper invoked the closure.
    let attempts = AtomicU32::new(0);
    let result: Result<DequeueOutcome, AppError> =
        crate::storage::utils::with_busy_retry(|| {
            attempts.fetch_add(1, Ordering::SeqCst);
            dequeue_next_pending(&conn, "")
        });

    assert!(
        matches!(result, Err(AppError::DbBusy(_))),
        "sustained SQLITE_BUSY must convert to DbBusy, not hang or silently report Empty"
    );
    assert_eq!(
        attempts.load(Ordering::SeqCst),
        crate::constants::MAX_SQLITE_BUSY_RETRIES,
        "must attempt exactly MAX_SQLITE_BUSY_RETRIES times, never retry unbounded"
    );

    blocker
        .execute_batch("ROLLBACK;")
        .expect("releasing the exclusive lock must succeed");
    let _ = std::fs::remove_file(&path);
}
977
#[test]
fn dequeue_next_pending_distinguishes_empty_from_claimed() {
    // With a single pending row, the first dequeue must hand back that row
    // and the second must report an empty queue rather than an error.
    let (conn, path) = open_temp_queue();
    let id = insert_pending(&conn, "mem-dequeue");

    let first = dequeue_next_pending(&conn, "").expect("dequeue must succeed");
    if let DequeueOutcome::Claimed((claimed_id, key, _, _)) = first {
        assert_eq!(claimed_id, id);
        assert_eq!(key, "mem-dequeue");
    } else {
        panic!("expected a claimed row");
    }

    let second = dequeue_next_pending(&conn, "").expect("dequeue must succeed");
    assert!(matches!(second, DequeueOutcome::Empty));

    let _ = std::fs::remove_file(&path);
}
994
#[test]
fn classify_provider_error_and_not_found_are_hard() {
    // A provider rejection and an untyped NotFound are both hard failures.
    let hard_errors = [
        AppError::ProviderError {
            code: "400".into(),
            message: "context length exceeded".into(),
        },
        AppError::NotFound("memory 'gone' not found".into()),
    ];

    for err in &hard_errors {
        assert_eq!(
            classify_enrich_outcome(err),
            crate::retry::AttemptOutcome::HardFailure
        );
    }
}
1009
#[test]
fn open_queue_db_migrates_operation_column() {
    // Re-opening an existing queue file must succeed and the resulting
    // schema must expose both the `operation` and `memory_id` columns.
    let (first, path) = open_temp_queue();
    drop(first);
    let conn = open_queue_db(&path).expect("second open is idempotent");

    // Column 1 of `PRAGMA table_info` is the column name.
    let mut stmt = conn.prepare("PRAGMA table_info(queue)").unwrap();
    let cols: Vec<String> = stmt
        .query_map([], |r| r.get::<_, String>(1))
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    drop(stmt);

    for required in ["operation", "memory_id"] {
        assert!(cols.iter().any(|c| c == required));
    }

    let _ = std::fs::remove_file(&path);
}
1026
#[test]
fn enqueue_candidate_tags_operation_and_memory_id() {
    // Seed one memory in the main DB and remember its row id.
    let main = open_test_db();
    main.execute(
        "INSERT INTO memories (namespace, name, body) VALUES ('global', 'mem-x', 'body')",
        [],
    )
    .unwrap();
    let expected_memory_id: i64 = main
        .query_row("SELECT id FROM memories WHERE name='mem-x'", [], |r| {
            r.get(0)
        })
        .unwrap();

    // Enqueue it and check the queue row carries both the operation tag
    // and the id of the memory it refers to.
    let (queue, path) = open_temp_queue();
    enqueue_candidate(&queue, &main, "global", "mem-x", "memory", "MemoryBindings");

    let row: (String, i64) = queue
        .query_row(
            "SELECT operation, memory_id FROM queue WHERE item_key='mem-x'",
            [],
            |r| Ok((r.get(0)?, r.get(1)?)),
        )
        .unwrap();
    let (operation, memory_id) = row;
    assert_eq!(operation, "MemoryBindings");
    assert_eq!(memory_id, expected_memory_id);

    let _ = std::fs::remove_file(&path);
}
1053
#[test]
fn requeue_dead_resurrects_dead_rows() {
    // NOTE(review): this test runs the requeue UPDATE inline instead of
    // calling a production helper, so it pins the SQL shape only.
    const REQUEUE_SQL: &str = "UPDATE queue SET status='pending', attempt=0, next_retry_at=NULL, \
             error=NULL, error_class=NULL \
             WHERE status='dead' AND (operation = ?1 OR operation IS NULL)";

    let (conn, path) = open_temp_queue();

    // One dead-lettered row with retry bookkeeping filled in.
    conn.execute(
        "INSERT INTO queue (item_key, item_type, status, operation, attempt, error, error_class, next_retry_at) \
         VALUES ('mem-dead', 'memory', 'dead', 'MemoryBindings', 8, 'boom', 'permanent', datetime('now'))",
        [],
    )
    .unwrap();

    let resurrected = conn
        .execute(REQUEUE_SQL, rusqlite::params!["MemoryBindings"])
        .unwrap();
    assert_eq!(resurrected, 1);

    // The row is pending again with its attempt counter and retry
    // timestamp cleared.
    let row: (String, i64, Option<String>) = conn
        .query_row(
            "SELECT status, attempt, next_retry_at FROM queue WHERE item_key='mem-dead'",
            [],
            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
        )
        .unwrap();
    let (status, attempt, next_retry_at) = row;
    assert_eq!(status, "pending");
    assert_eq!(attempt, 0);
    assert!(next_retry_at.is_none());

    let _ = std::fs::remove_file(&path);
}
1084
#[test]
fn skipped_item_keys_excludes_only_skipped_for_operation() {
    // GAP-SG-69: the body-enrich scan has to leave out memories already
    // vetoed with `status='skipped'`; otherwise `--until-empty` never
    // converges because it keeps re-scanning a short body that cannot be
    // expanded (the detached worker reported a stuck backlog for 30+ min).
    let (conn, path) = open_temp_queue();

    // Fixture: a skipped BodyEnrich row, a pending BodyEnrich row, and a
    // skipped row that belongs to a different operation.
    for insert_sql in [
        "INSERT INTO queue (item_key, item_type, status, operation) VALUES ('mem-vetoed', 'memory', 'skipped', 'BodyEnrich')",
        "INSERT INTO queue (item_key, item_type, status, operation) VALUES ('mem-pending', 'memory', 'pending', 'BodyEnrich')",
        "INSERT INTO queue (item_key, item_type, status, operation) VALUES ('mem-other-op', 'memory', 'skipped', 'MemoryBindings')",
    ] {
        conn.execute(insert_sql, []).unwrap();
    }

    let keys = skipped_item_keys(&conn, "BodyEnrich").unwrap();

    assert!(
        keys.contains("mem-vetoed"),
        "vetoed BodyEnrich item must be excluded from scan"
    );
    assert!(
        !keys.contains("mem-pending"),
        "pending item is still actionable"
    );
    assert!(
        !keys.contains("mem-other-op"),
        "skipped item from another operation must not leak"
    );
    assert_eq!(keys.len(), 1);

    let _ = std::fs::remove_file(&path);
}
1123
#[test]
fn cascade_cleanup_delete_targets_memory_id_and_name() {
    // NOTE(review): the cleanup DELETE is executed inline here, so this
    // pins the statement's matching rules rather than a production helper.
    let (conn, path) = open_temp_queue();

    // One row reachable only through `memory_id`, one only through its key.
    for insert_sql in [
        "INSERT INTO queue (item_key, item_type, status, memory_id) VALUES ('by-id', 'memory', 'done', 42)",
        "INSERT INTO queue (item_key, item_type, status) VALUES ('by-name', 'memory', 'pending')",
    ] {
        conn.execute(insert_sql, []).unwrap();
    }

    let removed = conn
        .execute(
            "DELETE FROM queue WHERE memory_id = ?1 OR item_key = ?2",
            rusqlite::params![42_i64, "by-name"],
        )
        .unwrap();
    assert_eq!(removed, 2);

    // Both rows matched, so the table is empty afterwards.
    let remaining = conn
        .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get::<_, i64>(0))
        .unwrap();
    assert_eq!(remaining, 0);

    let _ = std::fs::remove_file(&path);
}
1150
#[test]
fn item_type_for_maps_entity_and_memory() {
    // Of the operations checked here, only entity descriptions work on
    // "entity" items; the rest work on "memory" items.
    let expectations = [
        (EnrichOperation::EntityDescriptions, "entity"),
        (EnrichOperation::MemoryBindings, "memory"),
        (EnrichOperation::AugmentBindings, "memory"),
        (EnrichOperation::BodyExtract, "memory"),
    ];

    for (operation, expected) in &expectations {
        assert_eq!(item_type_for(operation), *expected);
    }
}
1161
#[test]
fn prune_dead_orphans_removes_only_orphan_memory_rows() {
    // Main DB holds a single memory, 'alive'. Its dead queue row must
    // survive pruning because the memory still exists.
    let main = open_test_db();
    main.execute(
        "INSERT INTO memories (namespace, name, body) VALUES ('global', 'alive', 'b')",
        [],
    )
    .unwrap();

    let (queue, path) = open_temp_queue();
    // Queue fixture, in order:
    //   'gone'        - dead memory row with no matching memory -> pruned
    //   'alive'       - dead memory row whose memory exists     -> kept
    //   'some-entity' - dead entity row (key is not a memory)   -> untouched
    for insert_sql in [
        "INSERT INTO queue (item_key, item_type, status, operation, error_class) \
         VALUES ('gone', 'memory', 'dead', 'MemoryBindings', 'permanent')",
        "INSERT INTO queue (item_key, item_type, status, operation, error_class) \
         VALUES ('alive', 'memory', 'dead', 'MemoryBindings', 'permanent')",
        "INSERT INTO queue (item_key, item_type, status, operation) \
         VALUES ('some-entity', 'entity', 'dead', 'EntityDescriptions')",
    ] {
        queue.execute(insert_sql, []).unwrap();
    }

    let pruned = prune_dead_orphans(&queue, &main, "MemoryBindings", "global").unwrap();
    assert_eq!(pruned, 1, "only the orphan memory row is pruned");

    // Whatever is left, listed in key order.
    let mut stmt = queue
        .prepare("SELECT item_key FROM queue ORDER BY item_key")
        .unwrap();
    let remaining: Vec<String> = stmt
        .query_map([], |r| r.get::<_, String>(0))
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    drop(stmt);
    assert_eq!(remaining, vec!["alive", "some-entity"]);

    let _ = std::fs::remove_file(&path);
}
1212}