Skip to main content

sqlite_graphrag/commands/enrich/
mod.rs

1// v1.0.97: modularised into queue.rs, scan.rs, postprocess.rs, extraction.rs.
2// See ADR-0056 (closes the ADR-0046 "Known Tech Debt (v1.0.89+)" item).
3
4//! Handler for the `enrich` CLI subcommand (GAP-14 + GAP-18).
5//!
6//! Enriches the knowledge graph by running LLM-powered analysis over memories
7//! and entities that are missing key structural data. Operations are:
8//!
9//! - `memory-bindings`: memories without `memory_entities` rows get entity extraction
10//! - `entity-descriptions`: entities with NULL/empty descriptions get LLM descriptions
11//! - `body-enrich`: memories with short bodies get expanded by the LLM (GAP-18)
12//! - `re-embed`: memories without a vector row get re-embedded without rewriting body
13//!
14//! Architecture mirrors `ingest_claude.rs`: SCAN → JUDGE (LLM) → PERSIST, with a
15//! SQLite queue DB derived next to `--db` (GAP-SG-64) for resume/retry support.
16// Workload: Subprocess I/O-bound (claude/codex API calls with network wait)
17//!
18//! # DRY note
19//!
20//! v1.0.97: `claude_runner.rs` now hosts the shared Claude invocation helpers
21//! (`run_claude`, `parse_claude_output`, `spawn_with_memory_limit`). The queue
22//! DB schema in `ingest_claude.rs` still duplicates `open_queue_db` here — a
23//! future pass can unify them.
24
25mod extraction;
26mod postprocess;
27mod queue;
28mod scan;
29use extraction::{
30    call_body_enrich, call_body_extract, call_deep_research_synth, call_description_enrich,
31    call_domain_classify, call_entity_connect, call_entity_description, call_entity_type_validate,
32    call_graph_audit, call_memory_bindings, call_reembed, call_relation_reclassify,
33    call_weight_calibrate, find_codex_binary, take_last_openrouter_failure, EnrichItemResult,
34};
35use postprocess::{
36    persist_enriched_body, persist_entity_description, persist_memory_bindings,
37    reembed_memory_vector, take_enrich_backend,
38};
39pub use queue::{cleanup_queue_entry, DeadItem, DeadSummary, EnrichStatus, WaitingItem};
40use queue::{
41    dequeue_next_pending, enqueue_candidate, item_type_for, open_queue_db, prune_dead_orphans,
42    record_item_failure, record_item_failure_typed, skipped_item_keys, DequeueOutcome,
43};
44use scan::{
45    count_operation_backlog, scan_isolated_entity_pairs, scan_operation, scan_unbound_memories,
46};
47
48use crate::commands::ingest_claude::find_claude_binary;
49use crate::constants::MAX_MEMORY_BODY_LEN;
50use crate::entity_type::EntityType;
51use crate::errors::AppError;
52use crate::paths::AppPaths;
53use crate::storage::connection::{ensure_db_ready, open_rw};
54use crate::storage::entities::{self, NewEntity, NewRelationship};
55use crate::storage::memories;
56
57use rusqlite::Connection;
58use serde::{Deserialize, Serialize};
59use std::io::Write;
60use std::path::{Path, PathBuf};
61use std::time::Instant;
62
63// ---------------------------------------------------------------------------
64// Constants
65// ---------------------------------------------------------------------------
66
67const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
68const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;
69const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2000;
70
71// ---------------------------------------------------------------------------
72// JSON schema used for memory-bindings and body-enrich extraction
73// ---------------------------------------------------------------------------
74
75const BINDINGS_SCHEMA: &str = r#"{
76  "type": "object",
77  "properties": {
78    "entities": {
79      "type": "array",
80      "items": {
81        "type": "object",
82        "properties": {
83          "name": { "type": "string" },
84          "entity_type": {
85            "type": "string",
86            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
87          }
88        },
89        "required": ["name", "entity_type"],
90        "additionalProperties": false
91      }
92    },
93    "relationships": {
94      "type": "array",
95      "items": {
96        "type": "object",
97        "properties": {
98          "source": { "type": "string" },
99          "target": { "type": "string" },
100          "relation": {
101            "type": "string",
102            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
103          },
104          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
105        },
106        "required": ["source","target","relation","strength"],
107        "additionalProperties": false
108      }
109    }
110  },
111  "required": ["entities","relationships"],
112  "additionalProperties": false
113}"#;
114
115const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
116  "type": "object",
117  "properties": {
118    "description": { "type": "string" }
119  },
120  "required": ["description"],
121  "additionalProperties": false
122}"#;
123
124const BODY_ENRICH_SCHEMA: &str = r#"{
125  "type": "object",
126  "properties": {
127    "enriched_body": { "type": "string" }
128  },
129  "required": ["enriched_body"],
130  "additionalProperties": false
131}"#;
132
133// G27 P1: weight-calibrate
134const WEIGHT_CALIBRATE_PROMPT: &str = "You are a knowledge graph quality auditor. Evaluate whether this relationship weight is correctly calibrated.\n\n\
135Scale:\n\
136- 0.9 = vital hard dependency (A cannot function without B)\n\
137- 0.7 = important design relationship (A strongly supports/enables B)\n\
138- 0.5 = useful contextual link (A and B share relevant context)\n\
139- 0.3 = weak reference (A mentions B without strong coupling)\n\n\
140Respond with the calibrated weight and brief reasoning.";
141
142const WEIGHT_CALIBRATE_SCHEMA: &str = r#"{
143  "type": "object",
144  "properties": {
145    "calibrated_weight": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
146    "reasoning": { "type": "string" }
147  },
148  "required": ["calibrated_weight", "reasoning"],
149  "additionalProperties": false
150}"#;
151
152// G27 P1: relation-reclassify
153const RELATION_RECLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. The relationship between these entities uses a generic type. Determine the REAL semantic relationship.\n\n\
154Valid canonical relations (pick exactly one):\n\
155- depends-on: A cannot function without B\n\
156- uses: A utilizes B but could substitute it\n\
157- supports: A reinforces or enables B\n\
158- causes: A triggers or produces B\n\
159- fixes: A resolves a problem in B\n\
160- contradicts: A conflicts with or invalidates B\n\
161- applies-to: A is relevant to or scoped within B\n\
162- follows: A comes after B in sequence\n\
163- replaces: A substitutes B\n\
164- tracked-in: A is monitored in B\n\
165- related: A and B share context (use sparingly)\n\n\
166Respond with the correct relation, strength, and reasoning.";
167
168const RELATION_RECLASSIFY_SCHEMA: &str = r#"{
169  "type": "object",
170  "properties": {
171    "relation": { "type": "string" },
172    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
173    "reasoning": { "type": "string" }
174  },
175  "required": ["relation", "strength", "reasoning"],
176  "additionalProperties": false
177}"#;
178
179// G27 P2: entity-connect — suggest relationships between isolated entities
180const ENTITY_CONNECT_PROMPT: &str = "You are a knowledge graph quality auditor. Two entities exist in the same graph but have no relationship between them. Determine if a meaningful relationship exists.\n\n\
181Valid canonical relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, replaces, tracked-in, related.\n\n\
182If NO meaningful relationship exists, set relation to \"none\".\n\
183Respond with the relation (or \"none\"), strength, and reasoning.";
184
185const ENTITY_CONNECT_SCHEMA: &str = r#"{
186  "type": "object",
187  "properties": {
188    "relation": { "type": "string" },
189    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
190    "reasoning": { "type": "string" }
191  },
192  "required": ["relation", "strength", "reasoning"],
193  "additionalProperties": false
194}"#;
195
196// G27 P2: entity-type-validate — verify entity type assignments
197const ENTITY_TYPE_VALIDATE_PROMPT: &str = "You are a knowledge graph quality auditor. Verify whether this entity's type is correct.\n\n\
198Valid entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\n\
199If the current type is correct, keep it. If wrong, suggest the correct type.\n\
200Respond with the validated type and reasoning.";
201
202const ENTITY_TYPE_VALIDATE_SCHEMA: &str = r#"{
203  "type": "object",
204  "properties": {
205    "validated_type": { "type": "string" },
206    "was_correct": { "type": "boolean" },
207    "reasoning": { "type": "string" }
208  },
209  "required": ["validated_type", "was_correct", "reasoning"],
210  "additionalProperties": false
211}"#;
212
213// G27 P2: description-enrich — improve generic memory descriptions
214const DESCRIPTION_ENRICH_PROMPT: &str = "You are a knowledge graph quality auditor. This memory has a generic or auto-generated description. Write a concise, semantic description (10-20 words) that captures WHAT this memory is about and WHY it matters.\n\n\
215BAD: 'ingested from docs/auth.md'\n\
216GOOD: 'JWT token rotation strategy with 15-min expiry and refresh flow'\n\n\
217Respond with the improved description and reasoning.";
218
219const DESCRIPTION_ENRICH_SCHEMA: &str = r#"{
220  "type": "object",
221  "properties": {
222    "description": { "type": "string" },
223    "reasoning": { "type": "string" }
224  },
225  "required": ["description", "reasoning"],
226  "additionalProperties": false
227}"#;
228
229// G27 P2: domain-classify — classify memory into domain category
230const DOMAIN_CLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. Classify this memory into its primary domain category.\n\n\
231Respond with the domain name (kebab-case, 2-4 words) and reasoning.";
232
233const DOMAIN_CLASSIFY_SCHEMA: &str = r#"{
234  "type": "object",
235  "properties": {
236    "domain": { "type": "string" },
237    "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
238    "reasoning": { "type": "string" }
239  },
240  "required": ["domain", "confidence", "reasoning"],
241  "additionalProperties": false
242}"#;
243
244// G27 P2: graph-audit — audit graph for quality issues
245const GRAPH_AUDIT_PROMPT: &str = "You are a knowledge graph quality auditor. Analyze this memory and its entity bindings for quality issues.\n\n\
246Check for: missing entities, wrong entity types, redundant relationships, orphaned entities, generic descriptions, low-signal relationships.\n\n\
247Respond with a list of issues found (or empty if none) and an overall quality score.";
248
249const GRAPH_AUDIT_SCHEMA: &str = r#"{
250  "type": "object",
251  "properties": {
252    "quality_score": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
253    "issues": { "type": "array", "items": { "type": "object", "properties": { "kind": { "type": "string" }, "detail": { "type": "string" } }, "required": ["kind", "detail"] } },
254    "reasoning": { "type": "string" }
255  },
256  "required": ["quality_score", "issues", "reasoning"],
257  "additionalProperties": false
258}"#;
259
260// G27 P2: deep-research-synth — synthesize research findings into graph
261const DEEP_RESEARCH_SYNTH_PROMPT: &str = "You are a knowledge graph synthesizer. Given this memory body, extract key findings and synthesize them into structured entities and relationships.\n\n\
262Entity names: lowercase kebab-case, domain-specific.\n\
263Relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, related, replaces, tracked-in.\n\n\
264Respond with extracted entities, relationships, and a synthesis summary.";
265
266const DEEP_RESEARCH_SYNTH_SCHEMA: &str = r#"{
267  "type": "object",
268  "properties": {
269    "entities": { "type": "array", "items": { "type": "object", "properties": { "name": { "type": "string" }, "entity_type": { "type": "string" } }, "required": ["name", "entity_type"] } },
270    "relationships": { "type": "array", "items": { "type": "object", "properties": { "source": { "type": "string" }, "target": { "type": "string" }, "relation": { "type": "string" }, "strength": { "type": "number" } }, "required": ["source", "target", "relation", "strength"] } },
271    "summary": { "type": "string" }
272  },
273  "required": ["entities", "relationships", "summary"],
274  "additionalProperties": false
275}"#;
276
277// G27 P2: body-extract — extract structured content from unstructured text
278const BODY_EXTRACT_PROMPT: &str = "You are a structured data extractor. Given this memory body (which may be unstructured text, raw notes, or a transcript), extract and restructure the content into a clean, well-organized markdown body.\n\n\
279Preserve all factual content. Remove noise, fix formatting, add section headers where appropriate.\n\
280Respond with the restructured body and a brief summary of changes.";
281
282const BODY_EXTRACT_SCHEMA: &str = r#"{
283  "type": "object",
284  "properties": {
285    "restructured_body": { "type": "string" },
286    "changes_summary": { "type": "string" }
287  },
288  "required": ["restructured_body", "changes_summary"],
289  "additionalProperties": false
290}"#;
291
292// ---------------------------------------------------------------------------
293// Prompts
294// ---------------------------------------------------------------------------
295
296const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. Given a memory body, extract:\n\
2971. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
2982. Typed relationships between entities with strength scores\n\n\
299Rules:\n\
300- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
301- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
302- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
303- NEVER use 'mentions' as relationship type\n\
304- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
305- Prefer fewer high-quality entities over many low-quality ones";
306
307const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. Given an entity name and type, write a concise one-sentence description (10-20 words) that explains what this entity IS and WHY it matters in the context of software/system design.\n\nEntity name: ";
308
309const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. Given a short or sparse memory body, expand it into a richer, more complete and useful description. Preserve all existing facts. Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\nConstraints:\n- Output only the enriched body text (no metadata, no headers)\n- Preserve the original meaning exactly\n- Target length is provided in the system context\n\nMemory body to enrich:\n\n";
310
311// ---------------------------------------------------------------------------
312// CLI args
313// ---------------------------------------------------------------------------
314
315/// Operation to perform in the `enrich` command.
316#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
317#[serde(rename_all = "kebab-case")]
318pub enum EnrichOperation {
319    /// Add missing entity/relationship bindings to memories (fully implemented).
320    /// memory-bindings LINKS each memory to the EXISTING entities extracted from
321    /// its body — it does not invent a new graph, it only connects what is missing. Scans
322    /// only UNBOUND memories (those with zero `memory_entities`).
323    MemoryBindings,
324    /// GAP-SG-24/26: additive augmentation — re-run binding extraction over
325    /// memories that are ALREADY bound, filtered by `--names`/`--names-file`, to
326    /// merge newly-discovered entities/relationships WITHOUT removing existing
327    /// links. Requires a name filter (refuses to re-scan the whole namespace).
328    AugmentBindings,
329    /// Fill NULL/empty entity descriptions with LLM-generated summaries (fully implemented).
330    EntityDescriptions,
331    /// Expand short memory bodies into richer content (fully implemented, GAP-18).
332    BodyEnrich,
333    /// Rebuild missing memory embeddings without rewriting the memory body.
334    ReEmbed,
335    /// Calibrate relationship weights using LLM analysis (scan only).
336    WeightCalibrate,
337    /// Reclassify relationship types using LLM judgment (scan only).
338    RelationReclassify,
339    /// Connect isolated entities by suggesting new relationships (scan only).
340    EntityConnect,
341    /// Validate entity type assignments using LLM judgment (scan only).
342    EntityTypeValidate,
343    /// Enrich memory descriptions that are generic/auto-generated (scan only).
344    DescriptionEnrich,
345    /// Identify cross-domain bridges between disconnected subgraphs (scan only).
346    CrossDomainBridges,
347    /// Classify memories into domain categories (scan only).
348    DomainClassify,
349    /// Audit the graph for quality issues (scan only).
350    GraphAudit,
351    /// Synthesize deep-research findings into graph memories (scan only).
352    DeepResearchSynth,
353    /// Extract structured body from unstructured text (scan only).
354    BodyExtract,
355}
356
357/// LLM provider for enrichment.
358#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
359pub enum EnrichMode {
360    /// Use locally installed Claude Code CLI (OAuth-first).
361    ClaudeCode,
362    /// Use locally installed OpenAI Codex CLI.
363    Codex,
364    /// Use locally installed OpenCode CLI.
365    #[value(name = "opencode")]
366    Opencode,
367    /// Use the OpenRouter chat-completions REST API (no local CLI; v1.0.95).
368    #[value(name = "openrouter")]
369    OpenRouter,
370}
371
372impl std::fmt::Display for EnrichMode {
373    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
374        match self {
375            EnrichMode::ClaudeCode => write!(f, "claude-code"),
376            EnrichMode::Codex => write!(f, "codex"),
377            EnrichMode::Opencode => write!(f, "opencode"),
378            EnrichMode::OpenRouter => write!(f, "openrouter"),
379        }
380    }
381}
382
383/// Arguments for the `enrich` subcommand.
384#[derive(clap::Args)]
385#[command(
386    about = "Enrich graph memories and entities using an LLM provider",
387    after_long_help = "EXAMPLES:\n  \
388    # Add missing entity bindings to all unbound memories\n  \
389    sqlite-graphrag enrich --operation memory-bindings --mode codex --codex-model gpt-5.4-mini\n\n  \
390    # Fill entity descriptions (dry-run preview, no tokens spent)\n  \
391    sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n  \
392    # Expand short memory bodies (GAP-18)\n  \
393    sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n  \
394    # Rebuild only missing memory embeddings without rewriting bodies\n  \
395    sqlite-graphrag enrich --operation re-embed --limit 100\n\n  \
396    # Resume an interrupted body-enrich run\n  \
397    sqlite-graphrag enrich --operation body-enrich --resume --json\n\n  \
398    # Retry only failed items from a previous run\n  \
399    sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n  \
400    # Converge the whole backlog (internal scan+drain loop, no bash wrapper)\n  \
401    sqlite-graphrag enrich --operation memory-bindings --mode openrouter \\\n    \
402      --openrouter-model deepseek/deepseek-v4-flash:nitro --until-empty --max-runtime 600\n\n  \
403    # Inspect / resurrect dead-letter items\n  \
404    sqlite-graphrag enrich --operation memory-bindings --list-dead\n  \
405    sqlite-graphrag enrich --operation memory-bindings --requeue-dead\n\n  \
406    # Read-only status (no LLM, no singleton)\n  \
407    sqlite-graphrag enrich --operation memory-bindings --status\n\n\
408    OPERATIONS NOTE:\n  \
409    memory-bindings LINKS each memory to the EXISTING entities extracted from its\n  \
410    body — it does not invent a new graph, it connects what is missing. It scans\n  \
411    only UNBOUND memories. To re-run extraction over ALREADY-bound memories and\n  \
412    MERGE newly-found entities/relationships additively (without removing links),\n  \
413    use --operation augment-bindings with --names/--names-file.\n\n\
414    DEAD-LETTER SIDECAR (.enrich-queue.sqlite):\n  \
415    A SQLite sidecar tracks each work item across runs. Schema (table `queue`):\n  \
416    item_key (UNIQUE name/id), item_type (memory|entity), operation, memory_id,\n  \
417    status (pending|processing|done|skipped|dead), attempt, error, error_class,\n  \
418    next_retry_at (backoff cooldown). --until-empty loops scan→drain internally\n  \
419    until eligible items are exhausted; transient failures (incl. malformed/non-\n  \
420    JSON LLM output, GAP-SG-09) reschedule with backoff until --max-attempts, then\n  \
421    land in status='dead'. Use --status to see the queue, --list-dead to inspect\n  \
422    the sink, --requeue-dead to retry it, and --ignore-backoff to skip cooldowns.\n  \
423    --names/--names-file also remedy a cooldown by targeting a specific subset.\n\n\
424    EXIT CODES:\n  \
425    0  success\n  \
426    1  validation error (bad args, binary not found)\n  \
427    14 I/O error"
428)]
429pub struct EnrichArgs {
430    /// Enrichment operation to run. Required for write operations; optional for
431    /// the read-only queue inspectors (`--status` / `--list-dead` /
432    /// `--requeue-dead`), where it defaults to `memory-bindings` when omitted
433    /// (GAP-SG-31).
434    #[arg(
435        long,
436        short = 'o',
437        value_enum,
438        value_name = "OPERATION",
439        required_unless_present_any = ["status", "list_dead", "requeue_dead", "prune_dead_orphans"]
440    )]
441    pub operation: Option<EnrichOperation>,
442
443    /// LLM provider to use. Required for write operations; not needed for the
444    /// read-only queue inspectors (`--status` / `--list-dead` /
445    /// `--requeue-dead`), which never call the LLM (GAP-SG-31).
446    #[arg(
447        long,
448        value_enum,
449        required_unless_present_any = ["status", "list_dead", "requeue_dead", "prune_dead_orphans"]
450    )]
451    pub mode: Option<EnrichMode>,
452
453    /// Maximum number of items to process in this run. Omit for all.
454    #[arg(long, value_name = "N")]
455    pub limit: Option<usize>,
456
457    /// Preview items without calling the LLM (zero tokens consumed).
458    #[arg(long)]
459    pub dry_run: bool,
460
461    /// Namespace to operate on. Default: global.
462    #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
463    pub namespace: Option<String>,
464
465    // -- Provider flags (Claude) --
466    /// Path to the Claude Code binary. Default: auto-detect from PATH.
467    #[arg(long, value_name = "PATH")]
468    pub claude_binary: Option<PathBuf>,
469
470    /// Claude model to use (e.g. claude-sonnet-4-6).
471    #[arg(long, value_name = "MODEL")]
472    pub claude_model: Option<String>,
473
474    /// Timeout per item in seconds when using Claude Code. Default: 300.
475    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
476    pub claude_timeout: u64,
477
478    // -- Provider flags (Codex) --
479    /// Path to the Codex CLI binary. Default: auto-detect from PATH.
480    #[arg(long, value_name = "PATH")]
481    pub codex_binary: Option<PathBuf>,
482
483    /// Codex model to use (e.g. o4-mini).
484    #[arg(long, value_name = "MODEL")]
485    pub codex_model: Option<String>,
486
487    /// Timeout per item in seconds when using Codex. Default: 300.
488    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
489    pub codex_timeout: u64,
490
491    // -- Provider flags (OpenCode) --
492    /// Path to the OpenCode binary. Default: auto-detect from PATH.
493    #[arg(long, value_name = "PATH", env = "SQLITE_GRAPHRAG_OPENCODE_BINARY")]
494    pub opencode_binary: Option<PathBuf>,
495
496    /// OpenCode model to use.
497    #[arg(long, value_name = "MODEL", env = "SQLITE_GRAPHRAG_OPENCODE_MODEL")]
498    pub opencode_model: Option<String>,
499
500    /// Timeout per item in seconds when using OpenCode. Default: 300.
501    #[arg(
502        long,
503        value_name = "SECONDS",
504        env = "SQLITE_GRAPHRAG_OPENCODE_TIMEOUT",
505        default_value_t = 300
506    )]
507    pub opencode_timeout: u64,
508
509    // -- Provider flags (OpenRouter, v1.0.95) --
510    /// OpenRouter text model to use (REQUIRED with --mode openrouter; no default).
511    #[arg(long, value_name = "MODEL")]
512    pub openrouter_model: Option<String>,
513
514    /// OpenRouter API key. Falls back to OPENROUTER_API_KEY env or stored config.
515    #[arg(long, value_name = "KEY", env = "OPENROUTER_API_KEY")]
516    pub openrouter_api_key: Option<String>,
517
518    /// Timeout per item in seconds when using OpenRouter. Default: 600.
519    ///
520    /// GAP-SG-17: raised from 300 to 600 because dense bodies (close to the
521    /// ~32K-token context ceiling of the configured model) routinely take
522    /// longer than five minutes to generate via `deepseek-v4-flash:nitro`.
523    /// Raise it further for very large corpora; lower it for short snippets.
524    #[arg(long, value_name = "SECONDS", default_value_t = 600)]
525    pub openrouter_timeout: u64,
526
527    /// Optional OpenRouter base URL override (reserved; defaults to the public API).
528    #[arg(long, value_name = "URL")]
529    pub openrouter_base_url: Option<String>,
530
531    // -- Cost controls --
532    /// Abort when cumulative cost exceeds this USD budget (API key only; ignored for OAuth).
533    #[arg(long, value_name = "USD")]
534    pub max_cost_usd: Option<f64>,
535
536    // -- Queue controls --
537    /// Resume a previously interrupted run (skip already-done items).
538    #[arg(long)]
539    pub resume: bool,
540
541    /// Retry only items that failed in a previous run.
542    #[arg(long)]
543    pub retry_failed: bool,
544
545    /// GAP-ENRICH-BACKLOG-CONVERGE: loop scan→drain internally until the queue
546    /// empties of eligible items or --max-runtime elapses; removes the need for
547    /// an external bash retry loop.
548    #[arg(long)]
549    pub until_empty: bool,
550
551    /// GAP-ENRICH-BACKLOG-CONVERGE: wall-clock ceiling in seconds for
552    /// --until-empty. Defaults to 3600 when omitted.
553    #[arg(long, value_name = "SECONDS")]
554    pub max_runtime: Option<u64>,
555
556    /// GAP-ENRICH-BACKLOG-CONVERGE: attempts per item before it becomes a
557    /// dead-letter (status='dead'). Range 1..=20. Default 8.
558    ///
559    /// GAP-SG-21: the default was raised from 5 to 8 because GAP-SG-09 now
560    /// reclassifies malformed / non-JSON LLM output as TRANSIENT (retryable)
561    /// rather than a permanent HardFailure. A flaky structured-output model
562    /// (e.g. deepseek-v4-flash:nitro) may emit several bad generations in a row
563    /// even after JSON repair (GAP-SG-10) recovers most of them; the extra
564    /// attempts give the backlog room to converge before an item is parked in
565    /// the dead-letter sink. Permanent faults (ProviderError, NotFound) still
566    /// dead-letter on the first attempt regardless of this value.
567    #[arg(long, value_name = "N", default_value_t = 8, value_parser = clap::value_parser!(u32).range(1..=20))]
568    pub max_attempts: u32,
569
570    /// GAP-ENRICH-BACKLOG-CONVERGE: read-only mode — report queue and backlog
571    /// counts without calling the LLM or acquiring the singleton.
572    #[arg(long)]
573    pub status: bool,
574
575    /// GAP-SG-23: list every dead-letter item (status='dead') for the current
576    /// operation with its error_class, attempt count and last error message.
577    /// Read-only — no LLM, no singleton. Use it to inspect what `--requeue-dead`
578    /// would resurrect before running it.
579    #[arg(long)]
580    pub list_dead: bool,
581
582    /// GAP-SG-11/14: resurrect dead-letter items — move every `status='dead'`
583    /// row back to `pending`, zeroing `attempt`, `next_retry_at`, `error` and
584    /// `error_class`. Distinct from `--retry-failed`, which only resets the
585    /// legacy `status='failed'` rows; dead-letter rows are the terminal sink of
586    /// the v1.0.96 converge loop and are never re-selected without this flag.
587    /// No LLM call or singleton is taken — it only rewrites queue statuses.
588    #[arg(long)]
589    pub requeue_dead: bool,
590
591    /// GAP-SG-66: prune ORPHAN dead-letter rows — remove every `status='dead'`
592    /// memory row whose `item_key` (the memory name) no longer exists in the
593    /// main DB for this namespace. These are terminal "not found" failures that
594    /// `--requeue-dead` can never recover (re-processing re-fails the same way),
595    /// so they inflate `queue_dead` forever. Read-only on the main DB; deletes
596    /// only confirmed-orphan rows from the queue sidecar. Entity-keyed dead rows
597    /// are left untouched. No LLM, no singleton — like `--list-dead`.
598    #[arg(long)]
599    pub prune_dead_orphans: bool,
600
601    /// GAP-SG-16: ignore the per-item backoff cooldown (`next_retry_at`) when
602    /// selecting candidates, so items waiting on exponential backoff are
603    /// processed immediately. Use to drain a backlog whose cooldown windows are
604    /// long but the provider has recovered. Without it, `--status` reports such
605    /// items under `waiting` and they are skipped until their `next_retry_at`.
606    #[arg(long)]
607    pub ignore_backoff: bool,
608
609    /// GAP-SG-28: read-only `body-extract` — extract entities/relationships into
610    /// the graph WITHOUT rewriting (or truncating) the memory body. The default
611    /// `body-extract` restructures the stored body in place; with this flag the
612    /// body is left untouched and only graph bindings are persisted (additive,
613    /// via the same upsert path as `memory-bindings`). Ignored for every other
614    /// operation.
615    #[arg(long)]
616    pub body_extract_graph_only: bool,
617
618    /// GAP-ENRICH-BACKLOG-CONVERGE: REST concurrency for --mode openrouter
619    /// (clamp 1..=16, default 8). Distinct from the legacy --llm-parallelism.
620    #[arg(long, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=16))]
621    pub rest_concurrency: Option<u32>,
622
623    // -- body-enrich specific flags (GAP-18) --
624    /// Minimum output character count for body-enrich. Default: 500.
625    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
626    pub min_output_chars: usize,
627
628    /// Maximum output character count for body-enrich. Default: 2000.
629    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
630    pub max_output_chars: usize,
631
632    /// Check that enriched body preserves all facts from the original (LLM judge). Default: true.
633    #[arg(long, default_value_t = true)]
634    pub preserve_check: bool,
635
636    /// Path to a custom prompt template file for body-enrich.
637    #[arg(long, value_name = "PATH")]
638    pub prompt_template: Option<PathBuf>,
639
640    /// Number of parallel LLM workers (default 1 = serial).
641    /// Each worker claims items atomically from the queue DB via UPDATE...RETURNING.
642    /// Range: 1–32. For 2321 entities, --llm-parallelism 4 reduces wall time ~4×.
643    #[arg(long, default_value_t = 1, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=32))]
644    pub llm_parallelism: u32,
645
646    // -- Output / infra --
647    /// Emit NDJSON output. Always true; flag accepted for compatibility.
648    #[arg(long)]
649    pub json: bool,
650
651    /// Database path override.
652    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
653    pub db: Option<String>,
654
655    /// G30: poll for the job singleton every second for up to N seconds
656    /// when another invocation holds the lock. Default: 0 (fail fast).
657    #[arg(long, value_name = "SECONDS")]
658    pub wait_job_singleton: Option<u64>,
659
660    /// G30: force acquisition of the singleton lock by removing a stale
661    /// lock file from a previously crashed invocation. Use only when you
662    /// are certain no other `enrich`/`ingest` is running.
663    #[arg(long, default_value_t = false)]
664    pub force_job_singleton: bool,
665
666    /// G37: select a specific subset of memory names to enrich instead of
667    /// the full candidate set. Comma-separated, e.g. `--names a,b,c`.
668    /// Empty when omitted (processes all candidates).
669    ///
670    /// GAP-SG-18: also a cooldown remedy — when `--status` shows items under
671    /// `waiting` (parked on `next_retry_at` backoff), pass the exact names here
672    /// to re-enqueue and process just that subset on the next run instead of
673    /// waiting for every cooldown to elapse. REQUIRED for `--operation
674    /// augment-bindings`, which refuses to re-scan the whole namespace.
675    #[arg(long, value_name = "NAMES", value_delimiter = ',')]
676    pub names: Vec<String>,
677
678    /// G37: read the subset of memory names from a file (one per line).
679    /// Lines starting with `#` and empty lines are ignored. Combined with
680    /// `--names` (union) when both are set.
681    #[arg(long, value_name = "PATH")]
682    pub names_file: Option<PathBuf>,
683
684    /// G35: probe the LLM provider with a 1-turn ping before processing
685    /// the batch. Aborts with a clear error if the rate-limit window is
686    /// closed (avoids burning N turns only to fail on item 1).
687    #[arg(long, default_value_t = false)]
688    pub preflight_check: bool,
689
690    /// G35: if a preflight probe or in-flight call hits the Claude rate
691    /// limit, fall back to `--fallback-mode` (typically `codex`) instead
692    /// of failing the batch. Ignored when `--mode` is already `codex`.
693    #[arg(long, value_enum)]
694    pub fallback_mode: Option<EnrichMode>,
695
696    /// G35: number of seconds before the OAuth rate-limit reset at which
697    /// the preflight probe should refuse to start. Default 300 (5 min).
698    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
699    pub rate_limit_buffer: u64,
700
701    /// G28-D: refuse to start when the 1-minute load average exceeds
702    /// `2 × ncpus` (or `SQLITE_GRAPHRAG_MAX_LOAD_PER_NCPU` if set).
703    /// Set to false to skip the check on contended CI runners.
704    #[arg(long, default_value_t = true)]
705    pub max_load_check: bool,
706
707    /// G28-D: when the system is saturated, abort the job after this
708    /// many consecutive HardFailure outcomes. Default 5.
709    #[arg(long, value_name = "N", default_value_t = 5)]
710    pub circuit_breaker_threshold: u32,
711
712    /// G29 Step 4: minimum trigram-Jaccard similarity between the
713    /// original body and the LLM-rewritten body for the rewrite to be
714    /// accepted. Scores below the threshold are rejected and emitted as
715    /// `EnrichItemResult::PreservationFailed`. Default 0.7 (per the G29
716    /// gap specification). Ignored when `--operation` is not
717    /// `body-enrich`.
718    #[arg(long, value_name = "FLOAT", default_value_t = 0.7)]
719    pub preserve_threshold: f64,
720
721    /// G33 Step 3: when set, validate `--codex-model` against the
722    /// ChatGPT Pro OAuth accepted-model list and abort with a
723    /// suggestion when the value is unknown. Default true (fail fast
724    /// to avoid burning OAuth turns). Set to false to opt out.
725    #[arg(long, default_value_t = true)]
726    pub codex_model_validate: bool,
727
728    /// G33 Step 3: when set together with an invalid `--codex-model`,
729    /// automatically substitute the supplied default (e.g. `gpt-5.5`)
730    /// instead of aborting. The substitution is recorded in the NDJSON
731    /// stream as `provider_substituted: true` for traceability.
732    #[arg(long, value_name = "MODEL")]
733    pub codex_model_fallback: Option<String>,
734}
735
736impl EnrichArgs {
737    /// GAP-SG-31: resolved enrichment operation.
738    ///
739    /// `operation` is `Option` so the read-only queue inspectors
740    /// (`--status` / `--list-dead` / `--requeue-dead`) can run without it.
741    /// Write paths always carry a value (enforced by
742    /// `required_unless_present_any` at parse time); the read-only paths fall
743    /// back to `memory-bindings`, the most common queue, when it is omitted.
744    fn operation(&self) -> EnrichOperation {
745        self.operation
746            .clone()
747            .unwrap_or(EnrichOperation::MemoryBindings)
748    }
749
750    /// GAP-SG-31: resolved LLM provider. `mode` is `Option` for the read-only
751    /// inspectors that never call the LLM; write paths always carry a value
752    /// (enforced by `required_unless_present_any`). The fallback is only ever
753    /// observed by read-only code that does not actually invoke the provider.
754    fn mode(&self) -> EnrichMode {
755        self.mode.clone().unwrap_or(EnrichMode::OpenRouter)
756    }
757}
758
759// ---------------------------------------------------------------------------
760// Internal types — raw LLM output structs
761// ---------------------------------------------------------------------------
762
763// ---------------------------------------------------------------------------
764// NDJSON event types emitted to stdout
765// ---------------------------------------------------------------------------
766
767#[derive(Debug, Serialize)]
768struct PhaseEvent<'a> {
769    phase: &'a str,
770    #[serde(skip_serializing_if = "Option::is_none")]
771    binary_path: Option<&'a str>,
772    #[serde(skip_serializing_if = "Option::is_none")]
773    version: Option<&'a str>,
774    #[serde(skip_serializing_if = "Option::is_none")]
775    items_total: Option<usize>,
776    #[serde(skip_serializing_if = "Option::is_none")]
777    items_pending: Option<usize>,
778    /// Active parallel LLM worker count (1 = serial). Present only on the "scan" phase event.
779    #[serde(skip_serializing_if = "Option::is_none")]
780    llm_parallelism: Option<u32>,
781}
782
783/// GAP-SG-45: separates the SCAN metric (always serial — a single SQL sweep of
784/// the candidate set) from the DRAIN metric (the parallel worker fan-out). The
785/// legacy "scan" `PhaseEvent` reported `llm_parallelism` on the scan event,
786/// conflating the two; this event makes the distinction explicit.
787#[derive(Debug, Serialize)]
788struct ConcurrencyEvent {
789    phase: &'static str,
790    scan_parallelism: u32,
791    drain_parallelism: u32,
792}
793
794#[derive(Debug, Serialize)]
795struct ItemEvent<'a> {
796    /// Item identifier (memory name or entity name).
797    item: &'a str,
798    status: &'a str,
799    #[serde(skip_serializing_if = "Option::is_none")]
800    memory_id: Option<i64>,
801    #[serde(skip_serializing_if = "Option::is_none")]
802    entity_id: Option<i64>,
803    #[serde(skip_serializing_if = "Option::is_none")]
804    entities: Option<usize>,
805    #[serde(skip_serializing_if = "Option::is_none")]
806    rels: Option<usize>,
807    #[serde(skip_serializing_if = "Option::is_none")]
808    chars_before: Option<usize>,
809    #[serde(skip_serializing_if = "Option::is_none")]
810    chars_after: Option<usize>,
811    #[serde(skip_serializing_if = "Option::is_none")]
812    cost_usd: Option<f64>,
813    #[serde(skip_serializing_if = "Option::is_none")]
814    elapsed_ms: Option<u64>,
815    #[serde(skip_serializing_if = "Option::is_none")]
816    error: Option<String>,
817    index: usize,
818    total: usize,
819}
820
821#[derive(Debug, Serialize)]
822struct EnrichSummary {
823    summary: bool,
824    operation: String,
825    items_total: usize,
826    completed: usize,
827    failed: usize,
828    skipped: usize,
829    cost_usd: f64,
830    elapsed_ms: u64,
831    /// v1.0.84 (ADR-0042): discriminator of the LLM backend that actually
832    /// ran the re-embedding during enrich. `"claude" | "codex" | "none"`.
833    /// Absent on the wire when `None` (kept for happy-path envelope cleanliness,
834    /// or when the operation did not involve a re-embed).
835    #[serde(skip_serializing_if = "Option::is_none")]
836    backend_invoked: Option<&'static str>,
837    /// GAP-SG-15: items still parked on backoff (`status='pending'` with a future
838    /// `next_retry_at`) when the run ended. Non-zero means the backlog has NOT
839    /// converged — those items are waiting on a cooldown, not done.
840    waiting: i64,
841    /// GAP-SG-15: dead-letter items (`status='dead'`) at the end of the run.
842    /// Non-zero requires `--list-dead` to inspect and `--requeue-dead` to retry.
843    dead: i64,
844}
845
846use crate::output::emit_json_line as emit_json;
847
848// ---------------------------------------------------------------------------
849// Queue DB
850// ---------------------------------------------------------------------------
851
852// Queue functions and structs moved to queue.rs
853
854// LLM call_claude and call_openrouter moved to extraction.rs
855
856// ---------------------------------------------------------------------------
857// Preflight probe (G35) — single-turn ping to verify the LLM provider
858// ---------------------------------------------------------------------------
859
860/// Result of a single preflight ping (G35).
861enum PreflightOutcome {
862    /// The provider accepted the ping without rate-limit or other errors.
863    Healthy,
864    /// The provider rejected the ping due to OAuth rate limit. The
865    /// `suggestion` field is a human hint that callers can embed in the
866    /// user-facing error.
867    RateLimited {
868        reason: String,
869        suggestion: &'static str,
870    },
871    /// Any other provider error (binary missing, auth failure, etc.).
872    Error(AppError),
873}
874
875/// Probes the configured LLM provider with a 1-turn ping.
876///
877/// - Claude: `claude -p "ping" --max-turns 1 --strict-mcp-config --mcp-config '{}'`
878/// - Codex:  `codex exec -c mcp_servers='{}' "ping" --json`
879///
880/// The probe intentionally avoids spawning any MCP server children (G28-A)
881/// to keep its own process footprint at the minimum.
882fn run_preflight_probe(args: &EnrichArgs) -> PreflightOutcome {
883    let timeout = std::time::Duration::from_secs(args.rate_limit_buffer.max(60));
884
885    match args.mode() {
886        EnrichMode::ClaudeCode => {
887            let bin = match find_claude_binary(args.claude_binary.as_deref()) {
888                Ok(b) => b,
889                Err(e) => return PreflightOutcome::Error(e),
890            };
891            // v1.0.88 (BUG-3 fix, ADR-0046): write the empty MCP config to a
892            // tempfile (Claude Code 2.1.177 rejects the inline `{}`
893            // form) and run the preflight gate before spawn, mirroring
894            // the canonical pattern in `claude_runner::build_claude_command`.
895            let mcp_config_path = match crate::spawn::preflight::write_empty_mcp_config_tempfile() {
896                Ok(p) => p,
897                Err(e) => {
898                    return PreflightOutcome::Error(AppError::Io(e));
899                }
900            };
901            let mut cmd = std::process::Command::new(&bin);
902            crate::spawn::env_whitelist::apply_env_whitelist(
903                &mut cmd,
904                crate::spawn::env_whitelist::is_strict_env_clear(),
905            );
906            if let Err(e) = crate::spawn::apply_cwd_isolation(&mut cmd) {
907                return PreflightOutcome::Error(e);
908            }
909            cmd.arg("-p")
910                .arg("ping")
911                .arg("--max-turns")
912                .arg("1")
913                .arg("--strict-mcp-config")
914                .arg("--mcp-config")
915                .arg(mcp_config_path.as_os_str())
916                .arg("--dangerously-skip-permissions")
917                .arg("--settings")
918                .arg("{\"hooks\":{}}")
919                .arg("--output-format")
920                .arg("json")
921                .stdin(std::process::Stdio::null())
922                .stdout(std::process::Stdio::piped())
923                .stderr(std::process::Stdio::piped());
924
925            let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
926                Ok(c) => c,
927                Err(e) => {
928                    return PreflightOutcome::Error(AppError::Io(e));
929                }
930            };
931            let output = match wait_with_timeout(child, timeout) {
932                Ok(out) => out,
933                Err(e) => return PreflightOutcome::Error(e),
934            };
935            if !output.status.success() {
936                let stderr = String::from_utf8_lossy(&output.stderr);
937                if stderr.contains("hit your session limit")
938                    || stderr.contains("rate_limit")
939                    || stderr.contains("429")
940                {
941                    return PreflightOutcome::RateLimited {
942                        reason: stderr.trim().to_string(),
943                        suggestion:
944                            "wait for the OAuth window to reset or use --fallback-mode codex",
945                    };
946                }
947                return PreflightOutcome::Error(AppError::Validation(format!(
948                    "preflight probe failed: {stderr}",
949                    stderr = stderr.trim()
950                )));
951            }
952            PreflightOutcome::Healthy
953        }
954        EnrichMode::Codex => {
955            let bin = match find_codex_binary(args.codex_binary.as_deref()) {
956                Ok(b) => b,
957                Err(e) => return PreflightOutcome::Error(e),
958            };
959            super::codex_spawn::validate_codex_model(args.codex_model.as_deref())
960                .map_err(PreflightOutcome::Error)
961                .ok();
962            let schema = "{}";
963            let schema_path = match super::codex_spawn::trusted_schema_path() {
964                Ok(p) => p,
965                Err(e) => return PreflightOutcome::Error(e),
966            };
967            let spawn_args = super::codex_spawn::CodexSpawnArgs {
968                binary: &bin,
969                prompt: "ping",
970                json_schema: schema,
971                input_text: "",
972                model: args.codex_model.as_deref(),
973                timeout_secs: args.rate_limit_buffer.max(60),
974                schema_path: schema_path.clone(),
975            };
976            let mut cmd = match super::codex_spawn::build_codex_command(&spawn_args) {
977                Ok(c) => c,
978                Err(e) => return PreflightOutcome::Error(e),
979            };
980            let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
981                Ok(c) => c,
982                Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
983            };
984            let output = match wait_with_timeout(child, timeout) {
985                Ok(out) => out,
986                Err(e) => return PreflightOutcome::Error(e),
987            };
988            let _ = std::fs::remove_file(&schema_path);
989            if !output.status.success() {
990                let stderr = String::from_utf8_lossy(&output.stderr);
991                if stderr.contains("rate_limit")
992                    || stderr.contains("429")
993                    || stderr.contains("Too Many Requests")
994                {
995                    return PreflightOutcome::RateLimited {
996                        reason: stderr.trim().to_string(),
997                        suggestion: "wait for the rate-limit window to reset",
998                    };
999                }
1000                return PreflightOutcome::Error(AppError::Validation(format!(
1001                    "preflight probe failed: {stderr}",
1002                    stderr = stderr.trim()
1003                )));
1004            }
1005            PreflightOutcome::Healthy
1006        }
1007        EnrichMode::Opencode => {
1008            let bin = match super::opencode_runner::find_opencode_binary_with_override(
1009                args.opencode_binary.as_deref(),
1010            ) {
1011                Ok(b) => b,
1012                Err(e) => return PreflightOutcome::Error(e),
1013            };
1014            let model =
1015                super::opencode_runner::resolve_opencode_model(args.opencode_model.as_deref());
1016            let mut cmd =
1017                match super::opencode_runner::build_opencode_command_sync(&bin, &model, "ping", "")
1018                {
1019                    Ok(c) => c,
1020                    Err(e) => return PreflightOutcome::Error(e),
1021                };
1022            let child = match super::opencode_runner::spawn_opencode(&mut cmd) {
1023                Ok(c) => c,
1024                Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
1025            };
1026            let output = match wait_with_timeout(child, timeout) {
1027                Ok(out) => out,
1028                Err(e) => return PreflightOutcome::Error(e),
1029            };
1030            if !output.status.success() {
1031                let stderr = String::from_utf8_lossy(&output.stderr);
1032                if stderr.contains("rate_limit")
1033                    || stderr.contains("429")
1034                    || stderr.contains("Too Many Requests")
1035                {
1036                    return PreflightOutcome::RateLimited {
1037                        reason: stderr.trim().to_string(),
1038                        suggestion: "wait for the rate-limit window to reset",
1039                    };
1040                }
1041                return PreflightOutcome::Error(AppError::Validation(format!(
1042                    "preflight probe failed: {stderr}",
1043                    stderr = stderr.trim()
1044                )));
1045            }
1046            PreflightOutcome::Healthy
1047        }
1048        EnrichMode::OpenRouter => {
1049            // v1.0.95: the OpenRouter JUDGE has no subprocess to ping; the
1050            // preflight only confirms a usable API key resolves. The chat
1051            // client singleton is initialised in run() before scan.
1052            match crate::config::resolve_api_key("openrouter", args.openrouter_api_key.as_deref()) {
1053                Some(_) => PreflightOutcome::Healthy,
1054                None => PreflightOutcome::Error(AppError::Validation(
1055                    "OPENROUTER_API_KEY not found for --mode openrouter preflight".into(),
1056                )),
1057            }
1058        }
1059    }
1060}
1061
1062/// Cross-platform wait with timeout (no extra crate dependency).
1063fn wait_with_timeout(
1064    mut child: std::process::Child,
1065    timeout: std::time::Duration,
1066) -> Result<std::process::Output, AppError> {
1067    use wait_timeout::ChildExt;
1068    let start = std::time::Instant::now();
1069    let Some(exit) = child.wait_timeout(timeout).map_err(AppError::Io)? else {
1070        let _ = child.kill();
1071        let _ = child.wait();
1072        return Err(AppError::Validation(format!(
1073            "preflight probe timed out after {}s",
1074            start.elapsed().as_secs()
1075        )));
1076    };
1077    let mut stdout = Vec::new();
1078    if let Some(mut out) = child.stdout.take() {
1079        std::io::Read::read_to_end(&mut out, &mut stdout).map_err(AppError::Io)?;
1080    }
1081    let mut stderr = Vec::new();
1082    if let Some(mut err) = child.stderr.take() {
1083        std::io::Read::read_to_end(&mut err, &mut stderr).map_err(AppError::Io)?;
1084    }
1085    Ok(std::process::Output {
1086        status: exit,
1087        stdout,
1088        stderr,
1089    })
1090}
1091
1092// Scan functions moved to scan.rs
1093
1094// Persist functions moved to postprocess.rs
1095
1096// ---------------------------------------------------------------------------
1097// Main entry point
1098// ---------------------------------------------------------------------------
1099
1100// ---------------------------------------------------------------------------
1101// G20: mode-conditional flag validation
1102// ---------------------------------------------------------------------------
1103
1104/// True when a scalar value matches its declared default. Used to
1105/// distinguish "operator passed an explicit override" from "clap filled
1106/// the default" for flags with default_value_t.
1107fn is_at_default<T: PartialEq>(value: T, default: T) -> bool {
1108    value == default
1109}
1110
1111/// G20: validate that flags for one LLM provider were not passed when
1112/// the operator selected a different provider. Flags silently discarded
1113/// by the wrong mode are surfaced as AppError::Validation BEFORE any
1114/// DB work, so the operator gets an actionable error instead of a
1115/// surprise at runtime.
1116///
1117/// Detection rules:
1118/// - For Option<PathBuf> / Option<String>: is_some() means explicit
1119/// - For scalar fields with default_value_t: value != default means explicit
1120/// - For boolean fields: true means explicit (default is false)
1121///
1122/// Mode-specific matrices:
1123/// - mode=claude-code rejects: codex_binary, codex_model, codex_timeout != 300
1124/// - mode=codex rejects: claude_binary, claude_model, claude_timeout != 300, max_cost_usd
1125fn validate_mode_conditional_flags_enrich(args: &EnrichArgs) -> Result<(), AppError> {
1126    const DEFAULT_TIMEOUT: u64 = 300;
1127
1128    let mut conflicts: Vec<String> = Vec::new();
1129
1130    match args.mode() {
1131        EnrichMode::ClaudeCode => {
1132            if args.codex_binary.is_some() {
1133                conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1134            }
1135            if args.codex_model.is_some() {
1136                conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1137            }
1138            if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1139                conflicts.push(format!(
1140                    "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1141                    args.codex_timeout
1142                ));
1143            }
1144        }
1145        EnrichMode::Codex => {
1146            if args.claude_binary.is_some() {
1147                conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1148            }
1149            if args.claude_model.is_some() {
1150                conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1151            }
1152            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1153                conflicts.push(format!(
1154                    "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1155                    args.claude_timeout
1156                ));
1157            }
1158            if args.max_cost_usd.is_some() {
1159                conflicts.push(
1160                    "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription, not the call)"
1161                        .to_string(),
1162                );
1163            }
1164        }
1165        EnrichMode::Opencode => {
1166            if args.claude_binary.is_some() {
1167                conflicts.push("--claude-binary is ignored when --mode=opencode".to_string());
1168            }
1169            if args.claude_model.is_some() {
1170                conflicts.push("--claude-model is ignored when --mode=opencode".to_string());
1171            }
1172            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1173                conflicts.push(format!(
1174                    "--claude-timeout={} is ignored when --mode=opencode (remove the flag to use the default 300s)",
1175                    args.claude_timeout
1176                ));
1177            }
1178            if args.max_cost_usd.is_some() {
1179                conflicts.push(
1180                    "--max-cost-usd is ignored when --mode=opencode (OAuth-first; cost is metered by your subscription, not the call)"
1181                        .to_string(),
1182                );
1183            }
1184        }
1185        EnrichMode::OpenRouter => {
1186            if args.claude_binary.is_some() {
1187                conflicts.push("--claude-binary is ignored when --mode=openrouter".to_string());
1188            }
1189            if args.claude_model.is_some() {
1190                conflicts.push("--claude-model is ignored when --mode=openrouter".to_string());
1191            }
1192            if args.codex_binary.is_some() {
1193                conflicts.push("--codex-binary is ignored when --mode=openrouter".to_string());
1194            }
1195            if args.codex_model.is_some() {
1196                conflicts.push("--codex-model is ignored when --mode=openrouter".to_string());
1197            }
1198            if args.opencode_binary.is_some() {
1199                conflicts.push("--opencode-binary is ignored when --mode=openrouter".to_string());
1200            }
1201            if args.opencode_model.is_some() {
1202                conflicts.push("--opencode-model is ignored when --mode=openrouter".to_string());
1203            }
1204            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1205                conflicts.push(format!(
1206                    "--claude-timeout={} is ignored when --mode=openrouter (remove the flag to use the default 300s)",
1207                    args.claude_timeout
1208                ));
1209            }
1210            if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1211                conflicts.push(format!(
1212                    "--codex-timeout={} is ignored when --mode=openrouter (remove the flag to use the default 300s)",
1213                    args.codex_timeout
1214                ));
1215            }
1216            if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
1217                conflicts.push(format!(
1218                    "--opencode-timeout={} is ignored when --mode=openrouter (remove the flag to use the default 300s)",
1219                    args.opencode_timeout
1220                ));
1221            }
1222        }
1223    }
1224
1225    if !conflicts.is_empty() {
1226        return Err(AppError::Validation(format!(
1227            "G20: mode-conditional flag conflicts detected for --mode={}:\n  - {}",
1228            args.mode(),
1229            conflicts.join("\n  - ")
1230        )));
1231    }
1232
1233    Ok(())
1234}
1235
1236// ---------------------------------------------------------------------------
1237
1238/// Main entry point for the `enrich` command.
1239pub fn run(
1240    args: &EnrichArgs,
1241    llm_backend: crate::cli::LlmBackendChoice,
1242    embedding_backend: crate::cli::EmbeddingBackendChoice,
1243) -> Result<(), AppError> {
1244    // G20: mode-conditional flag validation BEFORE any DB access.
1245    // Surfaces flags that the wrong mode would silently discard.
1246    validate_mode_conditional_flags_enrich(args)?;
1247
1248    // GAP-ENRICH-BACKLOG-CONVERGE: --status is a read-only report. It never
1249    // calls the LLM, never initialises the OpenRouter client, and never
1250    // acquires the job singleton, so it is safe to run while a real enrich is
1251    // in flight (it only reads the queue DB and the unbound backlog).
1252    // GAP-SG-23/11: --list-dead (inspect dead-letter rows) and --requeue-dead
1253    // (resurrect them) are queue-only operations — no LLM, no main-DB write, no
1254    // singleton. Both are scoped to the current --operation so a shared queue is
1255    // not cross-contaminated. Handled before any provider setup.
1256    if args.list_dead || args.requeue_dead || args.prune_dead_orphans {
1257        let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1258        let op_label = format!("{:?}", args.operation());
1259        let paths = AppPaths::resolve(args.db.as_deref())?;
1260        let queue_path = crate::paths::sidecar_path(&paths.db, ".enrich-queue.sqlite");
1261        let queue_conn = open_queue_db(&queue_path)?;
1262        // GAP-SG-66: prune orphan dead rows (memory gone) — needs the main DB to
1263        // confirm the referenced memory is truly absent before deleting.
1264        if args.prune_dead_orphans {
1265            ensure_db_ready(&paths)?;
1266            let main_conn = open_rw(&paths.db)?;
1267            let pruned = prune_dead_orphans(&queue_conn, &main_conn, &op_label, &namespace)?;
1268            let dead_total: i64 = queue_conn
1269                .query_row(
1270                    "SELECT COUNT(*) FROM queue WHERE status='dead' \
1271                     AND (operation = ?1 OR operation IS NULL)",
1272                    rusqlite::params![op_label],
1273                    |r| r.get(0),
1274                )
1275                .unwrap_or(0);
1276            emit_json(&DeadSummary {
1277                summary: true,
1278                operation: op_label,
1279                namespace,
1280                action: "prune-dead-orphans",
1281                dead_total,
1282                requeued: 0,
1283                pruned,
1284            });
1285            return Ok(());
1286        }
1287        if args.list_dead {
1288            let mut stmt = queue_conn.prepare(
1289                "SELECT item_key, item_type, attempt, error_class, error, \
1290                         finish_reason, input_tokens, output_tokens FROM queue \
1291                 WHERE status='dead' AND (operation = ?1 OR operation IS NULL) ORDER BY id",
1292            )?;
1293            let rows = stmt
1294                .query_map(rusqlite::params![op_label], |r| {
1295                    Ok(DeadItem {
1296                        dead_item: true,
1297                        item_key: r.get(0)?,
1298                        item_type: r.get(1)?,
1299                        attempt: r.get(2)?,
1300                        error_class: r.get(3)?,
1301                        error: r.get(4)?,
1302                        finish_reason: r.get(5)?,
1303                        input_tokens: r.get(6)?,
1304                        output_tokens: r.get(7)?,
1305                    })
1306                })?
1307                .collect::<Result<Vec<_>, _>>()?;
1308            let dead_total = rows.len() as i64;
1309            for item in &rows {
1310                emit_json(item);
1311            }
1312            emit_json(&DeadSummary {
1313                summary: true,
1314                operation: op_label,
1315                namespace,
1316                action: "list-dead",
1317                dead_total,
1318                requeued: 0,
1319                pruned: 0,
1320            });
1321            return Ok(());
1322        }
1323        // --requeue-dead: move dead -> pending, clearing the failure bookkeeping.
1324        let dead_total: i64 = queue_conn
1325            .query_row(
1326                "SELECT COUNT(*) FROM queue WHERE status='dead' \
1327                 AND (operation = ?1 OR operation IS NULL)",
1328                rusqlite::params![op_label],
1329                |r| r.get(0),
1330            )
1331            .unwrap_or(0);
1332        let requeued = queue_conn
1333            .execute(
1334                "UPDATE queue SET status='pending', attempt=0, next_retry_at=NULL, \
1335                 error=NULL, error_class=NULL \
1336                 WHERE status='dead' AND (operation = ?1 OR operation IS NULL)",
1337                rusqlite::params![op_label],
1338            )
1339            .map_err(|e| AppError::Validation(format!("requeue-dead failed: {e}")))?
1340            as i64;
1341        let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1342        emit_json(&DeadSummary {
1343            summary: true,
1344            operation: op_label,
1345            namespace,
1346            action: "requeue-dead",
1347            dead_total,
1348            requeued,
1349            pruned: 0,
1350        });
1351        return Ok(());
1352    }
1353
1354    if args.status {
1355        let paths = AppPaths::resolve(args.db.as_deref())?;
1356        ensure_db_ready(&paths)?;
1357        let conn = open_rw(&paths.db)?;
1358        let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1359        let unbound_backlog = scan_unbound_memories(&conn, &namespace, None, &[])?.len();
1360        // GAP-SG-77: DB-semantics backlog for the queried operation (fixes the
1361        // false pending=0 for entity-descriptions/body-enrich/re-embed).
1362        let scan_backlog = count_operation_backlog(&conn, &args.operation(), &namespace)?;
1363        let queue_path = crate::paths::sidecar_path(&paths.db, ".enrich-queue.sqlite");
1364        let queue_conn = open_queue_db(&queue_path)?;
1365        let op_label = format!("{:?}", args.operation());
1366        // GAP-SG-42: scope every count to the current operation. Rows migrated
1367        // before the `operation` column (NULL) are still counted so a legacy
1368        // queue is never reported as spuriously empty.
1369        let count_status = |st: &str, op: &str| -> i64 {
1370            queue_conn
1371                .query_row(
1372                    "SELECT COUNT(*) FROM queue WHERE status=?1 \
1373                     AND (operation = ?2 OR operation IS NULL)",
1374                    rusqlite::params![st, op],
1375                    |r| r.get(0),
1376                )
1377                .unwrap_or(0)
1378        };
1379        let eligible_now: i64 = queue_conn
1380            .query_row(
1381                "SELECT COUNT(*) FROM queue WHERE status='pending' \
1382                 AND (operation = ?1 OR operation IS NULL) \
1383                 AND (next_retry_at IS NULL OR next_retry_at <= datetime('now'))",
1384                rusqlite::params![op_label],
1385                |r| r.get(0),
1386            )
1387            .unwrap_or(0);
1388        let waiting: i64 = queue_conn
1389            .query_row(
1390                "SELECT COUNT(*) FROM queue WHERE status='pending' \
1391                 AND (operation = ?1 OR operation IS NULL) \
1392                 AND next_retry_at IS NOT NULL AND next_retry_at > datetime('now')",
1393                rusqlite::params![op_label],
1394                |r| r.get(0),
1395            )
1396            .unwrap_or(0);
1397        // GAP-SG-16: enumerate the items currently in backoff with their ETA.
1398        let waiting_items = {
1399            let mut stmt = queue_conn.prepare(
1400                "SELECT item_key, attempt, next_retry_at, error_class FROM queue \
1401                 WHERE status='pending' AND (operation = ?1 OR operation IS NULL) \
1402                 AND next_retry_at IS NOT NULL AND next_retry_at > datetime('now') \
1403                 ORDER BY next_retry_at",
1404            )?;
1405            let items: Vec<WaitingItem> = stmt
1406                .query_map(rusqlite::params![op_label], |r| {
1407                    Ok(WaitingItem {
1408                        item_key: r.get(0)?,
1409                        attempt: r.get(1)?,
1410                        next_retry_at: r.get(2)?,
1411                        error_class: r.get(3)?,
1412                    })
1413                })?
1414                .collect::<Result<Vec<_>, _>>()?;
1415            items
1416        };
1417        let queue_pending = count_status("pending", &op_label);
1418        let queue_processing = count_status("processing", &op_label);
1419        let queue_done = count_status("done", &op_label);
1420        let queue_failed = count_status("failed", &op_label);
1421        let queue_skipped = count_status("skipped", &op_label);
1422        let queue_dead = count_status("dead", &op_label);
1423        // GAP-SG-15/46: distinguish empty from cooldown from not-yet-scanned.
1424        let state = if eligible_now > 0 {
1425            "draining"
1426        } else if waiting > 0 {
1427            "cooldown"
1428        } else if queue_pending == 0 && scan_backlog > 0 {
1429            "pending-scan"
1430        } else {
1431            "empty"
1432        };
1433        emit_json(&EnrichStatus {
1434            status_report: true,
1435            operation: op_label,
1436            namespace,
1437            unbound_backlog,
1438            scan_backlog,
1439            queue_pending,
1440            queue_processing,
1441            queue_done,
1442            queue_failed,
1443            queue_skipped,
1444            queue_dead,
1445            eligible_now,
1446            waiting,
1447            state,
1448            waiting_items,
1449        });
1450        return Ok(());
1451    }
1452
1453    // v1.0.95 (ADR-0054): when the JUDGE is OpenRouter the model is mandatory
1454    // (no default) and the API key must resolve BEFORE any network or DB work.
1455    // The chat client singleton is initialised here so every per-item dispatch
1456    // fetches it without re-threading the key.
1457    if args.mode() == EnrichMode::OpenRouter {
1458        let model = args.openrouter_model.as_deref().ok_or_else(|| {
1459            AppError::Validation(
1460                "--mode openrouter requires --openrouter-model (no default model is allowed)"
1461                    .into(),
1462            )
1463        })?;
1464        let resolved =
1465            crate::config::resolve_api_key("openrouter", args.openrouter_api_key.as_deref())
1466                .ok_or_else(|| {
1467                    AppError::Validation(
1468                        "OPENROUTER_API_KEY not found; set the env var, store it via \
1469                         `config add-key --provider openrouter`, or pass --openrouter-api-key"
1470                            .into(),
1471                    )
1472                })?;
1473        crate::embedder::get_openrouter_chat_client(
1474            resolved.value,
1475            model,
1476            args.openrouter_timeout,
1477        )?;
1478    }
1479
1480    let started = Instant::now();
1481
1482    let paths = AppPaths::resolve(args.db.as_deref())?;
1483    ensure_db_ready(&paths)?;
1484    let conn = open_rw(&paths.db)?;
1485    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1486
1487    // G28-B (v1.0.68) + G30 (v1.0.69): enforce singleton per
1488    // (job_type, namespace, db_hash) so two parallel `enrich` invocations
1489    // on the same DB cannot co-exist, but concurrent enrich on different
1490    // databases works as expected. The force flag (--force) breaks a
1491    // stale lock from a previously crashed invocation.
1492    let wait_secs = args.wait_job_singleton;
1493    let force_flag = args.force_job_singleton;
1494    let _singleton = crate::lock::acquire_job_singleton(
1495        crate::lock::JobType::Enrich,
1496        &namespace,
1497        &paths.db,
1498        wait_secs,
1499        force_flag,
1500    )?;
1501
1502    // Validate provider binary upfront only for LLM-backed operations.
1503    let provider_binary = if matches!(args.operation(), EnrichOperation::ReEmbed) {
1504        None
1505    } else {
1506        Some(match args.mode() {
1507            EnrichMode::ClaudeCode => {
1508                let bin = find_claude_binary(args.claude_binary.as_deref())?;
1509                let version = super::claude_runner::validate_claude_version(&bin)?;
1510                tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
1511                emit_json(&PhaseEvent {
1512                    phase: "validate",
1513                    binary_path: bin.to_str(),
1514                    version: Some(&version),
1515                    items_total: None,
1516                    items_pending: None,
1517                    llm_parallelism: None,
1518                });
1519                bin
1520            }
1521            EnrichMode::Codex => {
1522                let bin = find_codex_binary(args.codex_binary.as_deref())?;
1523                emit_json(&PhaseEvent {
1524                    phase: "validate",
1525                    binary_path: bin.to_str(),
1526                    version: None,
1527                    items_total: None,
1528                    items_pending: None,
1529                    llm_parallelism: None,
1530                });
1531                bin
1532            }
1533            EnrichMode::Opencode => {
1534                let bin = super::opencode_runner::find_opencode_binary_with_override(
1535                    args.opencode_binary.as_deref(),
1536                )?;
1537                emit_json(&PhaseEvent {
1538                    phase: "validate",
1539                    binary_path: bin.to_str(),
1540                    version: None,
1541                    items_total: None,
1542                    items_pending: None,
1543                    llm_parallelism: None,
1544                });
1545                bin
1546            }
1547            EnrichMode::OpenRouter => {
1548                // v1.0.95: the OpenRouter JUDGE is a REST call, not a spawned
1549                // binary. The chat client singleton was initialised at the top
1550                // of run(); this placeholder path threads through the dispatch
1551                // but is never dereferenced by the OpenRouter arm.
1552                emit_json(&PhaseEvent {
1553                    phase: "validate",
1554                    binary_path: None,
1555                    version: None,
1556                    items_total: None,
1557                    items_pending: None,
1558                    llm_parallelism: None,
1559                });
1560                PathBuf::new()
1561            }
1562        })
1563    };
1564
1565    // G28-D: refuse to start when the system is saturated. This check
1566    // is BEFORE preflight so we never spend an OAuth turn on a host
1567    // that is already at the limit.
1568    if args.max_load_check && !args.dry_run && crate::system_load::is_system_saturated() {
1569        let load = crate::system_load::load_average_one();
1570        let n = crate::system_load::ncpus();
1571        return Err(AppError::Validation(format!(
1572            "system load average {load:.2} exceeds 2x ncpus ({n}); \
1573             pass --no-max-load-check to override (not recommended)"
1574        )));
1575    }
1576
1577    // G35: preflight probe — issue a single ping turn to verify the
1578    // provider is healthy before scanning N candidates. If the probe
1579    // fails with a rate-limit error, optionally fall back to a
1580    // different mode (typically codex) instead of failing the entire
1581    // batch. The probe itself consumes 1 OAuth turn, so it stays
1582    // opt-in (default off) to keep --dry-run and CI flows zero-cost.
1583    if args.preflight_check
1584        && !args.dry_run
1585        && !matches!(args.operation(), EnrichOperation::ReEmbed)
1586    {
1587        let preflight_result = run_preflight_probe(args);
1588        match preflight_result {
1589            PreflightOutcome::Healthy => {
1590                tracing::info!(target: "enrich", mode = ?args.mode(), "preflight probe healthy");
1591            }
1592            PreflightOutcome::RateLimited { reason, suggestion } => {
1593                if let Some(fallback) = args.fallback_mode.clone() {
1594                    if fallback != args.mode() {
1595                        // G35 (v1.0.69): the mid-batch mode switch is
1596                        // intentionally NOT applied because it would
1597                        // desynchronise the per-item rate-limit wait
1598                        // state (rate-limited items in the worker are
1599                        // timed against the original provider). Instead
1600                        // we abort cleanly so the operator can re-invoke
1601                        // with `--mode {fallback:?}`. This guarantees no
1602                        // OAuth window is wasted and no partial state
1603                        // is left in the queue.
1604                        return Err(AppError::Validation(format!(
1605                            "preflight detected rate limit on {mode:?}: {reason}; \
1606                             re-invoke with `--mode {fallback:?}` to use the fallback provider",
1607                            mode = args.mode()
1608                        )));
1609                    }
1610                    return Err(AppError::Validation(format!(
1611                        "preflight detected rate limit on {mode:?}: {reason}; \
1612                         --fallback-mode matches --mode, no recovery possible",
1613                        mode = args.mode()
1614                    )));
1615                }
1616                return Err(AppError::Validation(format!(
1617                    "preflight detected rate limit on {mode:?}: {reason}; \
1618                     {suggestion}; pass --fallback-mode codex to recover",
1619                    mode = args.mode()
1620                )));
1621            }
1622            PreflightOutcome::Error(e) => {
1623                return Err(e);
1624            }
1625        }
1626    }
1627
1628    // SCAN phase
1629    let mut scan_result = scan_operation(&conn, &namespace, args)?;
1630    // GAP-SG-69: body-enrich candidates are scanned purely by `LENGTH(body) <
1631    // min_output_chars`, so a short body whose rewrite the preservation guard
1632    // keeps rejecting is re-scanned every pass — items_total never reaches 0 and
1633    // `--until-empty` never converges (the detached worker reported a stuck
1634    // backlog for 30+ min). Exclude memories already vetoed `status='skipped'`
1635    // for this operation in the sidecar queue; `cleanup_queue_entry`
1636    // (remember/edit/forget/purge) clears the veto when the body actually
1637    // changes, so a genuinely updated memory is reconsidered automatically.
1638    if matches!(args.operation(), EnrichOperation::BodyEnrich) {
1639        let q_path = crate::paths::sidecar_path(&paths.db, ".enrich-queue.sqlite");
1640        if let Ok(q) = open_queue_db(&q_path) {
1641            if let Ok(vetoed) = skipped_item_keys(&q, &format!("{:?}", args.operation())) {
1642                scan_result.retain(|k| !vetoed.contains(k));
1643            }
1644        }
1645    }
1646    let total = scan_result.len();
1647
1648    emit_json(&PhaseEvent {
1649        phase: "scan",
1650        binary_path: None,
1651        version: None,
1652        items_total: Some(total),
1653        items_pending: Some(total),
1654        llm_parallelism: Some(args.llm_parallelism),
1655    });
1656
1657    // Dry-run: emit preview events and summary without calling LLM
1658    if args.dry_run {
1659        for (idx, key) in scan_result.iter().enumerate() {
1660            emit_json(&ItemEvent {
1661                item: key,
1662                status: "preview",
1663                memory_id: None,
1664                entity_id: None,
1665                entities: None,
1666                rels: None,
1667                chars_before: None,
1668                chars_after: None,
1669                cost_usd: None,
1670                elapsed_ms: None,
1671                error: None,
1672                index: idx,
1673                total,
1674            });
1675        }
1676        emit_json(&EnrichSummary {
1677            summary: true,
1678            operation: format!("{:?}", args.operation()),
1679            items_total: total,
1680            completed: 0,
1681            failed: 0,
1682            skipped: 0,
1683            cost_usd: 0.0,
1684            elapsed_ms: started.elapsed().as_millis() as u64,
1685            backend_invoked: take_enrich_backend(),
1686            waiting: 0,
1687            dead: 0,
1688        });
1689        return Ok(());
1690    }
1691
1692    // All operations in this enum have an execution path.
1693
1694    // Queue setup for resume/retry (GAP-SG-64: sidecar alongside --db)
1695    let queue_path = crate::paths::sidecar_path(&paths.db, ".enrich-queue.sqlite");
1696    let queue_conn = open_queue_db(&queue_path)?;
1697
1698    if args.resume {
1699        let reset = queue_conn
1700            .execute(
1701                "UPDATE queue SET status='pending' WHERE status='processing'",
1702                [],
1703            )
1704            .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
1705        if reset > 0 {
1706            tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
1707        }
1708    }
1709
1710    if args.retry_failed {
1711        let count = queue_conn
1712            .execute(
1713                "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
1714                [],
1715            )
1716            .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
1717        tracing::info!(target: "enrich", count, "retrying failed items");
1718    }
1719
1720    if !args.resume && !args.retry_failed && !args.until_empty {
1721        queue_conn
1722            .execute("DELETE FROM queue", [])
1723            .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
1724    }
1725
1726    // Populate queue (GAP-SG-12: tag rows with the operation + link memory_id).
1727    let op_label = format!("{:?}", args.operation());
1728    let item_type = item_type_for(&args.operation());
1729    for key in scan_result.iter() {
1730        enqueue_candidate(&queue_conn, &conn, &namespace, key, item_type, &op_label);
1731    }
1732
1733    // G19: parallel LLM processing via std::thread::scope when parallelism > 1.
1734    // Clamp enforces the range even if the caller bypasses clap validation.
1735    let parallelism = if args.mode() == EnrichMode::OpenRouter {
1736        let rest = args.rest_concurrency.unwrap_or(8).clamp(1, 16) as usize;
1737        tracing::info!(
1738            target: "enrich",
1739            concurrency = rest,
1740            source = "rest_concurrency",
1741            "OpenRouter REST concurrency (clamp 1..=16)"
1742        );
1743        rest
1744    } else {
1745        let p = args.llm_parallelism.clamp(1, 32) as usize;
1746        tracing::info!(
1747            target: "enrich",
1748            concurrency = p,
1749            source = "llm_parallelism",
1750            "LLM subprocess parallelism (clamp 1..=32)"
1751        );
1752        p
1753    };
1754    if parallelism > 1 {
1755        tracing::info!(
1756            target: "enrich",
1757            llm_parallelism = parallelism,
1758            "parallel LLM processing with bounded thread pool"
1759        );
1760    }
1761    // G28-D (v1.0.68) + G34 (v1.0.69): warn above the recommended parallelism
1762    // ceiling. The threshold and message depend on the LLM mode because
1763    // Claude Code spawns MCP children (G28-A) while Codex does not.
1764    if parallelism > 4 {
1765        match args.mode() {
1766            EnrichMode::ClaudeCode => {
1767                tracing::warn!(
1768                    target: "enrich",
1769                    llm_parallelism = parallelism,
1770                    recommended_max = 4,
1771                    mode = "claude-code",
1772                    "llm_parallelism above 4 multiplies Claude Code subprocess fan-out; \
1773                     consider combining with SQLITE_GRAPHRAG_CLAUDE_EMPTY_CONFIG_DIR \
1774                     to cut MCP children (G28-A)"
1775                );
1776            }
1777            EnrichMode::Codex if parallelism > 16 => {
1778                tracing::warn!(
1779                    target: "enrich",
1780                    llm_parallelism = parallelism,
1781                    recommended_max = 16,
1782                    mode = "codex",
1783                    "llm_parallelism above 16 risks OAuth rate-limit on Codex; \
1784                     consider --llm-parallelism 8 for safer concurrency"
1785                );
1786            }
1787            EnrichMode::Codex => {
1788                // No warning: codex does not spawn MCP children and was
1789                // validated at parallelism 8 in production (1161 items,
1790                // 0 failures) per the 2026-06-04 session audit.
1791            }
1792            EnrichMode::Opencode if parallelism > 16 => {
1793                tracing::warn!(
1794                    target: "enrich",
1795                    llm_parallelism = parallelism,
1796                    recommended_max = 16,
1797                    mode = "opencode",
1798                    "llm_parallelism above 16 risks OAuth rate-limit on OpenCode; \
1799                     consider --llm-parallelism 8 for safer concurrency"
1800                );
1801            }
1802            EnrichMode::Opencode => {
1803                // No warning: opencode does not spawn MCP children.
1804            }
1805            EnrichMode::OpenRouter => {
1806                // No warning: OpenRouter is a bounded HTTP fan-out (no
1807                // subprocess); --llm-parallelism is respected as-is.
1808            }
1809        }
1810    }
1811
1812    let mut completed = 0usize;
1813    let mut failed = 0usize;
1814    let mut skipped = 0usize;
1815    let mut cost_total = 0.0f64;
1816    let mut oauth_detected = false;
1817    let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1818    let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1819    let enrich_started = std::time::Instant::now();
1820
1821    let provider_timeout = match args.mode() {
1822        EnrichMode::ClaudeCode => args.claude_timeout,
1823        EnrichMode::Codex => args.codex_timeout,
1824        EnrichMode::Opencode => args.opencode_timeout,
1825        EnrichMode::OpenRouter => args.openrouter_timeout,
1826    };
1827
1828    let provider_model: Option<&str> = match args.mode() {
1829        EnrichMode::ClaudeCode => args.claude_model.as_deref(),
1830        EnrichMode::Codex => args.codex_model.as_deref(),
1831        EnrichMode::Opencode => args.opencode_model.as_deref(),
1832        EnrichMode::OpenRouter => args.openrouter_model.as_deref(),
1833    };
1834
1835    // GAP-SG-16: when --ignore-backoff is set, drop the per-item cooldown filter
1836    // from candidate selection so items parked on `next_retry_at` are eligible
1837    // immediately. Shared by the parallel workers and the serial loop.
1838    let backoff_clause: &str = if args.ignore_backoff {
1839        ""
1840    } else {
1841        "AND (next_retry_at IS NULL OR next_retry_at <= datetime('now'))"
1842    };
1843
1844    // GAP-SG-45: announce the scan-vs-drain concurrency split (scan is always
1845    // serial; drain uses `parallelism` workers).
1846    emit_json(&ConcurrencyEvent {
1847        phase: "concurrency",
1848        scan_parallelism: 1,
1849        drain_parallelism: parallelism as u32,
1850    });
1851
1852    // GAP-ENRICH-BACKLOG-CONVERGE: --until-empty wraps the scan→populate→drain
1853    // cycle in an internal loop so the external bash retry loop is unnecessary.
1854    // Without --until-empty the loop body runs exactly once (legacy behaviour).
1855    let until_deadline = std::time::Instant::now()
1856        + std::time::Duration::from_secs(args.max_runtime.unwrap_or(3600));
1857    loop {
1858        if args.until_empty {
1859            // Re-scan and re-enqueue eligible candidates each iteration.
1860            // INSERT OR IGNORE never resurrects a dead-letter row (item_key is
1861            // UNIQUE), so the backlog converges instead of looping forever.
1862            let mut rescan = scan_operation(&conn, &namespace, args)?;
1863            // GAP-SG-69: drop memories already vetoed `status='skipped'` so the
1864            // re-scan converges instead of re-enqueuing a non-expandable short
1865            // body every iteration (body-enrich only; the verdict persists in
1866            // the sidecar queue and is cleared by cleanup_queue_entry on edit).
1867            if matches!(args.operation(), EnrichOperation::BodyEnrich) {
1868                if let Ok(vetoed) = skipped_item_keys(&queue_conn, &op_label) {
1869                    rescan.retain(|k| !vetoed.contains(k));
1870                }
1871            }
1872            for key in &rescan {
1873                enqueue_candidate(&queue_conn, &conn, &namespace, key, item_type, &op_label);
1874            }
1875        }
1876        let completed_before = completed;
1877
1878        // G19: when parallelism > 1, spawn bounded worker threads.
1879        // Each worker opens its own DB connections (WAL supports concurrent readers + serialized writers).
1880        // The queue DB claim is atomic via UPDATE...RETURNING — no external lock needed.
1881        if parallelism > 1 {
1882            let stdout_mu = parking_lot::Mutex::new(());
1883            let budget = args.max_cost_usd;
1884            let operation = args.operation().clone();
1885            let mode = args.mode().clone();
1886            let min_oc = args.min_output_chars;
1887            let max_oc = args.max_output_chars;
1888            let prompt_tpl = args.prompt_template.as_deref().map(|p| p.to_path_buf());
1889
1890            struct WorkerResult {
1891                completed: usize,
1892                failed: usize,
1893                skipped: usize,
1894                cost: f64,
1895                oauth: bool,
1896                // GAP-SG-76 fix: distinct signal for "worker aborted because
1897                // SQLITE_BUSY exhausted all bounded retries" so the caller
1898                // fails loud (exit 15) instead of silently treating it like
1899                // an exhausted/empty backlog.
1900                db_busy: bool,
1901            }
1902
1903            let results: Vec<WorkerResult> = std::thread::scope(|s| {
1904                let handles: Vec<_> = (0..parallelism)
1905                .map(|worker_id| {
1906                    let stdout_mu = &stdout_mu;
1907                    let paths = &paths;
1908                    let queue_path = &queue_path;
1909                    let namespace = &namespace;
1910                    let provider_binary = provider_binary.as_deref();
1911                    let operation = &operation;
1912                    let mode = &mode;
1913                    let prompt_tpl = prompt_tpl.as_deref();
1914                    s.spawn(move || {
1915                        let w_conn = match open_rw(&paths.db) {
1916                            Ok(c) => c,
1917                            Err(e) => {
1918                                tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open DB");
1919                                return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false, db_busy: false };
1920                            }
1921                        };
1922                        let w_queue = match open_queue_db(queue_path) {
1923                            Ok(c) => c,
1924                            Err(e) => {
1925                                tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open queue DB");
1926                                return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false, db_busy: false };
1927                            }
1928                        };
1929                        let mut w_completed = 0usize;
1930                        let mut w_failed = 0usize;
1931                        let mut w_skipped = 0usize;
1932                        let mut w_cost = 0.0f64;
1933                        let mut w_oauth = false;
1934                        let mut w_db_busy = false;
1935                        let mut w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1936                        let w_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1937                        // G28-D: per-worker circuit breaker that aborts the
1938                        // loop after `circuit_breaker_threshold` consecutive
1939                        // HardFailure outcomes (transient/rate-limited errors
1940                        // do NOT count, so a recovering provider is not
1941                        // penalised).
1942                        let mut w_breaker = crate::retry::CircuitBreaker::new(
1943                            args.circuit_breaker_threshold.max(1),
1944                            std::time::Duration::from_secs(60),
1945                        );
1946
1947                        loop {
1948                            if crate::shutdown_requested() {
1949                                tracing::info!(target: "enrich", "shutdown requested, worker stopping");
1950                                break;
1951                            }
1952                            if let Some(b) = budget {
1953                                if !w_oauth && w_cost >= b {
1954                                    break;
1955                                }
1956                            }
1957                            // GAP-SG-16: --ignore-backoff drops the next_retry_at
1958                            // cooldown filter so items waiting on backoff are
1959                            // claimed immediately.
1960                            // GAP-SG-76: distinguish a genuinely empty backlog
1961                            // (QueryReturnedNoRows) from SQLITE_BUSY lock
1962                            // contention with the main writer or another
1963                            // worker — a busy claim retries briefly instead of
1964                            // breaking the drain loop early.
1965                            // GAP-SG-76/v1.1.00 fix: bounded busy-retry via the
1966                            // shared with_busy_retry helper (5 attempts, exponential
1967                            // half-jitter backoff, kill-switch aware) instead of an
1968                            // unbounded `loop { ... continue; }` on SQLITE_BUSY. When
1969                            // retries are exhausted, with_busy_retry converts to
1970                            // AppError::DbBusy — that is NOT treated as an empty
1971                            // backlog. It sets w_db_busy and stops this worker; the
1972                            // caller (after collecting all workers) fails loud with
1973                            // exit code 15 instead of silently under-reporting a
1974                            // convergent drain.
1975                            let pending = match crate::storage::utils::with_busy_retry(|| {
1976                                dequeue_next_pending(&w_queue, backoff_clause)
1977                            }) {
1978                                Ok(DequeueOutcome::Claimed(p)) => Some(p),
1979                                Ok(DequeueOutcome::Empty) => None,
1980                                Err(AppError::DbBusy(msg)) => {
1981                                    tracing::error!(target: "enrich", worker = worker_id, error = %msg, "SQLITE_BUSY exhausted bounded retries, worker aborting");
1982                                    w_db_busy = true;
1983                                    None
1984                                }
1985                                Err(e) => {
1986                                    tracing::error!(target: "enrich", worker = worker_id, error = %e, "dequeue failed");
1987                                    None
1988                                }
1989                            };
1990                            let (queue_id, item_key, _item_type, attempt_current) = match pending {
1991                                Some(p) => p,
1992                                None => break,
1993                            };
1994                            let item_started = Instant::now();
1995                            let current_index = w_completed + w_failed + w_skipped;
1996
1997                            // provider_binary validated upfront (Some for every
1998                            // LLM-backed op; None only for ReEmbed, which ignores
1999                            // it). unwrap_or_default yields "" solely in that
2000                            // unread case, so a broken invariant surfaces as a
2001                            // recoverable per-item error instead of a panic.
2002                            let provider_bin = provider_binary.unwrap_or_else(|| std::path::Path::new(""));
2003                            let call_result = match operation {
2004                                EnrichOperation::MemoryBindings | EnrichOperation::AugmentBindings => call_memory_bindings(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2005                                EnrichOperation::EntityDescriptions => call_entity_description(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2006                                EnrichOperation::BodyEnrich => call_body_enrich(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode, min_oc, max_oc, prompt_tpl, args.preserve_threshold, paths, llm_backend, embedding_backend),
2007                                EnrichOperation::ReEmbed => call_reembed(&w_conn, namespace, &item_key, paths, llm_backend, embedding_backend),
2008                                EnrichOperation::WeightCalibrate => call_weight_calibrate(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2009                                EnrichOperation::RelationReclassify => call_relation_reclassify(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2010                                EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => call_entity_connect(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2011                                EnrichOperation::EntityTypeValidate => call_entity_type_validate(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2012                                EnrichOperation::DescriptionEnrich => call_description_enrich(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2013                                EnrichOperation::DomainClassify => call_domain_classify(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2014                                EnrichOperation::GraphAudit => call_graph_audit(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2015                                EnrichOperation::DeepResearchSynth => call_deep_research_synth(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2016                                EnrichOperation::BodyExtract => call_body_extract(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode, args.body_extract_graph_only),
2017                            };
2018                            // GAP-SG-72/73: drain UNCONDITIONALLY right after
2019                            // every call_result (success or failure) so a
2020                            // diagnostic never survives past the item that
2021                            // produced it — see the doc comment on
2022                            // OpenRouterFailureDiagnostics.
2023                            let openrouter_diag = take_last_openrouter_failure();
2024
2025                            match call_result {
2026                                Ok(EnrichItemResult::Done { cost, is_oauth, memory_id, entity_id, entities, rels, chars_before, chars_after }) => {
2027                                    if is_oauth { w_oauth = true; }
2028                                    w_backoff = DEFAULT_RATE_LIMIT_WAIT;
2029                                    let _ = w_queue.execute(
2030                                        "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2031                                        rusqlite::params![memory_id, entity_id, entities as i64, rels as i64, cost, item_started.elapsed().as_millis() as i64, queue_id],
2032                                    );
2033                                    w_completed += 1;
2034                                    if !is_oauth { w_cost += cost; }
2035                                    // G28-D: count success; resets breaker.
2036                                    let _ = w_breaker
2037                                        .record(crate::retry::AttemptOutcome::Success);
2038                                    let _guard = stdout_mu.lock();
2039                                    emit_json(&ItemEvent { item: &item_key, status: "done", memory_id, entity_id, entities: Some(entities), rels: Some(rels), chars_before, chars_after, cost_usd: if is_oauth { None } else { Some(cost) }, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
2040                                }
2041                                Ok(EnrichItemResult::Skipped { reason }) => {
2042                                    w_skipped += 1;
2043                                    let _ = w_queue.execute("UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![reason, queue_id]);
2044                                    let _guard = stdout_mu.lock();
2045                                    emit_json(&ItemEvent { item: &item_key, status: "skipped", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
2046                                }
2047                                Ok(EnrichItemResult::PreservationFailed { score, threshold, chars_before, chars_after }) => {
2048                                    // G29 Passo 4: worker mirror of the
2049                                    // serial path. Counted as a soft
2050                                    // skip so the queue surface shows
2051                                    // a quality issue rather than a
2052                                    // transport failure.
2053                                    w_skipped += 1;
2054                                    let reason = format!(
2055                                        "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2056                                    );
2057                                    let _ = w_queue.execute(
2058                                        "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2059                                        rusqlite::params![reason, queue_id],
2060                                    );
2061                                    let _guard = stdout_mu.lock();
2062                                    emit_json(&ItemEvent {
2063                                        item: &item_key,
2064                                        status: "preservation_failed",
2065                                        memory_id: None,
2066                                        entity_id: None,
2067                                        entities: None,
2068                                        rels: None,
2069                                        chars_before: Some(chars_before),
2070                                        chars_after: Some(chars_after),
2071                                        cost_usd: None,
2072                                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2073                                        error: Some(reason),
2074                                        index: current_index,
2075                                        total,
2076                                    });
2077                                }
2078                                Err(e) => {
2079                                    let err_str = format!("{e}");
2080                                    if matches!(e, AppError::RateLimited { .. }) {
2081                                        if crate::retry::is_kill_switch_active() {
2082                                            tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2083                                        } else if std::time::Instant::now() >= w_deadline {
2084                                            tracing::error!(target: "enrich", "rate-limit retry deadline (1h) exhausted in worker");
2085                                        } else {
2086                                            let half = w_backoff / 2;
2087                                            let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2088                                            let actual_wait = half + jitter;
2089                                            tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited in worker, backing off");
2090                                            let _ = w_queue.execute("UPDATE queue SET status='pending' WHERE id=?1", rusqlite::params![queue_id]);
2091                                            std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2092                                            w_backoff = (w_backoff * 2).min(900);
2093                                            continue;
2094                                        }
2095                                    }
2096                                    w_failed += 1;
2097                                    // GAP-SG-73: prefer the origin-typed verdict
2098                                    // (ChatError::retry_class, computed at the
2099                                    // exact HTTP status / provider code in
2100                                    // chat_api.rs) over the untyped fallback
2101                                    // classifier whenever this item's failure
2102                                    // came from an OpenRouter chat call.
2103                                    let outcome = match openrouter_diag {
2104                                        Some(diag) => record_item_failure_typed(
2105                                            &w_queue,
2106                                            queue_id,
2107                                            attempt_current,
2108                                            args.max_attempts,
2109                                            diag.retry_class,
2110                                            &err_str,
2111                                            diag.finish_reason.as_deref(),
2112                                            diag.prompt_tokens,
2113                                            diag.completion_tokens,
2114                                        ),
2115                                        None => record_item_failure(&w_queue, queue_id, attempt_current, args.max_attempts, &e),
2116                                    };
2117                                    let _guard = stdout_mu.lock();
2118                                    emit_json(&ItemEvent { item: &item_key, status: "failed", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: Some(err_str), index: current_index, total });
2119                                    // G28-D: feed the classified outcome to the breaker (transient
2120                                    // failures do not count toward opening it).
2121                                    let breaker_opened = w_breaker.record(outcome);
2122                                    if breaker_opened {
2123                                        tracing::error!(target: "enrich",
2124                                            consecutive_failures = w_breaker.consecutive_failures(),
2125                                            "circuit breaker opened — aborting worker"
2126                                        );
2127                                        break;
2128                                    }
2129                                }
2130                            }
2131                        }
2132                        WorkerResult { completed: w_completed, failed: w_failed, skipped: w_skipped, cost: w_cost, oauth: w_oauth, db_busy: w_db_busy }
2133                    })
2134                })
2135                .collect();
2136                handles
2137                    .into_iter()
2138                    .map(|h| {
2139                        h.join().unwrap_or(WorkerResult {
2140                            completed: 0,
2141                            failed: 0,
2142                            skipped: 0,
2143                            cost: 0.0,
2144                            oauth: false,
2145                            db_busy: false,
2146                        })
2147                    })
2148                    .collect()
2149            });
2150
2151            // GAP-SG-76 fix: a worker that aborted due to exhausted
2152            // SQLITE_BUSY retries must fail the whole `enrich` invocation
2153            // loudly (exit code 15 via AppError::DbBusy) rather than being
2154            // folded silently into the completed/failed/skipped counters,
2155            // which would understate a genuinely unfinished drain.
2156            if results.iter().any(|r| r.db_busy) {
2157                return Err(AppError::DbBusy(
2158                    "SQLITE_BUSY exhausted bounded retries while dequeuing (parallel worker)"
2159                        .into(),
2160                ));
2161            }
2162
2163            for r in &results {
2164                completed += r.completed;
2165                failed += r.failed;
2166                skipped += r.skipped;
2167                cost_total += r.cost;
2168                if r.oauth && !oauth_detected {
2169                    oauth_detected = true;
2170                }
2171            }
2172        } else {
2173            // Serial path (parallelism == 1) — original loop
2174            loop {
2175                if crate::shutdown_requested() {
2176                    tracing::info!(target: "enrich", "shutdown requested, stopping enrichment");
2177                    break;
2178                }
2179
2180                // Budget check
2181                if let Some(budget) = args.max_cost_usd {
2182                    if !oauth_detected && cost_total >= budget {
2183                        tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
2184                        break;
2185                    }
2186                }
2187
2188                // Dequeue next pending item (GAP-SG-16: --ignore-backoff drops
2189                // the next_retry_at cooldown filter).
2190                // GAP-SG-76: distinguish a genuinely empty backlog
2191                // (QueryReturnedNoRows) from SQLITE_BUSY lock contention with
2192                // a concurrent writer — a busy claim retries briefly instead
2193                // of breaking the drain loop early.
2194                // GAP-SG-76/v1.1.00 fix: bounded busy-retry via the shared
2195                // with_busy_retry helper (5 attempts, exponential half-jitter
2196                // backoff, kill-switch aware) instead of an unbounded
2197                // `loop { ... continue; }` on SQLITE_BUSY. When retries are
2198                // exhausted, with_busy_retry converts to AppError::DbBusy,
2199                // which we propagate immediately (fail loud, exit code 15)
2200                // instead of silently treating sustained contention as an
2201                // empty backlog — that would end the drain early and under-
2202                // report queue_pending as converged.
2203                let pending = match crate::storage::utils::with_busy_retry(|| {
2204                    dequeue_next_pending(&queue_conn, backoff_clause)
2205                }) {
2206                    Ok(DequeueOutcome::Claimed(p)) => Some(p),
2207                    Ok(DequeueOutcome::Empty) => None,
2208                    Err(e @ AppError::DbBusy(_)) => {
2209                        tracing::error!(target: "enrich", error = %e, "SQLITE_BUSY exhausted bounded retries, aborting drain loop");
2210                        return Err(e);
2211                    }
2212                    Err(e) => {
2213                        tracing::error!(target: "enrich", error = %e, "dequeue failed");
2214                        None
2215                    }
2216                };
2217
2218                let (queue_id, item_key, item_type, attempt_current) = match pending {
2219                    Some(p) => p,
2220                    None => break,
2221                };
2222
2223                let item_started = Instant::now();
2224                let current_index = completed + failed + skipped;
2225
2226                // See worker note: provider_binary is Some for every LLM-backed
2227                // op; "" here only for ReEmbed, which never reads it.
2228                let provider_bin = provider_binary
2229                    .as_deref()
2230                    .unwrap_or_else(|| std::path::Path::new(""));
2231                let call_result = match args.operation() {
2232                    EnrichOperation::MemoryBindings | EnrichOperation::AugmentBindings => {
2233                        call_memory_bindings(
2234                            &conn,
2235                            &namespace,
2236                            &item_key,
2237                            provider_bin,
2238                            provider_model,
2239                            provider_timeout,
2240                            &args.mode(),
2241                        )
2242                    }
2243                    EnrichOperation::EntityDescriptions => call_entity_description(
2244                        &conn,
2245                        &namespace,
2246                        &item_key,
2247                        provider_bin,
2248                        provider_model,
2249                        provider_timeout,
2250                        &args.mode(),
2251                    ),
2252                    EnrichOperation::BodyEnrich => call_body_enrich(
2253                        &conn,
2254                        &namespace,
2255                        &item_key,
2256                        provider_bin,
2257                        provider_model,
2258                        provider_timeout,
2259                        &args.mode(),
2260                        args.min_output_chars,
2261                        args.max_output_chars,
2262                        args.prompt_template.as_deref(),
2263                        args.preserve_threshold,
2264                        &paths,
2265                        llm_backend,
2266                        embedding_backend,
2267                    ),
2268                    EnrichOperation::ReEmbed => call_reembed(
2269                        &conn,
2270                        &namespace,
2271                        &item_key,
2272                        &paths,
2273                        llm_backend,
2274                        embedding_backend,
2275                    ),
2276                    EnrichOperation::WeightCalibrate => call_weight_calibrate(
2277                        &conn,
2278                        &namespace,
2279                        &item_key,
2280                        provider_bin,
2281                        provider_model,
2282                        provider_timeout,
2283                        &args.mode(),
2284                    ),
2285                    EnrichOperation::RelationReclassify => call_relation_reclassify(
2286                        &conn,
2287                        &namespace,
2288                        &item_key,
2289                        provider_bin,
2290                        provider_model,
2291                        provider_timeout,
2292                        &args.mode(),
2293                    ),
2294                    EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2295                        call_entity_connect(
2296                            &conn,
2297                            &namespace,
2298                            &item_key,
2299                            provider_bin,
2300                            provider_model,
2301                            provider_timeout,
2302                            &args.mode(),
2303                        )
2304                    }
2305                    EnrichOperation::EntityTypeValidate => call_entity_type_validate(
2306                        &conn,
2307                        &namespace,
2308                        &item_key,
2309                        provider_bin,
2310                        provider_model,
2311                        provider_timeout,
2312                        &args.mode(),
2313                    ),
2314                    EnrichOperation::DescriptionEnrich => call_description_enrich(
2315                        &conn,
2316                        &namespace,
2317                        &item_key,
2318                        provider_bin,
2319                        provider_model,
2320                        provider_timeout,
2321                        &args.mode(),
2322                    ),
2323                    EnrichOperation::DomainClassify => call_domain_classify(
2324                        &conn,
2325                        &namespace,
2326                        &item_key,
2327                        provider_bin,
2328                        provider_model,
2329                        provider_timeout,
2330                        &args.mode(),
2331                    ),
2332                    EnrichOperation::GraphAudit => call_graph_audit(
2333                        &conn,
2334                        &namespace,
2335                        &item_key,
2336                        provider_bin,
2337                        provider_model,
2338                        provider_timeout,
2339                        &args.mode(),
2340                    ),
2341                    EnrichOperation::DeepResearchSynth => call_deep_research_synth(
2342                        &conn,
2343                        &namespace,
2344                        &item_key,
2345                        provider_bin,
2346                        provider_model,
2347                        provider_timeout,
2348                        &args.mode(),
2349                    ),
2350                    EnrichOperation::BodyExtract => call_body_extract(
2351                        &conn,
2352                        &namespace,
2353                        &item_key,
2354                        provider_bin,
2355                        provider_model,
2356                        provider_timeout,
2357                        &args.mode(),
2358                        args.body_extract_graph_only,
2359                    ),
2360                };
2361                // GAP-SG-72/73: drain UNCONDITIONALLY right after every
2362                // call_result (mirrors the worker loop above).
2363                let openrouter_diag = take_last_openrouter_failure();
2364
2365                match call_result {
2366                    Ok(EnrichItemResult::Done {
2367                        memory_id,
2368                        entity_id,
2369                        entities,
2370                        rels,
2371                        chars_before,
2372                        chars_after,
2373                        cost,
2374                        is_oauth,
2375                    }) => {
2376                        if is_oauth && !oauth_detected {
2377                            oauth_detected = true;
2378                            tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
2379                        }
2380                        backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
2381
2382                        // Persist depends on the operation
2383                        let persist_err: Option<String> = match args.operation() {
2384                            EnrichOperation::MemoryBindings => {
2385                                // Bindings already persisted inside call_memory_bindings
2386                                None
2387                            }
2388                            EnrichOperation::EntityDescriptions => {
2389                                // Description already persisted inside call_entity_description
2390                                None
2391                            }
2392                            EnrichOperation::BodyEnrich => {
2393                                // Body already persisted inside call_body_enrich
2394                                None
2395                            }
2396                            _ => {
2397                                // All G27 operations persist inside their call_* function
2398                                None
2399                            }
2400                        };
2401
2402                        if let Err(e) = queue_conn.execute(
2403                    "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2404                    rusqlite::params![
2405                        memory_id,
2406                        entity_id,
2407                        entities as i64,
2408                        rels as i64,
2409                        cost,
2410                        item_started.elapsed().as_millis() as i64,
2411                        queue_id
2412                    ],
2413                ) {
2414                        tracing::warn!(target: "enrich", error = %e, "queue done update failed");
2415                    }
2416
2417                        if persist_err.is_none() {
2418                            completed += 1;
2419                            if !is_oauth {
2420                                cost_total += cost;
2421                            }
2422                            emit_json(&ItemEvent {
2423                                item: &item_key,
2424                                status: "done",
2425                                memory_id,
2426                                entity_id,
2427                                entities: Some(entities),
2428                                rels: Some(rels),
2429                                chars_before,
2430                                chars_after,
2431                                cost_usd: if is_oauth { None } else { Some(cost) },
2432                                elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2433                                error: None,
2434                                index: current_index,
2435                                total,
2436                            });
2437                        } else {
2438                            failed += 1;
2439                            emit_json(&ItemEvent {
2440                                item: &item_key,
2441                                status: "failed",
2442                                memory_id: None,
2443                                entity_id: None,
2444                                entities: None,
2445                                rels: None,
2446                                chars_before: None,
2447                                chars_after: None,
2448                                cost_usd: None,
2449                                elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2450                                error: persist_err,
2451                                index: current_index,
2452                                total,
2453                            });
2454                        }
2455                    }
2456                    Ok(EnrichItemResult::Skipped { reason }) => {
2457                        skipped += 1;
2458                        if let Err(e) = queue_conn.execute(
2459                    "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2460                    rusqlite::params![reason, queue_id],
2461                ) {
2462                        tracing::warn!(target: "enrich", error = %e, "queue skipped update failed");
2463                    }
2464                        emit_json(&ItemEvent {
2465                            item: &item_key,
2466                            status: "skipped",
2467                            memory_id: None,
2468                            entity_id: None,
2469                            entities: None,
2470                            rels: None,
2471                            chars_before: None,
2472                            chars_after: None,
2473                            cost_usd: None,
2474                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2475                            error: None,
2476                            index: current_index,
2477                            total,
2478                        });
2479                    }
2480                    Ok(EnrichItemResult::PreservationFailed {
2481                        score,
2482                        threshold,
2483                        chars_before,
2484                        chars_after,
2485                    }) => {
2486                        // G29 Passo 4: the LLM rewrite diverged too far from
2487                        // the original body. Count as a soft failure (not
2488                        // `failed`) so the queue surfaces it as a quality
2489                        // issue, not a transport error. The reason is
2490                        // structured so the operator can audit why a body
2491                        // was rejected.
2492                        skipped += 1;
2493                        let reason = format!(
2494                        "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2495                    );
2496                        if let Err(qe) = queue_conn.execute(
2497                        "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2498                        rusqlite::params![reason, queue_id],
2499                    ) {
2500                        tracing::warn!(target: "enrich", error = %qe, "queue preservation_failed update failed");
2501                    }
2502                        emit_json(&ItemEvent {
2503                            item: &item_key,
2504                            status: "preservation_failed",
2505                            memory_id: None,
2506                            entity_id: None,
2507                            entities: None,
2508                            rels: None,
2509                            chars_before: Some(chars_before),
2510                            chars_after: Some(chars_after),
2511                            cost_usd: None,
2512                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2513                            error: Some(reason),
2514                            index: current_index,
2515                            total,
2516                        });
2517                    }
2518                    Err(e) => {
2519                        let err_str = format!("{e}");
2520                        if matches!(e, AppError::RateLimited { .. }) {
2521                            if crate::retry::is_kill_switch_active() {
2522                                tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2523                            } else if std::time::Instant::now() >= rate_limit_deadline {
2524                                tracing::error!(target: "enrich", total_elapsed_secs = enrich_started.elapsed().as_secs(), "rate-limit retry deadline (1h) exhausted");
2525                            } else {
2526                                let half = backoff_secs / 2;
2527                                let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2528                                let actual_wait = half + jitter;
2529                                tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
2530                                if let Err(qe) = queue_conn.execute(
2531                                    "UPDATE queue SET status='pending' WHERE id=?1",
2532                                    rusqlite::params![queue_id],
2533                                ) {
2534                                    tracing::warn!(target: "enrich", error = %qe, "queue pending update failed");
2535                                }
2536                                std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2537                                backoff_secs = (backoff_secs * 2).min(900);
2538                                continue;
2539                            }
2540                        }
2541
2542                        failed += 1;
2543                        // GAP-SG-73: prefer the origin-typed verdict
2544                        // (ChatError::retry_class) over the untyped fallback
2545                        // classifier whenever this item's failure came from
2546                        // an OpenRouter chat call.
2547                        let _outcome = match openrouter_diag {
2548                            Some(diag) => record_item_failure_typed(
2549                                &queue_conn,
2550                                queue_id,
2551                                attempt_current,
2552                                args.max_attempts,
2553                                diag.retry_class,
2554                                &err_str,
2555                                diag.finish_reason.as_deref(),
2556                                diag.prompt_tokens,
2557                                diag.completion_tokens,
2558                            ),
2559                            None => record_item_failure(
2560                                &queue_conn,
2561                                queue_id,
2562                                attempt_current,
2563                                args.max_attempts,
2564                                &e,
2565                            ),
2566                        };
2567                        emit_json(&ItemEvent {
2568                            item: &item_key,
2569                            status: "failed",
2570                            memory_id: None,
2571                            entity_id: None,
2572                            entities: None,
2573                            rels: None,
2574                            chars_before: None,
2575                            chars_after: None,
2576                            cost_usd: None,
2577                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2578                            error: Some(err_str),
2579                            index: current_index,
2580                            total,
2581                        });
2582                    }
2583                }
2584
2585                let _ = item_type; // used via queue schema only
2586            }
2587        } // end else (serial path)
2588
2589        if !args.until_empty {
2590            break;
2591        }
2592        let eligible_remaining: i64 = queue_conn
2593            .query_row(
2594                &format!("SELECT COUNT(*) FROM queue WHERE status='pending' {backoff_clause}"),
2595                [],
2596                |r| r.get(0),
2597            )
2598            .unwrap_or(0);
2599        let progressed = completed > completed_before;
2600        if std::time::Instant::now() >= until_deadline {
2601            tracing::info!(target: "enrich", "until-empty: max-runtime reached, stopping");
2602            break;
2603        }
2604        if !progressed && eligible_remaining == 0 {
2605            tracing::info!(target: "enrich", "until-empty: converged (no eligible items remain)");
2606            break;
2607        }
2608        if eligible_remaining == 0 {
2609            // Remaining pending items are waiting on backoff; nap and re-check.
2610            std::thread::sleep(std::time::Duration::from_secs(1));
2611        }
2612    } // end until-empty loop
2613
2614    let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2615    let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2616
2617    // GAP-SG-15: report items still in cooldown (waiting) and dead-lettered
2618    // alongside completed, so `--until-empty` makes the convergence state
2619    // explicit (cooldown vs. dead vs. truly empty) instead of just "done".
2620    let waiting_final: i64 = queue_conn
2621        .query_row(
2622            "SELECT COUNT(*) FROM queue WHERE status='pending' \
2623             AND (operation = ?1 OR operation IS NULL) \
2624             AND next_retry_at IS NOT NULL AND next_retry_at > datetime('now')",
2625            rusqlite::params![op_label],
2626            |r| r.get(0),
2627        )
2628        .unwrap_or(0);
2629    let dead_final: i64 = queue_conn
2630        .query_row(
2631            "SELECT COUNT(*) FROM queue WHERE status='dead' \
2632             AND (operation = ?1 OR operation IS NULL)",
2633            rusqlite::params![op_label],
2634            |r| r.get(0),
2635        )
2636        .unwrap_or(0);
2637
2638    emit_json(&EnrichSummary {
2639        summary: true,
2640        operation: format!("{:?}", args.operation()),
2641        items_total: total,
2642        completed,
2643        failed,
2644        skipped,
2645        cost_usd: cost_total,
2646        elapsed_ms: started.elapsed().as_millis() as u64,
2647        backend_invoked: take_enrich_backend(),
2648        waiting: waiting_final,
2649        dead: dead_final,
2650    });
2651
2652    if failed == 0 {
2653        // GAP-ENRICH-BACKLOG-CONVERGE: keep the queue file when dead-letter rows
2654        // exist so `enrich --status` can still report them on the next run.
2655        let dead: i64 = queue_conn
2656            .query_row("SELECT COUNT(*) FROM queue WHERE status='dead'", [], |r| {
2657                r.get(0)
2658            })
2659            .unwrap_or(0);
2660        // GAP-SG-69: keep the sidecar queue while it still holds `skipped`
2661        // verdicts. Those rows tell the next scan which short bodies are
2662        // non-expandable; removing the file would lose the veto and the
2663        // body-enrich backlog would never converge. cleanup_queue_entry clears
2664        // a row when its memory is edited/forgotten, so the veto is not permanent.
2665        let skipped_remaining: i64 = queue_conn
2666            .query_row(
2667                "SELECT COUNT(*) FROM queue WHERE status='skipped'",
2668                [],
2669                |r| r.get(0),
2670            )
2671            .unwrap_or(0);
2672        if dead == 0 && skipped_remaining == 0 {
2673            let _ = std::fs::remove_file(&queue_path);
2674        }
2675    }
2676
2677    Ok(())
2678}
2679
2680// EnrichItemResult + call_* functions moved to extraction.rs
2681
2682// ---------------------------------------------------------------------------
2683// Tests
2684// ---------------------------------------------------------------------------
2685
2686#[cfg(test)]
2687mod tests {
2688    use super::*;
2689
2690    #[test]
2691    fn bindings_schema_is_valid_json() {
2692        let _: serde_json::Value =
2693            serde_json::from_str(BINDINGS_SCHEMA).expect("BINDINGS_SCHEMA must be valid JSON");
2694    }
2695
2696    #[test]
2697    fn entity_description_schema_is_valid_json() {
2698        let _: serde_json::Value = serde_json::from_str(ENTITY_DESCRIPTION_SCHEMA)
2699            .expect("ENTITY_DESCRIPTION_SCHEMA must be valid JSON");
2700    }
2701
2702    #[test]
2703    fn body_enrich_schema_is_valid_json() {
2704        let _: serde_json::Value = serde_json::from_str(BODY_ENRICH_SCHEMA)
2705            .expect("BODY_ENRICH_SCHEMA must be valid JSON");
2706    }
2707}