Skip to main content

sqlite_graphrag/commands/enrich/
queue.rs

1//! Enrichment queue — SQLite-backed scan/retry/dead-letter DB.
2
3use super::*;
4
5// ---------------------------------------------------------------------------
6// Queue DB
7// ---------------------------------------------------------------------------
8
9/// Opens or creates the enrichment queue database.
10///
11/// The queue schema mirrors `ingest_claude` for resume/retry parity.
12/// Uses a different filename (`.enrich-queue.sqlite`) to avoid collision.
13///
14/// # DRY note
15///
16/// This is a near-verbatim copy of `open_queue_db` in `ingest_claude.rs`.
17/// Both should be unified in a shared `llm_runner.rs` module by the
18/// Integration stream.
19pub(super) fn open_queue_db<P: AsRef<std::path::Path>>(path: P) -> Result<Connection, AppError> {
20    let conn = Connection::open(path)?;
21    conn.pragma_update(None, "journal_mode", "wal")?;
22    // GAP-SG-76: without an explicit busy_timeout, a lock contention window
23    // between the dequeue claim and a concurrent worker/main-DB writer
24    // surfaces as SQLITE_BUSY immediately instead of retrying briefly.
25    // Reuses the project-wide canonical value (see rules_rust_sqlite.md —
26    // "DEFINIR busy_timeout em milissegundos explícitos por conexão").
27    conn.pragma_update(None, "busy_timeout", crate::constants::BUSY_TIMEOUT_MILLIS)?;
28    conn.execute_batch(
29        "CREATE TABLE IF NOT EXISTS queue (
30            id          INTEGER PRIMARY KEY AUTOINCREMENT,
31            item_key    TEXT NOT NULL UNIQUE,
32            item_type   TEXT NOT NULL DEFAULT 'memory',
33            status      TEXT NOT NULL DEFAULT 'pending',
34            memory_id   INTEGER,
35            entity_id   INTEGER,
36            entities    INTEGER DEFAULT 0,
37            rels        INTEGER DEFAULT 0,
38            error       TEXT,
39            cost_usd    REAL DEFAULT 0.0,
40            attempt     INTEGER DEFAULT 0,
41            elapsed_ms  INTEGER,
42            created_at  TEXT DEFAULT (datetime('now')),
43            done_at     TEXT
44        );
45        CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
46    )?;
47    // GAP-ENRICH-BACKLOG-CONVERGE (v1.0.96): dead-letter columns. The legacy
48    // `.enrich-queue.sqlite` predates these columns and `CREATE TABLE IF NOT
49    // EXISTS` never alters an existing table, so add them idempotently here.
50    let mut has_error_class = false;
51    let mut has_next_retry_at = false;
52    // GAP-SG-12/42: the `operation` column scopes queue rows to the enrich
53    // operation that enqueued them, so `--status` can segment counts per
54    // operation instead of conflating a shared `item_key` space. Migrated
55    // idempotently here for the same reason as the v1.0.96 columns.
56    let mut has_operation = false;
57    // GAP-SG-72: dead-letter diagnostics carried from a typed OpenRouter
58    // `ChatError` (finish_reason + token counts) so `--list-dead` can show
59    // WHY an item died (e.g. truncated by max_tokens) instead of only the
60    // formatted error string. Migrated idempotently for the same reason as
61    // the columns above.
62    let mut has_finish_reason = false;
63    let mut has_input_tokens = false;
64    let mut has_output_tokens = false;
65    {
66        let mut stmt = conn.prepare("PRAGMA table_info(queue)")?;
67        let names = stmt.query_map([], |r| r.get::<_, String>(1))?;
68        for name in names {
69            match name?.as_str() {
70                "error_class" => has_error_class = true,
71                "next_retry_at" => has_next_retry_at = true,
72                "operation" => has_operation = true,
73                "finish_reason" => has_finish_reason = true,
74                "input_tokens" => has_input_tokens = true,
75                "output_tokens" => has_output_tokens = true,
76                _ => {}
77            }
78        }
79    }
80    if !has_error_class {
81        conn.execute_batch("ALTER TABLE queue ADD COLUMN error_class TEXT")?;
82    }
83    if !has_next_retry_at {
84        conn.execute_batch("ALTER TABLE queue ADD COLUMN next_retry_at TEXT")?;
85    }
86    if !has_operation {
87        conn.execute_batch("ALTER TABLE queue ADD COLUMN operation TEXT")?;
88    }
89    if !has_finish_reason {
90        conn.execute_batch("ALTER TABLE queue ADD COLUMN finish_reason TEXT")?;
91    }
92    if !has_input_tokens {
93        conn.execute_batch("ALTER TABLE queue ADD COLUMN input_tokens INTEGER")?;
94    }
95    if !has_output_tokens {
96        conn.execute_batch("ALTER TABLE queue ADD COLUMN output_tokens INTEGER")?;
97    }
98    conn.execute_batch(
99        "CREATE INDEX IF NOT EXISTS idx_enrich_queue_eligible ON queue(status, next_retry_at);
100         CREATE INDEX IF NOT EXISTS idx_enrich_queue_operation ON queue(operation, status);
101         CREATE INDEX IF NOT EXISTS idx_enrich_queue_memory ON queue(memory_id)",
102    )?;
103    Ok(conn)
104}
105
106/// GAP-SG-12: enqueue one scan candidate, linking it to its `memory_id` and
107/// tagging it with the originating `operation`. For memory-keyed operations the
108/// id is resolved from `main_conn` so the cascade cleanup (GAP-SG-13) can target
109/// the queue row by `memory_id` even before the item is processed. Entity/id
110/// keyed operations leave `memory_id` NULL (the `item_key` carries the link).
111/// `INSERT OR IGNORE` preserves the v1.0.96 invariant that a dead-letter row is
112/// never resurrected by re-enqueue (item_key is UNIQUE).
113pub(super) fn enqueue_candidate(
114    queue_conn: &Connection,
115    main_conn: &Connection,
116    namespace: &str,
117    key: &str,
118    item_type: &str,
119    operation: &str,
120) {
121    let memory_id: Option<i64> = if item_type == "memory" {
122        main_conn
123            .query_row(
124                "SELECT id FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
125                rusqlite::params![namespace, key],
126                |r| r.get(0),
127            )
128            .ok()
129    } else {
130        None
131    };
132    if let Err(e) = queue_conn.execute(
133        "INSERT OR IGNORE INTO queue (item_key, item_type, status, operation, memory_id) \
134         VALUES (?1, ?2, 'pending', ?3, ?4)",
135        rusqlite::params![key, item_type, operation, memory_id],
136    ) {
137        tracing::warn!(target: "enrich", error = %e, "queue insert failed");
138    }
139}
140
141/// GAP-SG-69: item_keys vetoed `status='skipped'` for an operation. The
142/// body-enrich scan selects candidates purely by `LENGTH(body) <
143/// min_output_chars`, so a short body whose rewrite the preservation guard keeps
144/// rejecting would be re-scanned every pass and `--until-empty` would never
145/// converge. Callers exclude these keys so the scan returns only actionable
146/// items; `cleanup_queue_entry` clears the veto when the body actually changes,
147/// restoring the memory as a candidate.
148pub(super) fn skipped_item_keys(
149    conn: &Connection,
150    operation: &str,
151) -> Result<std::collections::HashSet<String>, AppError> {
152    let mut stmt = conn.prepare(
153        "SELECT item_key FROM queue WHERE status='skipped' AND (operation = ?1 OR operation IS NULL)",
154    )?;
155    let keys = stmt
156        .query_map(rusqlite::params![operation], |r| r.get::<_, String>(0))?
157        .collect::<Result<std::collections::HashSet<String>, _>>()?;
158    Ok(keys)
159}
160
161/// Queue `item_type` for an operation: entity-keyed operations use `"entity"`,
162/// every other (memory/id-keyed) operation uses `"memory"`.
163pub(super) fn item_type_for(operation: &EnrichOperation) -> &'static str {
164    match operation {
165        EnrichOperation::EntityDescriptions => "entity",
166        _ => "memory",
167    }
168}
169
170/// v1.1.1 (P2): per-key `item_type` override for the re-embed targets.
171///
172/// Re-embed keys are prefixed with `entity:` / `chunk:` when `--target`
173/// selects a non-memory table; the queue row must carry the real item type
174/// so `prune_dead_orphans` (which only reaps `item_type='memory'` rows)
175/// never mistakes an entity/chunk key for an orphaned memory name.
176/// Unprefixed keys keep the operation-level default.
177pub(super) fn item_type_for_key(key: &str, default: &'static str) -> &'static str {
178    if key.starts_with("entity:") {
179        "entity"
180    } else if key.starts_with("chunk:") {
181        "chunk"
182    } else {
183        default
184    }
185}
186
187/// GAP-SG-13: remove a memory's enrich-queue entry when the memory is deleted or
188/// force-merged, so the dead-letter / pending sidecar never references a row
189/// that no longer exists. Best-effort and a no-op when the queue file is absent
190/// (the common case after a clean run, which removes it). Targets BOTH
191/// `memory_id` (populated at enqueue for memory ops, GAP-SG-12) and `item_key`
192/// (the memory name) so pending rows enqueued before id resolution are also
193/// cleared. Errors are logged, never propagated — cleanup must not fail the
194/// caller's delete/upsert.
195pub fn cleanup_queue_entry(db_path: &std::path::Path, memory_id: i64, name: &str) {
196    let queue_path = crate::paths::sidecar_path(db_path, ".enrich-queue.sqlite");
197    if !queue_path.exists() {
198        return;
199    }
200    match open_queue_db(&queue_path) {
201        Ok(conn) => {
202            if let Err(e) = conn.execute(
203                "DELETE FROM queue WHERE memory_id = ?1 OR item_key = ?2",
204                rusqlite::params![memory_id, name],
205            ) {
206                tracing::warn!(target: "enrich", error = %e, memory_id, "enrich-queue cleanup failed");
207            }
208        }
209        Err(e) => {
210            tracing::warn!(target: "enrich", error = %e, "enrich-queue cleanup skipped (open failed)");
211        }
212    }
213}
214
215/// GAP-SG-66: prune ORPHAN dead-letter rows — `status='dead'` memory rows whose
216/// `item_key` (the memory name) no longer exists in the main DB for `namespace`.
217///
218/// These are terminal "not found" failures (the memory was renamed/purged after
219/// being enqueued): re-processing them just re-fails with the same not-found
220/// error, so `--requeue-dead` can never recover them and they inflate
221/// `queue_dead` forever. Read-only on the main DB; deletes only the
222/// confirmed-orphan rows from the queue sidecar. Entity-keyed dead rows
223/// (`item_type='entity'`) are left untouched — their key is an entity name, not
224/// a memory name. Returns the number of rows pruned.
225pub(super) fn prune_dead_orphans(
226    queue_conn: &Connection,
227    main_conn: &Connection,
228    operation: &str,
229    namespace: &str,
230) -> Result<i64, AppError> {
231    let dead: Vec<(i64, String)> = {
232        let mut stmt = queue_conn.prepare(
233            "SELECT id, item_key FROM queue \
234             WHERE status='dead' AND item_type='memory' \
235             AND (operation = ?1 OR operation IS NULL) ORDER BY id",
236        )?;
237        let rows = stmt
238            .query_map(rusqlite::params![operation], |r| Ok((r.get(0)?, r.get(1)?)))?
239            .collect::<Result<Vec<_>, _>>()?;
240        rows
241    };
242    let mut pruned = 0_i64;
243    for (id, name) in dead {
244        let exists = main_conn
245            .query_row(
246                "SELECT 1 FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
247                rusqlite::params![namespace, name],
248                |_| Ok(()),
249            )
250            .is_ok();
251        if !exists {
252            queue_conn.execute("DELETE FROM queue WHERE id=?1", rusqlite::params![id])?;
253            pruned += 1;
254        }
255    }
256    if pruned > 0 {
257        let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
258    }
259    Ok(pruned)
260}
261
262// ---------------------------------------------------------------------------
263// GAP-ENRICH-BACKLOG-CONVERGE — dead-letter classification + queue failure sink
264// ---------------------------------------------------------------------------
265
/// Read-only `enrich --status` report (no LLM, no singleton).
///
/// GAP-SG-42: all queue counts are scoped to the current `--operation`. Rows
/// migrated before the `operation` column (which are NULL) are still counted,
/// so a legacy queue is not silently reported as empty.
///
/// Field declaration order is the serialized field order.
#[derive(Debug, Serialize, schemars::JsonSchema)]
pub struct EnrichStatus {
    /// Record-type marker. NOTE(review): presumably always `true` so JSON
    /// consumers can tell this record apart from other outputs — confirm at
    /// the construction site.
    pub(super) status_report: bool,
    /// The `--operation` the queue counts below are scoped to.
    pub(super) operation: String,
    /// Namespace the report was computed for.
    pub(super) namespace: String,
    /// Legacy backlog count (memory-bindings only, per the GAP-SG-77 note on
    /// `scan_backlog`).
    pub(super) unbound_backlog: usize,
    /// GAP-SG-77: DATABASE-semantics backlog for the queried operation, computed
    /// by `scan::count_operation_backlog` via a `SELECT COUNT(*)` over the real
    /// store. This is distinct from `queue_pending`/`queue_dead` (FILE/sidecar
    /// queue semantics) and from the legacy `unbound_backlog` (memory-bindings
    /// only). It fixes the false `pending=0` that db-backed operations
    /// (entity-descriptions/body-enrich/re-embed) previously reported.
    pub(super) scan_backlog: i64,
    // Per-status sidecar queue row counts. NOTE(review): presumably one
    // `COUNT(*) ... WHERE status=<value>` per field (pending, processing, done,
    // failed, skipped, dead) — confirm against the `--status` query.
    pub(super) queue_pending: i64,
    pub(super) queue_processing: i64,
    pub(super) queue_done: i64,
    pub(super) queue_failed: i64,
    pub(super) queue_skipped: i64,
    pub(super) queue_dead: i64,
    // Pending rows split by backoff: claimable now vs. still waiting on
    // `next_retry_at` (see the `draining`/`cooldown` states below).
    // NOTE(review): confirm the exact predicates in the `--status` query.
    pub(super) eligible_now: i64,
    pub(super) waiting: i64,
    /// GAP-SG-15/46: coarse backlog state, disambiguating an empty queue from a
    /// not-yet-scanned backlog and from a cooldown wait:
    /// - `draining`: eligible items now.
    /// - `cooldown`: all pending items are waiting on `next_retry_at`.
    /// - `pending-scan`: candidates exist but the queue is not populated — run
    ///   enrich to scan.
    /// - `empty`: nothing left to do.
    pub(super) state: &'static str,
    /// GAP-SG-16: per-item `next_retry_at` for every pending row currently in
    /// backoff, so an operator can see exactly when each will become eligible.
    pub(super) waiting_items: Vec<WaitingItem>,
}
302
/// GAP-SG-16: one pending queue row waiting on its backoff cooldown.
#[derive(Debug, Serialize, schemars::JsonSchema)]
pub struct WaitingItem {
    /// The row's `item_key` (memory name, entity name, or prefixed re-embed key).
    pub(super) item_key: String,
    /// Times the row has been claimed so far (the dequeue claim increments
    /// `attempt`).
    pub(super) attempt: i64,
    /// SQLite `datetime(...)` text at which the row becomes eligible again;
    /// `None` when the column is NULL.
    pub(super) next_retry_at: Option<String>,
    /// Class written by the failure sink (`"transient"`, `"permanent"`,
    /// `"success"`); `None` when never recorded (e.g. legacy rows).
    pub(super) error_class: Option<String>,
}
311
/// GAP-SG-23: one dead-letter row reported by `--list-dead`.
#[derive(Debug, Serialize, schemars::JsonSchema)]
pub struct DeadItem {
    /// Record-type marker. NOTE(review): presumably always `true` so consumers
    /// can tell item records from the `DeadSummary` footer — confirm at the
    /// construction site.
    pub(super) dead_item: bool,
    /// The dead row's `item_key`.
    pub(super) item_key: String,
    /// The row's `item_type` (`"memory"`, `"entity"`, or `"chunk"`).
    pub(super) item_type: String,
    /// Number of claims the row went through before dead-lettering.
    pub(super) attempt: i64,
    /// Failure class recorded when the row died (e.g. `"permanent"`, or
    /// `"transient"` when the attempt cap was hit).
    pub(super) error_class: Option<String>,
    /// Formatted error message recorded with the final failure.
    pub(super) error: Option<String>,
    /// GAP-SG-72: `choices[0].finish_reason` from the OpenRouter response
    /// that produced this failure, when one was decoded (e.g. `"length"`
    /// for a max_tokens truncation). `None` for subprocess-provider modes
    /// or failures that never reached a decoded response.
    pub(super) finish_reason: Option<String>,
    /// GAP-SG-72: `usage.prompt_tokens` from the same response, when known.
    pub(super) input_tokens: Option<i64>,
    /// GAP-SG-72: `usage.completion_tokens` from the same response, when known.
    pub(super) output_tokens: Option<i64>,
}
331
/// GAP-SG-23/11: summary footer for `--list-dead` and `--requeue-dead`.
#[derive(Debug, Serialize, schemars::JsonSchema)]
pub struct DeadSummary {
    /// Record-type marker. NOTE(review): presumably always `true` so consumers
    /// can tell the footer from `DeadItem` records — confirm at the
    /// construction site.
    pub(super) summary: bool,
    /// The `--operation` this summary is scoped to.
    pub(super) operation: String,
    /// Namespace the action ran against.
    pub(super) namespace: String,
    /// `list-dead` | `requeue-dead` | `prune-dead-orphans`
    pub(super) action: &'static str,
    /// Dead-letter row count. NOTE(review): confirm whether this is taken
    /// before or after the action.
    pub(super) dead_total: i64,
    /// Rows moved back for retry. NOTE(review): presumably non-zero only for
    /// `requeue-dead` — confirm.
    pub(super) requeued: i64,
    /// GAP-SG-66: `prune-dead-orphans` — dead rows removed because their
    /// referenced memory no longer exists in the main DB for the namespace.
    /// Zero for `list-dead` / `requeue-dead`.
    pub(super) pruned: i64,
}
347
348/// Classifies an enrich item failure into a retry/dead-letter outcome.
349///
350/// This is the FALLBACK classifier: it is only consulted when the failure
351/// did not already carry a typed [`crate::retry::AttemptOutcome`] computed at
352/// its origin (see [`record_item_failure_typed`], fed by
353/// [`crate::commands::enrich::extraction::take_last_openrouter_failure`] for
354/// OpenRouter chat/embedding calls). Classification is TYPED by `AppError`
355/// variant only — NEVER by matching the formatted message — per
356/// `rules_rust_retry_com_backoff.md` ("NUNCA usar string matching em
357/// mensagens de erro").
358pub(super) fn classify_enrich_outcome(e: &AppError) -> crate::retry::AttemptOutcome {
359    use crate::retry::AttemptOutcome;
360    match e {
361        AppError::RateLimited { .. } | AppError::Timeout { .. } | AppError::DbBusy(_) => {
362            AttemptOutcome::Transient
363        }
364        // GAP-SG-78: a referenced entity that is not yet materialized is a
365        // TRANSITORY absence — a later enrich pass creates the entity — so the
366        // item is rescheduled, not dead-lettered on the first miss. Matched on
367        // the typed variant, never a message substring (rules_rust_retry: NUNCA
368        // string matching). The `--max-attempts` floor (default 8) still ends
369        // the item if the entity never materializes, mirroring the `Embedding`
370        // floor below.
371        AppError::EntityNotYetMaterialized { .. } => AttemptOutcome::Transient,
372        // GAP-SG-09: errors that are genuinely PERMANENT for this item and must
373        // dead-letter immediately (retrying cannot help): a structured provider
374        // rejection (context-length overflow / refusal carried as ProviderError),
375        // or a MEMORY that no longer exists (deleted or renamed between scan and
376        // processing). Entity absence is handled above as transitory, NOT here.
377        AppError::ProviderError { .. }
378        | AppError::NotFound(_)
379        | AppError::MemoryNotFound { .. }
380        | AppError::MemoryNotFoundById { .. } => AttemptOutcome::HardFailure,
381        // GAP-SG-76: SQLITE_BUSY/LOCKED is a lock-contention hiccup between the
382        // queue writer and a concurrent claim — retry it; any other database
383        // error (constraint violation, corruption, I/O) is permanent.
384        AppError::Database(_) => {
385            if crate::storage::utils::is_sqlite_busy(e) {
386                AttemptOutcome::Transient
387            } else {
388                AttemptOutcome::HardFailure
389            }
390        }
391        // GAP-SG-73: safe floor for the `re-embed` operation. `AppError::Embedding`
392        // reaches here only via `embed_with_fallback`'s backend-chain resolution
393        // (`crate::embedder`), which discards the origin-typed
394        // `EmbedError::retry_class` through `From<EmbedError> for AppError` before
395        // the error surfaces to the queue. Extracting the precise verdict would
396        // require bypassing the fallback chain to call the OpenRouter embedding
397        // client directly — out of scope here (touches `embedder.rs`, which is
398        // off-limits, and removes the multi-backend fallback safety net).
399        // Transient is the conservative choice: a persistently permanent failure
400        // still terminates via `--max-attempts` instead of retrying forever.
401        AppError::Embedding(_) => AttemptOutcome::Transient,
402        // Every other variant — including `Validation` without an
403        // origin-typed retry verdict attached — is treated as permanent.
404        // Previously this branch inspected the formatted message for
405        // substrings like "json" / "missing '" to guess at transience; that
406        // guesswork is now unnecessary because the OpenRouter chat path
407        // (the project's only supported enrich mode) attaches its retry
408        // verdict directly via `ChatError::retry_class`, computed at the
409        // exact HTTP status / provider code in `chat_api.rs`, and
410        // `record_item_failure_typed` consumes it BEFORE ever falling back
411        // to this classifier.
412        _ => AttemptOutcome::HardFailure,
413    }
414}
415
416/// Applies a failure outcome to a single queue row. Shared by the parallel
417/// worker and the serial loop (DRY). A `HardFailure`, or a transient failure
418/// whose attempt count reached `max_attempts`, lands in the dead-letter status
419/// (`status='dead'`) so it is never re-selected. A transient failure below the
420/// cap is rescheduled to `pending` with an exponential-backoff `next_retry_at`.
421/// Returns the [`crate::retry::AttemptOutcome`] so the caller can feed the
422/// existing circuit breaker.
423///
424/// GAP-SG-73: delegates to [`record_item_failure_typed`] with the outcome
425/// computed by the untyped fallback classifier and no diagnostics — the
426/// entry point for callers that only have a bare `&AppError` (subprocess
427/// providers, persistence failures).
428pub(super) fn record_item_failure(
429    queue_conn: &rusqlite::Connection,
430    queue_id: i64,
431    attempt: i64,
432    max_attempts: u32,
433    err: &AppError,
434) -> crate::retry::AttemptOutcome {
435    let outcome = classify_enrich_outcome(err);
436    let err_str = format!("{err}");
437    record_item_failure_typed(
438        queue_conn,
439        queue_id,
440        attempt,
441        max_attempts,
442        outcome,
443        &err_str,
444        None,
445        None,
446        None,
447    )
448}
449
450/// GAP-SG-72/73: applies a failure outcome to a single queue row using an
451/// [`crate::retry::AttemptOutcome`] the caller ALREADY computed at the
452/// failure's origin (e.g. `ChatError::retry_class` from an OpenRouter chat
453/// call), plus whatever truncation diagnostics (`finish_reason` and token
454/// counts) were available. This is the precise counterpart to
455/// [`record_item_failure`], which falls back to the untyped
456/// [`classify_enrich_outcome`] classifier when no origin-typed verdict
457/// exists. Both share this single write path (DRY).
458#[allow(clippy::too_many_arguments)]
459pub(super) fn record_item_failure_typed(
460    queue_conn: &rusqlite::Connection,
461    queue_id: i64,
462    attempt: i64,
463    max_attempts: u32,
464    outcome: crate::retry::AttemptOutcome,
465    err_str: &str,
466    finish_reason: Option<&str>,
467    input_tokens: Option<i64>,
468    output_tokens: Option<i64>,
469) -> crate::retry::AttemptOutcome {
470    use crate::retry::AttemptOutcome;
471    let error_class = match outcome {
472        AttemptOutcome::Transient => "transient",
473        AttemptOutcome::HardFailure => "permanent",
474        AttemptOutcome::Success => "success",
475    };
476
477    let terminal = matches!(outcome, AttemptOutcome::HardFailure) || attempt >= max_attempts as i64;
478    if terminal {
479        let _ = queue_conn.execute(
480            "UPDATE queue SET status='dead', error=?1, error_class=?2, done_at=datetime('now'), \
481             finish_reason=?3, input_tokens=?4, output_tokens=?5 WHERE id=?6",
482            rusqlite::params![
483                err_str,
484                error_class,
485                finish_reason,
486                input_tokens,
487                output_tokens,
488                queue_id
489            ],
490        );
491    } else {
492        let delay = crate::retry::compute_delay(
493            &crate::retry::RetryConfig::llm_rate_limit(),
494            attempt.max(0) as u32,
495        );
496        let secs = delay.as_secs().max(1);
497        let modifier = format!("+{secs} seconds");
498        let _ = queue_conn.execute(
499            "UPDATE queue SET status='pending', error=?1, error_class=?2, next_retry_at=datetime('now', ?3), \
500             finish_reason=?4, input_tokens=?5, output_tokens=?6 WHERE id=?7",
501            rusqlite::params![
502                err_str,
503                error_class,
504                modifier,
505                finish_reason,
506                input_tokens,
507                output_tokens,
508                queue_id
509            ],
510        );
511    }
512    outcome
513}
514
/// GAP-SG-76: outcome of claiming the next pending queue row.
///
/// A genuinely empty backlog (`QueryReturnedNoRows`) is reported as `Empty`.
/// Lock contention (`SQLITE_BUSY`/`SQLITE_LOCKED`) and every other database
/// failure are NOT variants here. `dequeue_next_pending` returns them as
/// `Err(AppError::Database(_))`, so the caller can retry briefly on contention
/// instead of breaking out of the drain loop early.
///
/// Both the serial loop and the parallel worker loop share this (DRY).
/// Previously each collapsed every `query_row` error into `.ok()`, silently
/// treating a busy database the same as an empty queue.
pub(super) enum DequeueOutcome {
    /// A row was claimed: moved to `status='processing'` with `attempt`
    /// incremented. The tuple is `(id, item_key, item_type, attempt)`, in the
    /// order of the claim's `RETURNING` clause; `attempt` is the
    /// post-increment value.
    Claimed((i64, String, String, i64)),
    /// No pending row matched the eligibility filter.
    Empty,
}
526
527pub(super) fn dequeue_next_pending(
528    queue_conn: &rusqlite::Connection,
529    backoff_clause: &str,
530) -> Result<DequeueOutcome, AppError> {
531    let dequeue_sql = format!(
532        "UPDATE queue SET status='processing', attempt=attempt+1 \
533         WHERE id = (SELECT id FROM queue WHERE status='pending' {backoff_clause} \
534                     ORDER BY id LIMIT 1) \
535         RETURNING id, item_key, item_type, attempt"
536    );
537    match queue_conn.query_row(&dequeue_sql, [], |row| {
538        Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
539    }) {
540        Ok(claimed) => Ok(DequeueOutcome::Claimed(claimed)),
541        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(DequeueOutcome::Empty),
542        Err(e) => Err(AppError::Database(e)),
543    }
544}
545
546// ---------------------------------------------------------------------------
547// Tests
548// ---------------------------------------------------------------------------
549
550#[cfg(test)]
551mod tests {
552    use super::*;
553
554    fn open_test_db() -> Connection {
555        let conn = Connection::open_in_memory().expect("in-memory db");
556        conn.execute_batch(
557            "CREATE TABLE memories (
558                id          INTEGER PRIMARY KEY AUTOINCREMENT,
559                namespace   TEXT NOT NULL DEFAULT 'global',
560                name        TEXT NOT NULL,
561                type        TEXT NOT NULL DEFAULT 'note',
562                description TEXT NOT NULL DEFAULT '',
563                body        TEXT NOT NULL DEFAULT '',
564                body_hash   TEXT NOT NULL DEFAULT '',
565                session_id  TEXT,
566                source      TEXT NOT NULL DEFAULT 'agent',
567                metadata    TEXT NOT NULL DEFAULT '{}',
568                created_at  INTEGER NOT NULL DEFAULT (unixepoch()),
569                updated_at  INTEGER NOT NULL DEFAULT (unixepoch()),
570                deleted_at  INTEGER,
571                UNIQUE(namespace, name)
572            );",
573        )
574        .expect("schema creation must succeed");
575        conn
576    }
577
578    fn open_temp_queue() -> (Connection, String) {
579        let path = format!(
580            "/tmp/test-enrich-dl-{}-{}.sqlite",
581            std::process::id(),
582            fastrand::u64(..)
583        );
584        let conn = open_queue_db(&path).expect("queue db must open");
585        (conn, path)
586    }
587
588    fn insert_pending(conn: &Connection, key: &str) -> i64 {
589        conn.execute(
590            "INSERT INTO queue (item_key, item_type, status) VALUES (?1, 'memory', 'pending')",
591            rusqlite::params![key],
592        )
593        .unwrap();
594        conn.last_insert_rowid()
595    }
596
597    #[test]
598    fn queue_db_schema_creates_correctly() {
599        let tmp_path = format!("/tmp/test-enrich-queue-{}.sqlite", std::process::id());
600        let conn = open_queue_db(&tmp_path).expect("queue db must open");
601        let count: i64 = conn
602            .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
603            .unwrap();
604        assert_eq!(count, 0);
605        let _ = std::fs::remove_file(&tmp_path);
606    }
607
608    #[test]
609    fn classify_rate_limit_is_transient() {
610        let e = AppError::RateLimited {
611            detail: "429".into(),
612        };
613        assert_eq!(
614            classify_enrich_outcome(&e),
615            crate::retry::AttemptOutcome::Transient
616        );
617    }
618
619    #[test]
620    fn classify_timeout_and_dbbusy_are_transient() {
621        let t = AppError::Timeout {
622            operation: "judge".into(),
623            duration_secs: 30,
624        };
625        let b = AppError::DbBusy("locked".into());
626        assert_eq!(
627            classify_enrich_outcome(&t),
628            crate::retry::AttemptOutcome::Transient
629        );
630        assert_eq!(
631            classify_enrich_outcome(&b),
632            crate::retry::AttemptOutcome::Transient
633        );
634    }
635
636    #[test]
637    fn classify_validation_and_parse_are_hard_failure() {
638        let v = AppError::Validation("failed to parse entities array: bad".into());
639        assert_eq!(
640            classify_enrich_outcome(&v),
641            crate::retry::AttemptOutcome::HardFailure
642        );
643    }
644
645    #[test]
646    fn open_queue_db_alter_is_idempotent() {
647        let path = format!(
648            "/tmp/test-enrich-idem-{}-{}.sqlite",
649            std::process::id(),
650            fastrand::u64(..)
651        );
652        let _ = open_queue_db(&path).expect("first open");
653        let conn = open_queue_db(&path).expect("second open is idempotent");
654        let cols: Vec<String> = {
655            let mut stmt = conn.prepare("PRAGMA table_info(queue)").unwrap();
656            stmt.query_map([], |r| r.get::<_, String>(1))
657                .unwrap()
658                .collect::<Result<Vec<_>, _>>()
659                .unwrap()
660        };
661        assert!(cols.iter().any(|c| c == "error_class"));
662        assert!(cols.iter().any(|c| c == "next_retry_at"));
663        let _ = std::fs::remove_file(&path);
664    }
665
666    #[test]
667    fn record_item_failure_hard_marks_dead() {
668        let (conn, path) = open_temp_queue();
669        let id = insert_pending(&conn, "mem-hard");
670        let outcome = record_item_failure(
671            &conn,
672            id,
673            1,
674            5,
675            &AppError::Validation("invalid body".into()),
676        );
677        assert_eq!(outcome, crate::retry::AttemptOutcome::HardFailure);
678        let status: String = conn
679            .query_row(
680                "SELECT status FROM queue WHERE id=?1",
681                rusqlite::params![id],
682                |r| r.get(0),
683            )
684            .unwrap();
685        assert_eq!(status, "dead");
686        let _ = std::fs::remove_file(&path);
687    }
688
689    #[test]
690    fn record_item_failure_transient_reschedules_pending() {
691        let (conn, path) = open_temp_queue();
692        let id = insert_pending(&conn, "mem-transient");
693        let outcome = record_item_failure(
694            &conn,
695            id,
696            1,
697            5,
698            &AppError::RateLimited {
699                detail: "429".into(),
700            },
701        );
702        assert_eq!(outcome, crate::retry::AttemptOutcome::Transient);
703        let (status, future): (String, i64) = conn
704            .query_row(
705                "SELECT status, (next_retry_at > datetime('now')) FROM queue WHERE id=?1",
706                rusqlite::params![id],
707                |r| Ok((r.get(0)?, r.get(1)?)),
708            )
709            .unwrap();
710        assert_eq!(status, "pending");
711        assert_eq!(future, 1, "next_retry_at must be in the future");
712        let _ = std::fs::remove_file(&path);
713    }
714
715    #[test]
716    fn record_item_failure_transient_at_cap_marks_dead() {
717        let (conn, path) = open_temp_queue();
718        let id = insert_pending(&conn, "mem-cap");
719        let outcome = record_item_failure(
720            &conn,
721            id,
722            5,
723            5,
724            &AppError::RateLimited {
725                detail: "429".into(),
726            },
727        );
728        assert_eq!(outcome, crate::retry::AttemptOutcome::Transient);
729        let status: String = conn
730            .query_row(
731                "SELECT status FROM queue WHERE id=?1",
732                rusqlite::params![id],
733                |r| r.get(0),
734            )
735            .unwrap();
736        assert_eq!(status, "dead");
737        let _ = std::fs::remove_file(&path);
738    }
739
740    #[test]
741    fn dequeue_skips_future_retry_and_dead() {
742        let (conn, path) = open_temp_queue();
743        let eligible = insert_pending(&conn, "mem-eligible");
744        let waiting = insert_pending(&conn, "mem-waiting");
745        conn.execute(
746            "UPDATE queue SET next_retry_at=datetime('now', '+3600 seconds') WHERE id=?1",
747            rusqlite::params![waiting],
748        )
749        .unwrap();
750        let dead = insert_pending(&conn, "mem-dead");
751        conn.execute(
752            "UPDATE queue SET status='dead' WHERE id=?1",
753            rusqlite::params![dead],
754        )
755        .unwrap();
756
757        let claimed: Option<i64> = conn
758            .query_row(
759                "UPDATE queue SET status='processing', attempt=attempt+1 \
760                 WHERE id = (SELECT id FROM queue WHERE status='pending' \
761                               AND (next_retry_at IS NULL OR next_retry_at <= datetime('now')) \
762                             ORDER BY id LIMIT 1) \
763                 RETURNING id",
764                [],
765                |r| r.get(0),
766            )
767            .ok();
768        assert_eq!(claimed, Some(eligible));
769
770        let second: Option<i64> = conn
771            .query_row(
772                "UPDATE queue SET status='processing', attempt=attempt+1 \
773                 WHERE id = (SELECT id FROM queue WHERE status='pending' \
774                               AND (next_retry_at IS NULL OR next_retry_at <= datetime('now')) \
775                             ORDER BY id LIMIT 1) \
776                 RETURNING id",
777                [],
778                |r| r.get(0),
779            )
780            .ok();
781        assert_eq!(second, None);
782        let _ = std::fs::remove_file(&path);
783    }
784
785    #[test]
786    fn classify_validation_never_infers_transience_from_message() {
787        // GAP-SG-73: the fallback classifier is TYPED-only now. Messages
788        // that used to be sniffed for "json" / "missing '" substrings and
789        // treated as Transient are HardFailure here — the OpenRouter chat
790        // path (the project's only supported enrich mode) attaches its own
791        // typed `ChatError::retry_class` for these exact shape failures
792        // BEFORE `record_item_failure_typed` ever falls back to this
793        // classifier, so no message-based guessing survives in the fallback.
794        for msg in [
795            "model 'x' returned non-object JSON after repair (got string)",
796            "model 'x' returned content that could not be parsed even after JSON repair",
797            "model 'x' returned no structured content",
798            "LLM result missing 'description' field",
799            "LLM result missing 'enriched_body' field",
800        ] {
801            assert_eq!(
802                classify_enrich_outcome(&AppError::Validation(msg.into())),
803                crate::retry::AttemptOutcome::HardFailure,
804                "expected hard failure for: {msg}"
805            );
806        }
807    }
808
809    #[test]
810    fn classify_embedding_error_is_transient_floor() {
811        assert_eq!(
812            classify_enrich_outcome(&AppError::Embedding("dimension mismatch".into())),
813            crate::retry::AttemptOutcome::Transient
814        );
815    }
816
817    // GAP-SG-78: entity absence is Transient (own typed variant); memory
818    // absence and the untyped NotFound string stay HardFailure. No substring.
819    #[test]
820    fn classify_entity_not_yet_materialized_is_transient() {
821        assert_eq!(
822            classify_enrich_outcome(&AppError::EntityNotYetMaterialized {
823                name: "acme".into(),
824                namespace: "global".into(),
825            }),
826            crate::retry::AttemptOutcome::Transient
827        );
828    }
829
830    #[test]
831    fn classify_memory_absence_stays_hard_failure() {
832        assert_eq!(
833            classify_enrich_outcome(&AppError::MemoryNotFound {
834                name: "mem-x".into(),
835                namespace: "global".into(),
836            }),
837            crate::retry::AttemptOutcome::HardFailure
838        );
839        assert_eq!(
840            classify_enrich_outcome(&AppError::MemoryNotFoundById { id: 42 }),
841            crate::retry::AttemptOutcome::HardFailure
842        );
843        assert_eq!(
844            classify_enrich_outcome(&AppError::NotFound("gone".into())),
845            crate::retry::AttemptOutcome::HardFailure
846        );
847    }
848
849    #[test]
850    fn classify_database_busy_is_transient_non_busy_is_hard() {
851        let busy = AppError::Database(rusqlite::Error::SqliteFailure(
852            rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_BUSY),
853            Some("database is locked".into()),
854        ));
855        assert_eq!(
856            classify_enrich_outcome(&busy),
857            crate::retry::AttemptOutcome::Transient
858        );
859        let constraint = AppError::Database(rusqlite::Error::SqliteFailure(
860            rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_CONSTRAINT),
861            Some("UNIQUE constraint failed".into()),
862        ));
863        assert_eq!(
864            classify_enrich_outcome(&constraint),
865            crate::retry::AttemptOutcome::HardFailure
866        );
867    }
868
869    #[test]
870    fn record_item_failure_typed_persists_diagnostics_on_dead_letter() {
871        let (conn, path) = open_temp_queue();
872        let id = insert_pending(&conn, "mem-diag");
873        let outcome = record_item_failure_typed(
874            &conn,
875            id,
876            1,
877            5,
878            crate::retry::AttemptOutcome::HardFailure,
879            "truncated response",
880            Some("length"),
881            Some(120),
882            Some(4096),
883        );
884        assert_eq!(outcome, crate::retry::AttemptOutcome::HardFailure);
885        let (status, finish_reason, input_tokens, output_tokens): (
886            String,
887            Option<String>,
888            Option<i64>,
889            Option<i64>,
890        ) = conn
891            .query_row(
892                "SELECT status, finish_reason, input_tokens, output_tokens FROM queue WHERE id=?1",
893                rusqlite::params![id],
894                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
895            )
896            .unwrap();
897        assert_eq!(status, "dead");
898        assert_eq!(finish_reason.as_deref(), Some("length"));
899        assert_eq!(input_tokens, Some(120));
900        assert_eq!(output_tokens, Some(4096));
901        let _ = std::fs::remove_file(&path);
902    }
903
904    #[test]
905    fn record_item_failure_typed_reschedules_transient_below_max_attempts() {
906        // GAP-SG-72-chat: a transient failure (e.g. a truncated OpenRouter
907        // response) below max_attempts must stay `pending` with a
908        // future `next_retry_at`, not go straight to `dead` — and it must
909        // still persist the finish_reason/token diagnostics for later
910        // inspection via `--list-dead` / `--status`.
911        let (conn, path) = open_temp_queue();
912        let id = insert_pending(&conn, "mem-retry");
913        let outcome = record_item_failure_typed(
914            &conn,
915            id,
916            1,
917            5,
918            crate::retry::AttemptOutcome::Transient,
919            "truncated response",
920            Some("length"),
921            Some(120),
922            Some(64),
923        );
924        assert_eq!(outcome, crate::retry::AttemptOutcome::Transient);
925        let (status, error_class, finish_reason, next_retry_at): (
926            String,
927            String,
928            Option<String>,
929            Option<String>,
930        ) = conn
931            .query_row(
932                "SELECT status, error_class, finish_reason, next_retry_at FROM queue WHERE id=?1",
933                rusqlite::params![id],
934                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
935            )
936            .unwrap();
937        assert_eq!(status, "pending");
938        assert_eq!(error_class, "transient");
939        assert_eq!(finish_reason.as_deref(), Some("length"));
940        assert!(
941            next_retry_at.is_some(),
942            "a rescheduled item must carry a next_retry_at"
943        );
944        let _ = std::fs::remove_file(&path);
945    }
946
947    /// GAP-SG-76/v1.1.00 fix: proves the enrich drain loops' composition
948    /// `with_busy_retry(|| dequeue_next_pending(...))` is BOUNDED under
949    /// sustained lock contention instead of the previous
950    /// `loop { ... continue; }`, which retried `SQLITE_BUSY` forever. A
951    /// second connection holds an exclusive write lock for the whole test;
952    /// the queue connection under test has `busy_timeout=0` so SQLite
953    /// reports `SQLITE_BUSY` immediately instead of blocking internally,
954    /// isolating `with_busy_retry`'s own bounded backoff (5 attempts) as the
955    /// only source of delay.
956    #[test]
957    fn with_busy_retry_bounds_dequeue_under_sustained_contention() {
958        let (conn, path) = open_temp_queue();
959        insert_pending(&conn, "mem-busy");
960        conn.pragma_update(None, "busy_timeout", 0i64)
961            .expect("busy_timeout override must succeed");
962
963        // Second connection holds an EXCLUSIVE write lock so every dequeue
964        // attempt on `conn` observes SQLITE_BUSY, never SQLITE_LOCKED-then-
965        // clears-up.
966        let blocker = Connection::open(&path).expect("blocker connection must open");
967        blocker
968            .execute_batch("BEGIN EXCLUSIVE;")
969            .expect("exclusive lock must be acquired");
970
971        let calls = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
972        let calls_clone = std::sync::Arc::clone(&calls);
973        let result: Result<DequeueOutcome, AppError> =
974            crate::storage::utils::with_busy_retry(|| {
975                calls_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
976                dequeue_next_pending(&conn, "")
977            });
978
979        assert!(
980            matches!(result, Err(AppError::DbBusy(_))),
981            "sustained SQLITE_BUSY must convert to DbBusy, not hang or silently report Empty"
982        );
983        assert_eq!(
984            calls.load(std::sync::atomic::Ordering::SeqCst),
985            crate::constants::MAX_SQLITE_BUSY_RETRIES,
986            "must attempt exactly MAX_SQLITE_BUSY_RETRIES times, never retry unbounded"
987        );
988
989        blocker
990            .execute_batch("ROLLBACK;")
991            .expect("releasing the exclusive lock must succeed");
992        let _ = std::fs::remove_file(&path);
993    }
994
995    #[test]
996    fn dequeue_next_pending_distinguishes_empty_from_claimed() {
997        let (conn, path) = open_temp_queue();
998        let id = insert_pending(&conn, "mem-dequeue");
999        let claimed = dequeue_next_pending(&conn, "").expect("dequeue must succeed");
1000        match claimed {
1001            DequeueOutcome::Claimed((claimed_id, key, _, _)) => {
1002                assert_eq!(claimed_id, id);
1003                assert_eq!(key, "mem-dequeue");
1004            }
1005            DequeueOutcome::Empty => panic!("expected a claimed row"),
1006        }
1007        let empty = dequeue_next_pending(&conn, "").expect("dequeue must succeed");
1008        assert!(matches!(empty, DequeueOutcome::Empty));
1009        let _ = std::fs::remove_file(&path);
1010    }
1011
1012    #[test]
1013    fn classify_provider_error_and_not_found_are_hard() {
1014        assert_eq!(
1015            classify_enrich_outcome(&AppError::ProviderError {
1016                code: "400".into(),
1017                message: "context length exceeded".into(),
1018            }),
1019            crate::retry::AttemptOutcome::HardFailure
1020        );
1021        assert_eq!(
1022            classify_enrich_outcome(&AppError::NotFound("memory 'gone' not found".into())),
1023            crate::retry::AttemptOutcome::HardFailure
1024        );
1025    }
1026
1027    #[test]
1028    fn open_queue_db_migrates_operation_column() {
1029        let (conn, path) = open_temp_queue();
1030        drop(conn);
1031        let conn = open_queue_db(&path).expect("second open is idempotent");
1032        let cols: Vec<String> = {
1033            let mut stmt = conn.prepare("PRAGMA table_info(queue)").unwrap();
1034            stmt.query_map([], |r| r.get::<_, String>(1))
1035                .unwrap()
1036                .collect::<Result<Vec<_>, _>>()
1037                .unwrap()
1038        };
1039        assert!(cols.iter().any(|c| c == "operation"));
1040        assert!(cols.iter().any(|c| c == "memory_id"));
1041        let _ = std::fs::remove_file(&path);
1042    }
1043
1044    #[test]
1045    fn enqueue_candidate_tags_operation_and_memory_id() {
1046        let main = open_test_db();
1047        main.execute(
1048            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'mem-x', 'body')",
1049            [],
1050        )
1051        .unwrap();
1052        let mem_id: i64 = main
1053            .query_row("SELECT id FROM memories WHERE name='mem-x'", [], |r| {
1054                r.get(0)
1055            })
1056            .unwrap();
1057        let (queue, path) = open_temp_queue();
1058        enqueue_candidate(&queue, &main, "global", "mem-x", "memory", "MemoryBindings");
1059        let (op, mid): (String, i64) = queue
1060            .query_row(
1061                "SELECT operation, memory_id FROM queue WHERE item_key='mem-x'",
1062                [],
1063                |r| Ok((r.get(0)?, r.get(1)?)),
1064            )
1065            .unwrap();
1066        assert_eq!(op, "MemoryBindings");
1067        assert_eq!(mid, mem_id);
1068        let _ = std::fs::remove_file(&path);
1069    }
1070
1071    #[test]
1072    fn requeue_dead_resurrects_dead_rows() {
1073        let (conn, path) = open_temp_queue();
1074        conn.execute(
1075            "INSERT INTO queue (item_key, item_type, status, operation, attempt, error, error_class, next_retry_at) \
1076             VALUES ('mem-dead', 'memory', 'dead', 'MemoryBindings', 8, 'boom', 'permanent', datetime('now'))",
1077            [],
1078        )
1079        .unwrap();
1080        let n = conn
1081            .execute(
1082                "UPDATE queue SET status='pending', attempt=0, next_retry_at=NULL, \
1083                 error=NULL, error_class=NULL \
1084                 WHERE status='dead' AND (operation = ?1 OR operation IS NULL)",
1085                rusqlite::params!["MemoryBindings"],
1086            )
1087            .unwrap();
1088        assert_eq!(n, 1);
1089        let (status, attempt, nra): (String, i64, Option<String>) = conn
1090            .query_row(
1091                "SELECT status, attempt, next_retry_at FROM queue WHERE item_key='mem-dead'",
1092                [],
1093                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1094            )
1095            .unwrap();
1096        assert_eq!(status, "pending");
1097        assert_eq!(attempt, 0);
1098        assert!(nra.is_none());
1099        let _ = std::fs::remove_file(&path);
1100    }
1101
1102    #[test]
1103    fn skipped_item_keys_excludes_only_skipped_for_operation() {
1104        // GAP-SG-69: the body-enrich scan must drop memories already vetoed
1105        // `status='skipped'` so `--until-empty` converges instead of re-scanning a
1106        // non-expandable short body forever (the detached worker reported a
1107        // stuck backlog for 30+ min).
1108        let (conn, path) = open_temp_queue();
1109        conn.execute(
1110            "INSERT INTO queue (item_key, item_type, status, operation) VALUES ('mem-vetoed', 'memory', 'skipped', 'BodyEnrich')",
1111            [],
1112        )
1113        .unwrap();
1114        conn.execute(
1115            "INSERT INTO queue (item_key, item_type, status, operation) VALUES ('mem-pending', 'memory', 'pending', 'BodyEnrich')",
1116            [],
1117        )
1118        .unwrap();
1119        conn.execute(
1120            "INSERT INTO queue (item_key, item_type, status, operation) VALUES ('mem-other-op', 'memory', 'skipped', 'MemoryBindings')",
1121            [],
1122        )
1123        .unwrap();
1124        let keys = skipped_item_keys(&conn, "BodyEnrich").unwrap();
1125        assert!(
1126            keys.contains("mem-vetoed"),
1127            "vetoed BodyEnrich item must be excluded from scan"
1128        );
1129        assert!(
1130            !keys.contains("mem-pending"),
1131            "pending item is still actionable"
1132        );
1133        assert!(
1134            !keys.contains("mem-other-op"),
1135            "skipped item from another operation must not leak"
1136        );
1137        assert_eq!(keys.len(), 1);
1138        let _ = std::fs::remove_file(&path);
1139    }
1140
1141    #[test]
1142    fn cascade_cleanup_delete_targets_memory_id_and_name() {
1143        let (conn, path) = open_temp_queue();
1144        conn.execute(
1145            "INSERT INTO queue (item_key, item_type, status, memory_id) VALUES ('by-id', 'memory', 'done', 42)",
1146            [],
1147        )
1148        .unwrap();
1149        conn.execute(
1150            "INSERT INTO queue (item_key, item_type, status) VALUES ('by-name', 'memory', 'pending')",
1151            [],
1152        )
1153        .unwrap();
1154        let removed = conn
1155            .execute(
1156                "DELETE FROM queue WHERE memory_id = ?1 OR item_key = ?2",
1157                rusqlite::params![42_i64, "by-name"],
1158            )
1159            .unwrap();
1160        assert_eq!(removed, 2);
1161        let remaining: i64 = conn
1162            .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
1163            .unwrap();
1164        assert_eq!(remaining, 0);
1165        let _ = std::fs::remove_file(&path);
1166    }
1167
1168    #[test]
1169    fn item_type_for_maps_entity_and_memory() {
1170        assert_eq!(
1171            item_type_for(&EnrichOperation::EntityDescriptions),
1172            "entity"
1173        );
1174        assert_eq!(item_type_for(&EnrichOperation::MemoryBindings), "memory");
1175        assert_eq!(item_type_for(&EnrichOperation::AugmentBindings), "memory");
1176        assert_eq!(item_type_for(&EnrichOperation::BodyExtract), "memory");
1177    }
1178
1179    // v1.1.1 (P2): prefixed re-embed keys override the operation default so
1180    // prune_dead_orphans never reaps entity/chunk rows as orphaned memories.
1181    #[test]
1182    fn item_type_for_key_honours_reembed_prefixes() {
1183        assert_eq!(item_type_for_key("plain-memory-name", "memory"), "memory");
1184        assert_eq!(
1185            item_type_for_key("entity:tokio-runtime", "memory"),
1186            "entity"
1187        );
1188        assert_eq!(item_type_for_key("chunk:42", "memory"), "chunk");
1189        assert_eq!(item_type_for_key("some-entity", "entity"), "entity");
1190    }
1191
1192    #[test]
1193    fn prune_dead_orphans_removes_only_orphan_memory_rows() {
1194        let main = open_test_db();
1195        // One live memory whose dead row must be KEPT (it still exists).
1196        main.execute(
1197            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'alive', 'b')",
1198            [],
1199        )
1200        .unwrap();
1201        let (queue, path) = open_temp_queue();
1202        // Orphan dead memory row (no matching memory) -> pruned.
1203        queue
1204            .execute(
1205                "INSERT INTO queue (item_key, item_type, status, operation, error_class) \
1206                 VALUES ('gone', 'memory', 'dead', 'MemoryBindings', 'permanent')",
1207                [],
1208            )
1209            .unwrap();
1210        // Live dead memory row (memory exists) -> kept.
1211        queue
1212            .execute(
1213                "INSERT INTO queue (item_key, item_type, status, operation, error_class) \
1214                 VALUES ('alive', 'memory', 'dead', 'MemoryBindings', 'permanent')",
1215                [],
1216            )
1217            .unwrap();
1218        // Entity dead row -> never touched (key is not a memory name).
1219        queue
1220            .execute(
1221                "INSERT INTO queue (item_key, item_type, status, operation) \
1222                 VALUES ('some-entity', 'entity', 'dead', 'EntityDescriptions')",
1223                [],
1224            )
1225            .unwrap();
1226
1227        let pruned = prune_dead_orphans(&queue, &main, "MemoryBindings", "global").unwrap();
1228        assert_eq!(pruned, 1, "only the orphan memory row is pruned");
1229
1230        let remaining: Vec<String> = {
1231            let mut stmt = queue
1232                .prepare("SELECT item_key FROM queue ORDER BY item_key")
1233                .unwrap();
1234            stmt.query_map([], |r| r.get::<_, String>(0))
1235                .unwrap()
1236                .collect::<Result<Vec<_>, _>>()
1237                .unwrap()
1238        };
1239        assert_eq!(remaining, vec!["alive", "some-entity"]);
1240        let _ = std::fs::remove_file(&path);
1241    }
1242}