Skip to main content

sqlite_graphrag/commands/enrich/
mod.rs

1// v1.0.97: modularised into queue.rs, scan.rs, postprocess.rs, extraction.rs.
2// See ADR-0056 (closes the ADR-0046 "Known Tech Debt (v1.0.89+)" item).
3
4//! Handler for the `enrich` CLI subcommand (GAP-14 + GAP-18).
5//!
6//! Enriches the knowledge graph by running LLM-powered analysis over memories
//! and entities that are missing key structural data. The core operations are
//! listed below; `EnrichOperation` enumerates the full set:
8//!
9//! - `memory-bindings`: memories without `memory_entities` rows get entity extraction
10//! - `entity-descriptions`: entities with NULL/empty descriptions get LLM descriptions
11//! - `body-enrich`: memories with short bodies get expanded by the LLM (GAP-18)
12//! - `re-embed`: memories without a vector row get re-embedded without rewriting body
13//!
14//! Architecture mirrors `ingest_claude.rs`: SCAN → JUDGE (LLM) → PERSIST, with a
15//! SQLite queue DB derived next to `--db` (GAP-SG-64) for resume/retry support.
16// Workload: Subprocess I/O-bound (claude/codex API calls with network wait)
17//!
18//! # DRY note
19//!
20//! v1.0.97: `claude_runner.rs` now hosts the shared Claude invocation helpers
21//! (`run_claude`, `parse_claude_output`, `spawn_with_memory_limit`). The queue
22//! DB schema in `ingest_claude.rs` still duplicates `open_queue_db` here — a
23//! future pass can unify them.
24
25mod extraction;
26mod postprocess;
27mod queue;
28mod scan;
29use extraction::{
30    call_body_enrich, call_body_extract, call_deep_research_synth, call_description_enrich,
31    call_domain_classify, call_entity_connect, call_entity_description, call_entity_type_validate,
32    call_graph_audit, call_memory_bindings, call_reembed, call_relation_reclassify,
33    call_weight_calibrate, find_codex_binary, EnrichItemResult,
34};
35use postprocess::{
36    persist_enriched_body, persist_entity_description, persist_memory_bindings,
37    reembed_memory_vector, take_enrich_backend,
38};
39pub use queue::{cleanup_queue_entry, DeadItem, DeadSummary, EnrichStatus, WaitingItem};
40use queue::{
41    enqueue_candidate, item_type_for, open_queue_db, prune_dead_orphans, record_item_failure,
42    skipped_item_keys,
43};
44use scan::{scan_isolated_entity_pairs, scan_operation, scan_unbound_memories};
45
46use crate::commands::ingest_claude::find_claude_binary;
47use crate::constants::MAX_MEMORY_BODY_LEN;
48use crate::entity_type::EntityType;
49use crate::errors::AppError;
50use crate::paths::AppPaths;
51use crate::storage::connection::{ensure_db_ready, open_rw};
52use crate::storage::entities::{self, NewEntity, NewRelationship};
53use crate::storage::memories;
54
55use rusqlite::Connection;
56use serde::{Deserialize, Serialize};
57use std::io::Write;
58use std::path::{Path, PathBuf};
59use std::time::Instant;
60
61// ---------------------------------------------------------------------------
62// Constants
63// ---------------------------------------------------------------------------
64
/// Default rate-limit wait.
/// NOTE(review): presumably seconds; the consumer is outside this view — confirm.
const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
/// Default for `--min-output-chars` (minimum body-enrich output length, in characters).
const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;
/// Default for `--max-output-chars` (maximum body-enrich output length, in characters).
const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2000;
68
69// ---------------------------------------------------------------------------
70// JSON schema used for memory-bindings and body-enrich extraction
71// ---------------------------------------------------------------------------
72
/// JSON Schema describing the structured output expected from binding extraction.
///
/// The document has two required arrays:
/// - `entities`: objects with `name` and `entity_type`, the latter restricted to
///   a closed set of ten entity types;
/// - `relationships`: objects with `source`, `target`, `relation` (closed set of
///   eleven canonical relations) and `strength` (number in `0..=1`).
///
/// Every object level lists all of its properties as `required` and sets
/// `additionalProperties: false`, so no extra keys are allowed anywhere.
const BINDINGS_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["entities","relationships"],
  "additionalProperties": false
}"#;
112
/// JSON Schema for an entity-description response: a single required string
/// field `description`, with no additional properties allowed.
const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" }
  },
  "required": ["description"],
  "additionalProperties": false
}"#;

/// JSON Schema for a body-enrich response: a single required string field
/// `enriched_body`, with no additional properties allowed.
const BODY_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "enriched_body": { "type": "string" }
  },
  "required": ["enriched_body"],
  "additionalProperties": false
}"#;
130
// G27 P1: weight-calibrate
/// Instruction text for the weight-calibrate operation. It defines a four-step
/// reference scale (0.9 / 0.7 / 0.5 / 0.3) and asks for a calibrated weight
/// plus brief reasoning.
const WEIGHT_CALIBRATE_PROMPT: &str = "You are a knowledge graph quality auditor. Evaluate whether this relationship weight is correctly calibrated.\n\n\
Scale:\n\
- 0.9 = vital hard dependency (A cannot function without B)\n\
- 0.7 = important design relationship (A strongly supports/enables B)\n\
- 0.5 = useful contextual link (A and B share relevant context)\n\
- 0.3 = weak reference (A mentions B without strong coupling)\n\n\
Respond with the calibrated weight and brief reasoning.";

/// JSON Schema for the weight-calibrate response: required `calibrated_weight`
/// (number in `0.0..=1.0`) and `reasoning` (string); no additional properties.
const WEIGHT_CALIBRATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "calibrated_weight": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["calibrated_weight", "reasoning"],
  "additionalProperties": false
}"#;
149
// G27 P1: relation-reclassify
/// Instruction text for the relation-reclassify operation. It lists the eleven
/// canonical relations with a one-line meaning each and asks the model to pick
/// exactly one, together with a strength and reasoning.
const RELATION_RECLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. The relationship between these entities uses a generic type. Determine the REAL semantic relationship.\n\n\
Valid canonical relations (pick exactly one):\n\
- depends-on: A cannot function without B\n\
- uses: A utilizes B but could substitute it\n\
- supports: A reinforces or enables B\n\
- causes: A triggers or produces B\n\
- fixes: A resolves a problem in B\n\
- contradicts: A conflicts with or invalidates B\n\
- applies-to: A is relevant to or scoped within B\n\
- follows: A comes after B in sequence\n\
- replaces: A substitutes B\n\
- tracked-in: A is monitored in B\n\
- related: A and B share context (use sparingly)\n\n\
Respond with the correct relation, strength, and reasoning.";

/// JSON Schema for the relation-reclassify response: required `relation`
/// (string), `strength` (number in `0.0..=1.0`) and `reasoning` (string).
// NOTE(review): `relation` is an unconstrained string here, whereas the prompt
// names a closed set and `BINDINGS_SCHEMA` encodes that set as an `enum`.
// Presumably the value is validated by the consumer — confirm.
const RELATION_RECLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": { "type": "string" },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
176
// G27 P2: entity-connect — suggest relationships between isolated entities
/// Instruction text for the entity-connect operation. The model is asked
/// whether two unrelated entities share a meaningful canonical relation, and
/// to answer with the literal relation `"none"` when they do not.
const ENTITY_CONNECT_PROMPT: &str = "You are a knowledge graph quality auditor. Two entities exist in the same graph but have no relationship between them. Determine if a meaningful relationship exists.\n\n\
Valid canonical relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, replaces, tracked-in, related.\n\n\
If NO meaningful relationship exists, set relation to \"none\".\n\
Respond with the relation (or \"none\"), strength, and reasoning.";

/// JSON Schema for the entity-connect response: required `relation` (string;
/// the prompt allows a canonical relation or `"none"`), `strength` (number in
/// `0.0..=1.0`) and `reasoning` (string); no additional properties.
const ENTITY_CONNECT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": { "type": "string" },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
193
// G27 P2: entity-type-validate — verify entity type assignments
/// Instruction text for the entity-type-validate operation. It lists the ten
/// valid entity types and asks the model to keep the current type when it is
/// correct or to suggest the right one otherwise.
const ENTITY_TYPE_VALIDATE_PROMPT: &str = "You are a knowledge graph quality auditor. Verify whether this entity's type is correct.\n\n\
Valid entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\n\
If the current type is correct, keep it. If wrong, suggest the correct type.\n\
Respond with the validated type and reasoning.";

/// JSON Schema for the entity-type-validate response: required
/// `validated_type` (string), `was_correct` (boolean) and `reasoning`
/// (string); no additional properties.
const ENTITY_TYPE_VALIDATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "validated_type": { "type": "string" },
    "was_correct": { "type": "boolean" },
    "reasoning": { "type": "string" }
  },
  "required": ["validated_type", "was_correct", "reasoning"],
  "additionalProperties": false
}"#;
210
// G27 P2: description-enrich — improve generic memory descriptions
/// Instruction text for the description-enrich operation. It asks for a
/// 10-20 word semantic description and gives one BAD and one GOOD example.
const DESCRIPTION_ENRICH_PROMPT: &str = "You are a knowledge graph quality auditor. This memory has a generic or auto-generated description. Write a concise, semantic description (10-20 words) that captures WHAT this memory is about and WHY it matters.\n\n\
BAD: 'ingested from docs/auth.md'\n\
GOOD: 'JWT token rotation strategy with 15-min expiry and refresh flow'\n\n\
Respond with the improved description and reasoning.";

/// JSON Schema for the description-enrich response: required `description`
/// and `reasoning` strings; no additional properties.
const DESCRIPTION_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" },
    "reasoning": { "type": "string" }
  },
  "required": ["description", "reasoning"],
  "additionalProperties": false
}"#;
226
// G27 P2: domain-classify — classify memory into domain category
/// Instruction text for the domain-classify operation. It asks for a
/// kebab-case domain name of 2-4 words plus reasoning.
const DOMAIN_CLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. Classify this memory into its primary domain category.\n\n\
Respond with the domain name (kebab-case, 2-4 words) and reasoning.";

/// JSON Schema for the domain-classify response: required `domain` (string),
/// `confidence` (number in `0.0..=1.0`) and `reasoning` (string); no
/// additional properties.
const DOMAIN_CLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "domain": { "type": "string" },
    "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["domain", "confidence", "reasoning"],
  "additionalProperties": false
}"#;
241
// G27 P2: graph-audit — audit graph for quality issues
/// Instruction text for the graph-audit operation. It asks the model to check
/// a memory and its entity bindings for a fixed list of quality problems and
/// to return the issues found together with an overall quality score.
const GRAPH_AUDIT_PROMPT: &str = "You are a knowledge graph quality auditor. Analyze this memory and its entity bindings for quality issues.\n\n\
Check for: missing entities, wrong entity types, redundant relationships, orphaned entities, generic descriptions, low-signal relationships.\n\n\
Respond with a list of issues found (or empty if none) and an overall quality score.";

/// JSON Schema for the graph-audit response: required `quality_score` (number
/// in `0.0..=1.0`), `issues` (array of `{ kind, detail }` objects) and
/// `reasoning` (string).
///
/// Every object level — including the nested `issues` items — sets
/// `additionalProperties: false`, matching `BINDINGS_SCHEMA`. Strict
/// structured-output providers reject schemas whose nested objects are left
/// open, so the item object must be closed as well.
const GRAPH_AUDIT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "quality_score": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "issues": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "kind": { "type": "string" },
          "detail": { "type": "string" }
        },
        "required": ["kind", "detail"],
        "additionalProperties": false
      }
    },
    "reasoning": { "type": "string" }
  },
  "required": ["quality_score", "issues", "reasoning"],
  "additionalProperties": false
}"#;
257
// G27 P2: deep-research-synth — synthesize research findings into graph
/// Instruction text for the deep-research-synth operation. It asks the model
/// to turn a memory body into entities (lowercase kebab-case names),
/// relationships drawn from the canonical relation list, and a summary.
const DEEP_RESEARCH_SYNTH_PROMPT: &str = "You are a knowledge graph synthesizer. Given this memory body, extract key findings and synthesize them into structured entities and relationships.\n\n\
Entity names: lowercase kebab-case, domain-specific.\n\
Relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, related, replaces, tracked-in.\n\n\
Respond with extracted entities, relationships, and a synthesis summary.";

/// JSON Schema for the deep-research-synth response: required `entities`
/// (array of `{ name, entity_type }`), `relationships` (array of
/// `{ source, target, relation, strength }`) and `summary` (string).
///
/// Every object level — including both nested item objects — sets
/// `additionalProperties: false`, matching `BINDINGS_SCHEMA`. Strict
/// structured-output providers reject schemas whose nested objects are left
/// open, so the item objects must be closed as well.
const DEEP_RESEARCH_SYNTH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": { "type": "string" }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": { "type": "string" },
          "strength": { "type": "number" }
        },
        "required": ["source", "target", "relation", "strength"],
        "additionalProperties": false
      }
    },
    "summary": { "type": "string" }
  },
  "required": ["entities", "relationships", "summary"],
  "additionalProperties": false
}"#;
274
// G27 P2: body-extract — extract structured content from unstructured text
/// Instruction text for the body-extract operation. It asks the model to
/// restructure a raw memory body into clean markdown while preserving all
/// factual content, and to summarise the changes it made.
const BODY_EXTRACT_PROMPT: &str = "You are a structured data extractor. Given this memory body (which may be unstructured text, raw notes, or a transcript), extract and restructure the content into a clean, well-organized markdown body.\n\n\
Preserve all factual content. Remove noise, fix formatting, add section headers where appropriate.\n\
Respond with the restructured body and a brief summary of changes.";

/// JSON Schema for the body-extract response: required `restructured_body`
/// and `changes_summary` strings; no additional properties.
const BODY_EXTRACT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "restructured_body": { "type": "string" },
    "changes_summary": { "type": "string" }
  },
  "required": ["restructured_body", "changes_summary"],
  "additionalProperties": false
}"#;
289
290// ---------------------------------------------------------------------------
291// Prompts
292// ---------------------------------------------------------------------------
293
/// Instruction text for binding extraction. It asks for domain-specific
/// entities and typed relationships with strength scores, and spells out the
/// naming rules, the allowed relationship types (the same eleven values listed
/// in `BINDINGS_SCHEMA`) and the strength scale.
const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. Given a memory body, extract:\n\
1. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
2. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones";

/// Leading part of the entity-description prompt. The text ends with
/// `"Entity name: "`, so the caller presumably appends the entity name (and
/// type) after it — confirm against the call site.
const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. Given an entity name and type, write a concise one-sentence description (10-20 words) that explains what this entity IS and WHY it matters in the context of software/system design.\n\nEntity name: ";

/// Leading part of the body-enrich prompt. The text ends with
/// `"Memory body to enrich:\n\n"`, so the caller presumably appends the memory
/// body after it — confirm against the call site.
const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. Given a short or sparse memory body, expand it into a richer, more complete and useful description. Preserve all existing facts. Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\nConstraints:\n- Output only the enriched body text (no metadata, no headers)\n- Preserve the original meaning exactly\n- Target length is provided in the system context\n\nMemory body to enrich:\n\n";
308
309// ---------------------------------------------------------------------------
310// CLI args
311// ---------------------------------------------------------------------------
312
/// Operation to perform in the `enrich` command.
//
// Naming: serde serialises each variant in kebab-case (`rename_all` below), and
// the CLI examples in this file use the same spelling for `--operation`
// (e.g. `memory-bindings`, `augment-bindings`, `re-embed`).
//
// Maintenance: the `///` comments on the variants are picked up by the clap
// derive as per-value help text, so editing them changes `--help` output.
//
// NOTE(review): several variants are labelled "(scan only)", yet this module
// imports matching `call_*` helpers from `extraction` (for example
// `call_weight_calibrate`, `call_graph_audit`). The labels may be stale —
// confirm against the dispatcher before relying on them.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum EnrichOperation {
    /// Add missing entity/relationship bindings to memories (fully implemented).
    /// memory-bindings LINKS each memory to the EXISTING entities extracted from
    /// its body — it does not invent a new graph, it only connects what is missing. Scans
    /// only UNBOUND memories (those with zero `memory_entities`).
    MemoryBindings,
    /// GAP-SG-24/26: additive augmentation — re-run binding extraction over
    /// memories that are ALREADY bound, filtered by `--names`/`--names-file`, to
    /// merge newly-discovered entities/relationships WITHOUT removing existing
    /// links. Requires a name filter (refuses to re-scan the whole namespace).
    AugmentBindings,
    /// Fill NULL/empty entity descriptions with LLM-generated summaries (fully implemented).
    EntityDescriptions,
    /// Expand short memory bodies into richer content (fully implemented, GAP-18).
    BodyEnrich,
    /// Rebuild missing memory embeddings without rewriting the memory body.
    ReEmbed,
    /// Calibrate relationship weights using LLM analysis (scan only).
    WeightCalibrate,
    /// Reclassify relationship types using LLM judgment (scan only).
    RelationReclassify,
    /// Connect isolated entities by suggesting new relationships (scan only).
    EntityConnect,
    /// Validate entity type assignments using LLM judgment (scan only).
    EntityTypeValidate,
    /// Enrich memory descriptions that are generic/auto-generated (scan only).
    DescriptionEnrich,
    /// Identify cross-domain bridges between disconnected subgraphs (scan only).
    CrossDomainBridges,
    /// Classify memories into domain categories (scan only).
    DomainClassify,
    /// Audit the graph for quality issues (scan only).
    GraphAudit,
    /// Synthesize deep-research findings into graph memories (scan only).
    DeepResearchSynth,
    /// Extract structured body from unstructured text (scan only).
    BodyExtract,
}
354
/// LLM provider for enrichment.
//
// The `///` comments on the variants are picked up by the clap derive as
// per-value help text, so editing them changes `--help` output.
//
// `Opencode` and `OpenRouter` pin their CLI spelling with `#[value(name = ...)]`
// so the accepted values are `opencode` and `openrouter`; the `Display` impl
// for this type prints the same four identifiers.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
pub enum EnrichMode {
    /// Use locally installed Claude Code CLI (OAuth-first).
    ClaudeCode,
    /// Use locally installed OpenAI Codex CLI.
    Codex,
    /// Use locally installed OpenCode CLI.
    #[value(name = "opencode")]
    Opencode,
    /// Use the OpenRouter chat-completions REST API (no local CLI; v1.0.95).
    #[value(name = "openrouter")]
    OpenRouter,
}
369
370impl std::fmt::Display for EnrichMode {
371    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
372        match self {
373            EnrichMode::ClaudeCode => write!(f, "claude-code"),
374            EnrichMode::Codex => write!(f, "codex"),
375            EnrichMode::Opencode => write!(f, "opencode"),
376            EnrichMode::OpenRouter => write!(f, "openrouter"),
377        }
378    }
379}
380
381/// Arguments for the `enrich` subcommand.
382#[derive(clap::Args)]
383#[command(
384    about = "Enrich graph memories and entities using an LLM provider",
385    after_long_help = "EXAMPLES:\n  \
386    # Add missing entity bindings to all unbound memories\n  \
387    sqlite-graphrag enrich --operation memory-bindings --mode codex --codex-model gpt-5.4-mini\n\n  \
388    # Fill entity descriptions (dry-run preview, no tokens spent)\n  \
389    sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n  \
390    # Expand short memory bodies (GAP-18)\n  \
391    sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n  \
392    # Rebuild only missing memory embeddings without rewriting bodies\n  \
393    sqlite-graphrag enrich --operation re-embed --limit 100\n\n  \
394    # Resume an interrupted body-enrich run\n  \
395    sqlite-graphrag enrich --operation body-enrich --resume --json\n\n  \
396    # Retry only failed items from a previous run\n  \
397    sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n  \
398    # Converge the whole backlog (internal scan+drain loop, no bash wrapper)\n  \
399    sqlite-graphrag enrich --operation memory-bindings --mode openrouter \\\n    \
400      --openrouter-model deepseek/deepseek-v4-flash:nitro --until-empty --max-runtime 600\n\n  \
401    # Inspect / resurrect dead-letter items\n  \
402    sqlite-graphrag enrich --operation memory-bindings --list-dead\n  \
403    sqlite-graphrag enrich --operation memory-bindings --requeue-dead\n\n  \
404    # Read-only status (no LLM, no singleton)\n  \
405    sqlite-graphrag enrich --operation memory-bindings --status\n\n\
406    OPERATIONS NOTE:\n  \
407    memory-bindings LINKS each memory to the EXISTING entities extracted from its\n  \
408    body — it does not invent a new graph, it connects what is missing. It scans\n  \
409    only UNBOUND memories. To re-run extraction over ALREADY-bound memories and\n  \
410    MERGE newly-found entities/relationships additively (without removing links),\n  \
411    use --operation augment-bindings with --names/--names-file.\n\n\
412    DEAD-LETTER SIDECAR (.enrich-queue.sqlite):\n  \
413    A SQLite sidecar tracks each work item across runs. Schema (table `queue`):\n  \
414    item_key (UNIQUE name/id), item_type (memory|entity), operation, memory_id,\n  \
415    status (pending|processing|done|skipped|dead), attempt, error, error_class,\n  \
416    next_retry_at (backoff cooldown). --until-empty loops scan→drain internally\n  \
417    until eligible items are exhausted; transient failures (incl. malformed/non-\n  \
418    JSON LLM output, GAP-SG-09) reschedule with backoff until --max-attempts, then\n  \
419    land in status='dead'. Use --status to see the queue, --list-dead to inspect\n  \
420    the sink, --requeue-dead to retry it, and --ignore-backoff to skip cooldowns.\n  \
421    --names/--names-file also remedy a cooldown by targeting a specific subset.\n\n\
422    EXIT CODES:\n  \
423    0  success\n  \
424    1  validation error (bad args, binary not found)\n  \
425    14 I/O error"
426)]
427pub struct EnrichArgs {
428    /// Enrichment operation to run. Required for write operations; optional for
429    /// the read-only queue inspectors (`--status` / `--list-dead` /
430    /// `--requeue-dead`), where it defaults to `memory-bindings` when omitted
431    /// (GAP-SG-31).
432    #[arg(
433        long,
434        short = 'o',
435        value_enum,
436        value_name = "OPERATION",
437        required_unless_present_any = ["status", "list_dead", "requeue_dead", "prune_dead_orphans"]
438    )]
439    pub operation: Option<EnrichOperation>,
440
441    /// LLM provider to use. Required for write operations; not needed for the
442    /// read-only queue inspectors (`--status` / `--list-dead` /
443    /// `--requeue-dead`), which never call the LLM (GAP-SG-31).
444    #[arg(
445        long,
446        value_enum,
447        required_unless_present_any = ["status", "list_dead", "requeue_dead", "prune_dead_orphans"]
448    )]
449    pub mode: Option<EnrichMode>,
450
451    /// Maximum number of items to process in this run. Omit for all.
452    #[arg(long, value_name = "N")]
453    pub limit: Option<usize>,
454
455    /// Preview items without calling the LLM (zero tokens consumed).
456    #[arg(long)]
457    pub dry_run: bool,
458
459    /// Namespace to operate on. Default: global.
460    #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
461    pub namespace: Option<String>,
462
463    // -- Provider flags (Claude) --
464    /// Path to the Claude Code binary. Default: auto-detect from PATH.
465    #[arg(long, value_name = "PATH")]
466    pub claude_binary: Option<PathBuf>,
467
468    /// Claude model to use (e.g. claude-sonnet-4-6).
469    #[arg(long, value_name = "MODEL")]
470    pub claude_model: Option<String>,
471
472    /// Timeout per item in seconds when using Claude Code. Default: 300.
473    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
474    pub claude_timeout: u64,
475
476    // -- Provider flags (Codex) --
477    /// Path to the Codex CLI binary. Default: auto-detect from PATH.
478    #[arg(long, value_name = "PATH")]
479    pub codex_binary: Option<PathBuf>,
480
481    /// Codex model to use (e.g. o4-mini).
482    #[arg(long, value_name = "MODEL")]
483    pub codex_model: Option<String>,
484
485    /// Timeout per item in seconds when using Codex. Default: 300.
486    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
487    pub codex_timeout: u64,
488
489    // -- Provider flags (OpenCode) --
490    /// Path to the OpenCode binary. Default: auto-detect from PATH.
491    #[arg(long, value_name = "PATH", env = "SQLITE_GRAPHRAG_OPENCODE_BINARY")]
492    pub opencode_binary: Option<PathBuf>,
493
494    /// OpenCode model to use.
495    #[arg(long, value_name = "MODEL", env = "SQLITE_GRAPHRAG_OPENCODE_MODEL")]
496    pub opencode_model: Option<String>,
497
498    /// Timeout per item in seconds when using OpenCode. Default: 300.
499    #[arg(
500        long,
501        value_name = "SECONDS",
502        env = "SQLITE_GRAPHRAG_OPENCODE_TIMEOUT",
503        default_value_t = 300
504    )]
505    pub opencode_timeout: u64,
506
507    // -- Provider flags (OpenRouter, v1.0.95) --
508    /// OpenRouter text model to use (REQUIRED with --mode openrouter; no default).
509    #[arg(long, value_name = "MODEL")]
510    pub openrouter_model: Option<String>,
511
512    /// OpenRouter API key. Falls back to OPENROUTER_API_KEY env or stored config.
513    #[arg(long, value_name = "KEY", env = "OPENROUTER_API_KEY")]
514    pub openrouter_api_key: Option<String>,
515
516    /// Timeout per item in seconds when using OpenRouter. Default: 600.
517    ///
518    /// GAP-SG-17: raised from 300 to 600 because dense bodies (close to the
519    /// ~32K-token context ceiling of the configured model) routinely take
520    /// longer than five minutes to generate via `deepseek-v4-flash:nitro`.
521    /// Raise it further for very large corpora; lower it for short snippets.
522    #[arg(long, value_name = "SECONDS", default_value_t = 600)]
523    pub openrouter_timeout: u64,
524
525    /// Optional OpenRouter base URL override (reserved; defaults to the public API).
526    #[arg(long, value_name = "URL")]
527    pub openrouter_base_url: Option<String>,
528
529    // -- Cost controls --
530    /// Abort when cumulative cost exceeds this USD budget (API key only; ignored for OAuth).
531    #[arg(long, value_name = "USD")]
532    pub max_cost_usd: Option<f64>,
533
534    // -- Queue controls --
535    /// Resume a previously interrupted run (skip already-done items).
536    #[arg(long)]
537    pub resume: bool,
538
539    /// Retry only items that failed in a previous run.
540    #[arg(long)]
541    pub retry_failed: bool,
542
543    /// GAP-ENRICH-BACKLOG-CONVERGE: loop scan→drain internally until the queue
544    /// empties of eligible items or --max-runtime elapses; removes the need for
545    /// an external bash retry loop.
546    #[arg(long)]
547    pub until_empty: bool,
548
549    /// GAP-ENRICH-BACKLOG-CONVERGE: wall-clock ceiling in seconds for
550    /// --until-empty. Defaults to 3600 when omitted.
551    #[arg(long, value_name = "SECONDS")]
552    pub max_runtime: Option<u64>,
553
554    /// GAP-ENRICH-BACKLOG-CONVERGE: attempts per item before it becomes a
555    /// dead-letter (status='dead'). Range 1..=20. Default 8.
556    ///
557    /// GAP-SG-21: the default was raised from 5 to 8 because GAP-SG-09 now
558    /// reclassifies malformed / non-JSON LLM output as TRANSIENT (retryable)
559    /// rather than a permanent HardFailure. A flaky structured-output model
560    /// (e.g. deepseek-v4-flash:nitro) may emit several bad generations in a row
561    /// even after JSON repair (GAP-SG-10) recovers most of them; the extra
562    /// attempts give the backlog room to converge before an item is parked in
563    /// the dead-letter sink. Permanent faults (ProviderError, NotFound) still
564    /// dead-letter on the first attempt regardless of this value.
565    #[arg(long, value_name = "N", default_value_t = 8, value_parser = clap::value_parser!(u32).range(1..=20))]
566    pub max_attempts: u32,
567
568    /// GAP-ENRICH-BACKLOG-CONVERGE: read-only mode — report queue and backlog
569    /// counts without calling the LLM or acquiring the singleton.
570    #[arg(long)]
571    pub status: bool,
572
573    /// GAP-SG-23: list every dead-letter item (status='dead') for the current
574    /// operation with its error_class, attempt count and last error message.
575    /// Read-only — no LLM, no singleton. Use it to inspect what `--requeue-dead`
576    /// would resurrect before running it.
577    #[arg(long)]
578    pub list_dead: bool,
579
580    /// GAP-SG-11/14: resurrect dead-letter items — move every `status='dead'`
581    /// row back to `pending`, zeroing `attempt`, `next_retry_at`, `error` and
582    /// `error_class`. Distinct from `--retry-failed`, which only resets the
583    /// legacy `status='failed'` rows; dead-letter rows are the terminal sink of
584    /// the v1.0.96 converge loop and are never re-selected without this flag.
585    /// No LLM call or singleton is taken — it only rewrites queue statuses.
586    #[arg(long)]
587    pub requeue_dead: bool,
588
589    /// GAP-SG-66: prune ORPHAN dead-letter rows — remove every `status='dead'`
590    /// memory row whose `item_key` (the memory name) no longer exists in the
591    /// main DB for this namespace. These are terminal "not found" failures that
592    /// `--requeue-dead` can never recover (re-processing re-fails the same way),
593    /// so they inflate `queue_dead` forever. Read-only on the main DB; deletes
594    /// only confirmed-orphan rows from the queue sidecar. Entity-keyed dead rows
595    /// are left untouched. No LLM, no singleton — like `--list-dead`.
596    #[arg(long)]
597    pub prune_dead_orphans: bool,
598
599    /// GAP-SG-16: ignore the per-item backoff cooldown (`next_retry_at`) when
600    /// selecting candidates, so items waiting on exponential backoff are
601    /// processed immediately. Use to drain a backlog whose cooldown windows are
602    /// long but the provider has recovered. Without it, `--status` reports such
603    /// items under `waiting` and they are skipped until their `next_retry_at`.
604    #[arg(long)]
605    pub ignore_backoff: bool,
606
607    /// GAP-SG-28: read-only `body-extract` — extract entities/relationships into
608    /// the graph WITHOUT rewriting (or truncating) the memory body. The default
609    /// `body-extract` restructures the stored body in place; with this flag the
610    /// body is left untouched and only graph bindings are persisted (additive,
611    /// via the same upsert path as `memory-bindings`). Ignored for every other
612    /// operation.
613    #[arg(long)]
614    pub body_extract_graph_only: bool,
615
616    /// GAP-ENRICH-BACKLOG-CONVERGE: REST concurrency for --mode openrouter
617    /// (clamp 1..=16, default 8). Distinct from the legacy --llm-parallelism.
618    #[arg(long, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=16))]
619    pub rest_concurrency: Option<u32>,
620
621    // -- body-enrich specific flags (GAP-18) --
622    /// Minimum output character count for body-enrich. Default: 500.
623    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
624    pub min_output_chars: usize,
625
626    /// Maximum output character count for body-enrich. Default: 2000.
627    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
628    pub max_output_chars: usize,
629
630    /// Check that enriched body preserves all facts from the original (LLM judge). Default: true.
631    #[arg(long, default_value_t = true)]
632    pub preserve_check: bool,
633
634    /// Path to a custom prompt template file for body-enrich.
635    #[arg(long, value_name = "PATH")]
636    pub prompt_template: Option<PathBuf>,
637
638    /// Number of parallel LLM workers (default 1 = serial).
639    /// Each worker claims items atomically from the queue DB via UPDATE...RETURNING.
640    /// Range: 1–32. For 2321 entities, --llm-parallelism 4 reduces wall time ~4×.
641    #[arg(long, default_value_t = 1, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=32))]
642    pub llm_parallelism: u32,
643
644    // -- Output / infra --
645    /// Emit NDJSON output. Always true; flag accepted for compatibility.
646    #[arg(long)]
647    pub json: bool,
648
649    /// Database path override.
650    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
651    pub db: Option<String>,
652
653    /// G30: poll for the job singleton every second for up to N seconds
654    /// when another invocation holds the lock. Default: 0 (fail fast).
655    #[arg(long, value_name = "SECONDS")]
656    pub wait_job_singleton: Option<u64>,
657
658    /// G30: force acquisition of the singleton lock by removing a stale
659    /// lock file from a previously crashed invocation. Use only when you
660    /// are certain no other `enrich`/`ingest` is running.
661    #[arg(long, default_value_t = false)]
662    pub force_job_singleton: bool,
663
664    /// G37: select a specific subset of memory names to enrich instead of
665    /// the full candidate set. Comma-separated, e.g. `--names a,b,c`.
666    /// Empty when omitted (processes all candidates).
667    ///
668    /// GAP-SG-18: also a cooldown remedy — when `--status` shows items under
669    /// `waiting` (parked on `next_retry_at` backoff), pass the exact names here
670    /// to re-enqueue and process just that subset on the next run instead of
671    /// waiting for every cooldown to elapse. REQUIRED for `--operation
672    /// augment-bindings`, which refuses to re-scan the whole namespace.
673    #[arg(long, value_name = "NAMES", value_delimiter = ',')]
674    pub names: Vec<String>,
675
676    /// G37: read the subset of memory names from a file (one per line).
677    /// Lines starting with `#` and empty lines are ignored. Combined with
678    /// `--names` (union) when both are set.
679    #[arg(long, value_name = "PATH")]
680    pub names_file: Option<PathBuf>,
681
682    /// G35: probe the LLM provider with a 1-turn ping before processing
683    /// the batch. Aborts with a clear error if the rate-limit window is
684    /// closed (avoids burning N turns only to fail on item 1).
685    #[arg(long, default_value_t = false)]
686    pub preflight_check: bool,
687
688    /// G35: if a preflight probe or in-flight call hits the Claude rate
689    /// limit, fall back to `--fallback-mode` (typically `codex`) instead
690    /// of failing the batch. Ignored when `--mode` is already `codex`.
691    #[arg(long, value_enum)]
692    pub fallback_mode: Option<EnrichMode>,
693
694    /// G35: number of seconds before the OAuth rate-limit reset at which
695    /// the preflight probe should refuse to start. Default 300 (5 min).
696    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
697    pub rate_limit_buffer: u64,
698
699    /// G28-D: refuse to start when the 1-minute load average exceeds
700    /// `2 × ncpus` (or `SQLITE_GRAPHRAG_MAX_LOAD_PER_NCPU` if set).
701    /// Set to false to skip the check on contended CI runners.
702    #[arg(long, default_value_t = true)]
703    pub max_load_check: bool,
704
705    /// G28-D: when the system is saturated, abort the job after this
706    /// many consecutive HardFailure outcomes. Default 5.
707    #[arg(long, value_name = "N", default_value_t = 5)]
708    pub circuit_breaker_threshold: u32,
709
710    /// G29 Step 4: minimum trigram-Jaccard similarity between the
711    /// original body and the LLM-rewritten body for the rewrite to be
712    /// accepted. Scores below the threshold are rejected and emitted as
713    /// `EnrichItemResult::PreservationFailed`. Default 0.7 (per the G29
714    /// gap specification). Ignored when `--operation` is not
715    /// `body-enrich`.
716    #[arg(long, value_name = "FLOAT", default_value_t = 0.7)]
717    pub preserve_threshold: f64,
718
719    /// G33 Step 3: when set, validate `--codex-model` against the
720    /// ChatGPT Pro OAuth accepted-model list and abort with a
721    /// suggestion when the value is unknown. Default true (fail fast
722    /// to avoid burning OAuth turns). Set to false to opt out.
723    #[arg(long, default_value_t = true)]
724    pub codex_model_validate: bool,
725
726    /// G33 Step 3: when set together with an invalid `--codex-model`,
727    /// automatically substitute the supplied default (e.g. `gpt-5.5`)
728    /// instead of aborting. The substitution is recorded in the NDJSON
729    /// stream as `provider_substituted: true` for traceability.
730    #[arg(long, value_name = "MODEL")]
731    pub codex_model_fallback: Option<String>,
732}
733
734impl EnrichArgs {
735    /// GAP-SG-31: resolved enrichment operation.
736    ///
737    /// `operation` is `Option` so the read-only queue inspectors
738    /// (`--status` / `--list-dead` / `--requeue-dead`) can run without it.
739    /// Write paths always carry a value (enforced by
740    /// `required_unless_present_any` at parse time); the read-only paths fall
741    /// back to `memory-bindings`, the most common queue, when it is omitted.
742    fn operation(&self) -> EnrichOperation {
743        self.operation
744            .clone()
745            .unwrap_or(EnrichOperation::MemoryBindings)
746    }
747
748    /// GAP-SG-31: resolved LLM provider. `mode` is `Option` for the read-only
749    /// inspectors that never call the LLM; write paths always carry a value
750    /// (enforced by `required_unless_present_any`). The fallback is only ever
751    /// observed by read-only code that does not actually invoke the provider.
752    fn mode(&self) -> EnrichMode {
753        self.mode.clone().unwrap_or(EnrichMode::OpenRouter)
754    }
755}
756
757// ---------------------------------------------------------------------------
758// Internal types — raw LLM output structs
759// ---------------------------------------------------------------------------
760
761// ---------------------------------------------------------------------------
762// NDJSON event types emitted to stdout
763// ---------------------------------------------------------------------------
764
/// NDJSON event announcing a phase of the enrich run.
///
/// Only `phase` is always serialized; every `Option` field is left out of the
/// serialized object when it is `None`.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Name of the phase being reported.
    phase: &'a str,
    /// Presumably the path of the resolved provider binary — confirm at the
    /// construction sites. Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    binary_path: Option<&'a str>,
    /// Presumably the provider/binary version string — confirm at the
    /// construction sites. Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Item count for the phase (presumably the full candidate set — confirm).
    /// Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_total: Option<usize>,
    /// Item count still pending (presumably queue rows awaiting work —
    /// confirm). Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_pending: Option<usize>,
    /// Active parallel LLM worker count (1 = serial). Present only on the "scan" phase event.
    #[serde(skip_serializing_if = "Option::is_none")]
    llm_parallelism: Option<u32>,
}
780
/// GAP-SG-45: separates the SCAN metric (always serial — a single SQL sweep of
/// the candidate set) from the DRAIN metric (the parallel worker fan-out). The
/// legacy "scan" `PhaseEvent` reported `llm_parallelism` on the scan event,
/// conflating the two; this event makes the distinction explicit.
///
/// All three fields are always serialized (none is optional).
#[derive(Debug, Serialize)]
struct ConcurrencyEvent {
    /// Name of the phase this event is attached to.
    phase: &'static str,
    /// Parallelism of the SCAN step.
    scan_parallelism: u32,
    /// Parallelism of the DRAIN step (the worker fan-out).
    drain_parallelism: u32,
}
791
/// NDJSON event describing the outcome of one processed item.
///
/// `item`, `status`, `index` and `total` are always serialized; every
/// `Option` field is left out of the serialized object when it is `None`.
#[derive(Debug, Serialize)]
struct ItemEvent<'a> {
    /// Item identifier (memory name or entity name).
    item: &'a str,
    /// Outcome label for the item (the set of values is defined by the
    /// emitting code, not visible here).
    status: &'a str,
    /// Memory row id, when the item is tied to a memory. Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Entity row id, when the item is tied to an entity. Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    entity_id: Option<i64>,
    /// Entity count for this item (presumably entities extracted/bound —
    /// confirm at the construction sites). Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Relationship count for this item (presumably relationships
    /// extracted/bound — confirm). Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Character count before the rewrite (presumably of the memory body —
    /// confirm). Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_before: Option<usize>,
    /// Character count after the rewrite (presumably of the memory body —
    /// confirm). Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_after: Option<usize>,
    /// Cost attributed to this item, in USD. Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    /// Elapsed time for this item, in milliseconds. Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Error text for a failed item. Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Position of this item within the batch (NOTE(review): confirm whether
    /// callers pass a 0- or 1-based value).
    index: usize,
    /// Number of items in the batch.
    total: usize,
}
818
/// NDJSON summary record for an enrich run.
///
/// Every field except `backend_invoked` is always serialized.
#[derive(Debug, Serialize)]
struct EnrichSummary {
    /// Marker identifying this record as a summary line (presumably always
    /// `true` — confirm at the construction site).
    summary: bool,
    /// Label of the operation that ran.
    operation: String,
    /// Number of items considered by the run.
    items_total: usize,
    /// Number of items that completed.
    completed: usize,
    /// Number of items that failed.
    failed: usize,
    /// Number of items that were skipped.
    skipped: usize,
    /// Cost of the run, in USD.
    cost_usd: f64,
    /// Elapsed time of the run, in milliseconds.
    elapsed_ms: u64,
    /// v1.0.84 (ADR-0042): discriminator of the LLM backend that actually
    /// ran the re-embedding during enrich. `"claude" | "codex" | "none"`.
    /// Absent on the wire when `None` (kept for happy-path envelope cleanliness,
    /// or when the operation did not involve a re-embed).
    #[serde(skip_serializing_if = "Option::is_none")]
    backend_invoked: Option<&'static str>,
    /// GAP-SG-15: items still parked on backoff (`status='pending'` with a future
    /// `next_retry_at`) when the run ended. Non-zero means the backlog has NOT
    /// converged — those items are waiting on a cooldown, not done.
    waiting: i64,
    /// GAP-SG-15: dead-letter items (`status='dead'`) at the end of the run.
    /// Non-zero requires `--list-dead` to inspect and `--requeue-dead` to retry.
    dead: i64,
}
843
844use crate::output::emit_json_line as emit_json;
845
846// ---------------------------------------------------------------------------
847// Queue DB
848// ---------------------------------------------------------------------------
849
850// Queue functions and structs moved to queue.rs
851
852// LLM call_claude and call_openrouter moved to extraction.rs
853
854// ---------------------------------------------------------------------------
855// Preflight probe (G35) — single-turn ping to verify the LLM provider
856// ---------------------------------------------------------------------------
857
/// Result of a single preflight ping (G35).
///
/// Produced by `run_preflight_probe`; the three variants separate "go ahead",
/// "provider is rate limited" and "anything else went wrong".
enum PreflightOutcome {
    /// The provider accepted the ping without rate-limit or other errors.
    Healthy,
    /// The provider rejected the ping due to OAuth rate limit. The
    /// `suggestion` field is a human hint that callers can embed in the
    /// user-facing error.
    RateLimited {
        /// Trimmed stderr of the failed probe process.
        reason: String,
        /// Static remediation hint chosen per provider.
        suggestion: &'static str,
    },
    /// Any other provider error (binary missing, auth failure, etc.).
    Error(AppError),
}
872
873/// Probes the configured LLM provider with a 1-turn ping.
874///
875/// - Claude: `claude -p "ping" --max-turns 1 --strict-mcp-config --mcp-config '{}'`
876/// - Codex:  `codex exec -c mcp_servers='{}' "ping" --json`
877///
878/// The probe intentionally avoids spawning any MCP server children (G28-A)
879/// to keep its own process footprint at the minimum.
880fn run_preflight_probe(args: &EnrichArgs) -> PreflightOutcome {
881    let timeout = std::time::Duration::from_secs(args.rate_limit_buffer.max(60));
882
883    match args.mode() {
884        EnrichMode::ClaudeCode => {
885            let bin = match find_claude_binary(args.claude_binary.as_deref()) {
886                Ok(b) => b,
887                Err(e) => return PreflightOutcome::Error(e),
888            };
889            // v1.0.88 (BUG-3 fix, ADR-0046): write the empty MCP config to a
890            // tempfile (Claude Code 2.1.177 rejects the inline `{}`
891            // form) and run the preflight gate before spawn, mirroring
892            // the canonical pattern in `claude_runner::build_claude_command`.
893            let mcp_config_path = match crate::spawn::preflight::write_empty_mcp_config_tempfile() {
894                Ok(p) => p,
895                Err(e) => {
896                    return PreflightOutcome::Error(AppError::Io(e));
897                }
898            };
899            let mut cmd = std::process::Command::new(&bin);
900            crate::spawn::env_whitelist::apply_env_whitelist(
901                &mut cmd,
902                crate::spawn::env_whitelist::is_strict_env_clear(),
903            );
904            if let Err(e) = crate::spawn::apply_cwd_isolation(&mut cmd) {
905                return PreflightOutcome::Error(e);
906            }
907            cmd.arg("-p")
908                .arg("ping")
909                .arg("--max-turns")
910                .arg("1")
911                .arg("--strict-mcp-config")
912                .arg("--mcp-config")
913                .arg(mcp_config_path.as_os_str())
914                .arg("--dangerously-skip-permissions")
915                .arg("--settings")
916                .arg("{\"hooks\":{}}")
917                .arg("--output-format")
918                .arg("json")
919                .stdin(std::process::Stdio::null())
920                .stdout(std::process::Stdio::piped())
921                .stderr(std::process::Stdio::piped());
922
923            let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
924                Ok(c) => c,
925                Err(e) => {
926                    return PreflightOutcome::Error(AppError::Io(e));
927                }
928            };
929            let output = match wait_with_timeout(child, timeout) {
930                Ok(out) => out,
931                Err(e) => return PreflightOutcome::Error(e),
932            };
933            if !output.status.success() {
934                let stderr = String::from_utf8_lossy(&output.stderr);
935                if stderr.contains("hit your session limit")
936                    || stderr.contains("rate_limit")
937                    || stderr.contains("429")
938                {
939                    return PreflightOutcome::RateLimited {
940                        reason: stderr.trim().to_string(),
941                        suggestion:
942                            "wait for the OAuth window to reset or use --fallback-mode codex",
943                    };
944                }
945                return PreflightOutcome::Error(AppError::Validation(format!(
946                    "preflight probe failed: {stderr}",
947                    stderr = stderr.trim()
948                )));
949            }
950            PreflightOutcome::Healthy
951        }
952        EnrichMode::Codex => {
953            let bin = match find_codex_binary(args.codex_binary.as_deref()) {
954                Ok(b) => b,
955                Err(e) => return PreflightOutcome::Error(e),
956            };
957            super::codex_spawn::validate_codex_model(args.codex_model.as_deref())
958                .map_err(PreflightOutcome::Error)
959                .ok();
960            let schema = "{}";
961            let schema_path = match super::codex_spawn::trusted_schema_path() {
962                Ok(p) => p,
963                Err(e) => return PreflightOutcome::Error(e),
964            };
965            let spawn_args = super::codex_spawn::CodexSpawnArgs {
966                binary: &bin,
967                prompt: "ping",
968                json_schema: schema,
969                input_text: "",
970                model: args.codex_model.as_deref(),
971                timeout_secs: args.rate_limit_buffer.max(60),
972                schema_path: schema_path.clone(),
973            };
974            let mut cmd = match super::codex_spawn::build_codex_command(&spawn_args) {
975                Ok(c) => c,
976                Err(e) => return PreflightOutcome::Error(e),
977            };
978            let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
979                Ok(c) => c,
980                Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
981            };
982            let output = match wait_with_timeout(child, timeout) {
983                Ok(out) => out,
984                Err(e) => return PreflightOutcome::Error(e),
985            };
986            let _ = std::fs::remove_file(&schema_path);
987            if !output.status.success() {
988                let stderr = String::from_utf8_lossy(&output.stderr);
989                if stderr.contains("rate_limit")
990                    || stderr.contains("429")
991                    || stderr.contains("Too Many Requests")
992                {
993                    return PreflightOutcome::RateLimited {
994                        reason: stderr.trim().to_string(),
995                        suggestion: "wait for the rate-limit window to reset",
996                    };
997                }
998                return PreflightOutcome::Error(AppError::Validation(format!(
999                    "preflight probe failed: {stderr}",
1000                    stderr = stderr.trim()
1001                )));
1002            }
1003            PreflightOutcome::Healthy
1004        }
1005        EnrichMode::Opencode => {
1006            let bin = match super::opencode_runner::find_opencode_binary_with_override(
1007                args.opencode_binary.as_deref(),
1008            ) {
1009                Ok(b) => b,
1010                Err(e) => return PreflightOutcome::Error(e),
1011            };
1012            let model =
1013                super::opencode_runner::resolve_opencode_model(args.opencode_model.as_deref());
1014            let mut cmd =
1015                match super::opencode_runner::build_opencode_command_sync(&bin, &model, "ping", "")
1016                {
1017                    Ok(c) => c,
1018                    Err(e) => return PreflightOutcome::Error(e),
1019                };
1020            let child = match super::opencode_runner::spawn_opencode(&mut cmd) {
1021                Ok(c) => c,
1022                Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
1023            };
1024            let output = match wait_with_timeout(child, timeout) {
1025                Ok(out) => out,
1026                Err(e) => return PreflightOutcome::Error(e),
1027            };
1028            if !output.status.success() {
1029                let stderr = String::from_utf8_lossy(&output.stderr);
1030                if stderr.contains("rate_limit")
1031                    || stderr.contains("429")
1032                    || stderr.contains("Too Many Requests")
1033                {
1034                    return PreflightOutcome::RateLimited {
1035                        reason: stderr.trim().to_string(),
1036                        suggestion: "wait for the rate-limit window to reset",
1037                    };
1038                }
1039                return PreflightOutcome::Error(AppError::Validation(format!(
1040                    "preflight probe failed: {stderr}",
1041                    stderr = stderr.trim()
1042                )));
1043            }
1044            PreflightOutcome::Healthy
1045        }
1046        EnrichMode::OpenRouter => {
1047            // v1.0.95: the OpenRouter JUDGE has no subprocess to ping; the
1048            // preflight only confirms a usable API key resolves. The chat
1049            // client singleton is initialised in run() before scan.
1050            match crate::config::resolve_api_key("openrouter", args.openrouter_api_key.as_deref()) {
1051                Some(_) => PreflightOutcome::Healthy,
1052                None => PreflightOutcome::Error(AppError::Validation(
1053                    "OPENROUTER_API_KEY not found for --mode openrouter preflight".into(),
1054                )),
1055            }
1056        }
1057    }
1058}
1059
1060/// Cross-platform wait with timeout (no extra crate dependency).
1061fn wait_with_timeout(
1062    mut child: std::process::Child,
1063    timeout: std::time::Duration,
1064) -> Result<std::process::Output, AppError> {
1065    use wait_timeout::ChildExt;
1066    let start = std::time::Instant::now();
1067    let Some(exit) = child.wait_timeout(timeout).map_err(AppError::Io)? else {
1068        let _ = child.kill();
1069        let _ = child.wait();
1070        return Err(AppError::Validation(format!(
1071            "preflight probe timed out after {}s",
1072            start.elapsed().as_secs()
1073        )));
1074    };
1075    let mut stdout = Vec::new();
1076    if let Some(mut out) = child.stdout.take() {
1077        std::io::Read::read_to_end(&mut out, &mut stdout).map_err(AppError::Io)?;
1078    }
1079    let mut stderr = Vec::new();
1080    if let Some(mut err) = child.stderr.take() {
1081        std::io::Read::read_to_end(&mut err, &mut stderr).map_err(AppError::Io)?;
1082    }
1083    Ok(std::process::Output {
1084        status: exit,
1085        stdout,
1086        stderr,
1087    })
1088}
1089
1090// Scan functions moved to scan.rs
1091
1092// Persist functions moved to postprocess.rs
1093
1094// ---------------------------------------------------------------------------
1095// Main entry point
1096// ---------------------------------------------------------------------------
1097
1098// ---------------------------------------------------------------------------
1099// G20: mode-conditional flag validation
1100// ---------------------------------------------------------------------------
1101
/// Reports whether `value` is equal to `default`.
///
/// Callers use this on flags declared with `default_value_t`: a value equal
/// to the declared default is treated as "clap filled it in", anything else
/// as an explicit operator override.
fn is_at_default<T>(value: T, default: T) -> bool
where
    T: PartialEq,
{
    PartialEq::eq(&value, &default)
}
1108
1109/// G20: validate that flags for one LLM provider were not passed when
1110/// the operator selected a different provider. Flags silently discarded
1111/// by the wrong mode are surfaced as AppError::Validation BEFORE any
1112/// DB work, so the operator gets an actionable error instead of a
1113/// surprise at runtime.
1114///
1115/// Detection rules:
1116/// - For Option<PathBuf> / Option<String>: is_some() means explicit
1117/// - For scalar fields with default_value_t: value != default means explicit
1118/// - For boolean fields: true means explicit (default is false)
1119///
1120/// Mode-specific matrices:
1121/// - mode=claude-code rejects: codex_binary, codex_model, codex_timeout != 300
1122/// - mode=codex rejects: claude_binary, claude_model, claude_timeout != 300, max_cost_usd
1123fn validate_mode_conditional_flags_enrich(args: &EnrichArgs) -> Result<(), AppError> {
1124    const DEFAULT_TIMEOUT: u64 = 300;
1125
1126    let mut conflicts: Vec<String> = Vec::new();
1127
1128    match args.mode() {
1129        EnrichMode::ClaudeCode => {
1130            if args.codex_binary.is_some() {
1131                conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1132            }
1133            if args.codex_model.is_some() {
1134                conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1135            }
1136            if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1137                conflicts.push(format!(
1138                    "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1139                    args.codex_timeout
1140                ));
1141            }
1142        }
1143        EnrichMode::Codex => {
1144            if args.claude_binary.is_some() {
1145                conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1146            }
1147            if args.claude_model.is_some() {
1148                conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1149            }
1150            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1151                conflicts.push(format!(
1152                    "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1153                    args.claude_timeout
1154                ));
1155            }
1156            if args.max_cost_usd.is_some() {
1157                conflicts.push(
1158                    "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription, not the call)"
1159                        .to_string(),
1160                );
1161            }
1162        }
1163        EnrichMode::Opencode => {
1164            if args.claude_binary.is_some() {
1165                conflicts.push("--claude-binary is ignored when --mode=opencode".to_string());
1166            }
1167            if args.claude_model.is_some() {
1168                conflicts.push("--claude-model is ignored when --mode=opencode".to_string());
1169            }
1170            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1171                conflicts.push(format!(
1172                    "--claude-timeout={} is ignored when --mode=opencode (remove the flag to use the default 300s)",
1173                    args.claude_timeout
1174                ));
1175            }
1176            if args.max_cost_usd.is_some() {
1177                conflicts.push(
1178                    "--max-cost-usd is ignored when --mode=opencode (OAuth-first; cost is metered by your subscription, not the call)"
1179                        .to_string(),
1180                );
1181            }
1182        }
1183        EnrichMode::OpenRouter => {
1184            if args.claude_binary.is_some() {
1185                conflicts.push("--claude-binary is ignored when --mode=openrouter".to_string());
1186            }
1187            if args.claude_model.is_some() {
1188                conflicts.push("--claude-model is ignored when --mode=openrouter".to_string());
1189            }
1190            if args.codex_binary.is_some() {
1191                conflicts.push("--codex-binary is ignored when --mode=openrouter".to_string());
1192            }
1193            if args.codex_model.is_some() {
1194                conflicts.push("--codex-model is ignored when --mode=openrouter".to_string());
1195            }
1196            if args.opencode_binary.is_some() {
1197                conflicts.push("--opencode-binary is ignored when --mode=openrouter".to_string());
1198            }
1199            if args.opencode_model.is_some() {
1200                conflicts.push("--opencode-model is ignored when --mode=openrouter".to_string());
1201            }
1202            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1203                conflicts.push(format!(
1204                    "--claude-timeout={} is ignored when --mode=openrouter (remove the flag to use the default 300s)",
1205                    args.claude_timeout
1206                ));
1207            }
1208            if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1209                conflicts.push(format!(
1210                    "--codex-timeout={} is ignored when --mode=openrouter (remove the flag to use the default 300s)",
1211                    args.codex_timeout
1212                ));
1213            }
1214            if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
1215                conflicts.push(format!(
1216                    "--opencode-timeout={} is ignored when --mode=openrouter (remove the flag to use the default 300s)",
1217                    args.opencode_timeout
1218                ));
1219            }
1220        }
1221    }
1222
1223    if !conflicts.is_empty() {
1224        return Err(AppError::Validation(format!(
1225            "G20: mode-conditional flag conflicts detected for --mode={}:\n  - {}",
1226            args.mode(),
1227            conflicts.join("\n  - ")
1228        )));
1229    }
1230
1231    Ok(())
1232}
1233
1234// ---------------------------------------------------------------------------
1235
1236/// Main entry point for the `enrich` command.
1237pub fn run(
1238    args: &EnrichArgs,
1239    llm_backend: crate::cli::LlmBackendChoice,
1240    embedding_backend: crate::cli::EmbeddingBackendChoice,
1241) -> Result<(), AppError> {
1242    // G20: mode-conditional flag validation BEFORE any DB access.
1243    // Surfaces flags that the wrong mode would silently discard.
1244    validate_mode_conditional_flags_enrich(args)?;
1245
1246    // GAP-ENRICH-BACKLOG-CONVERGE: --status is a read-only report. It never
1247    // calls the LLM, never initialises the OpenRouter client, and never
1248    // acquires the job singleton, so it is safe to run while a real enrich is
1249    // in flight (it only reads the queue DB and the unbound backlog).
1250    // GAP-SG-23/11: --list-dead (inspect dead-letter rows) and --requeue-dead
1251    // (resurrect them) are queue-only operations — no LLM, no main-DB write, no
1252    // singleton. Both are scoped to the current --operation so a shared queue is
1253    // not cross-contaminated. Handled before any provider setup.
1254    if args.list_dead || args.requeue_dead || args.prune_dead_orphans {
1255        let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1256        let op_label = format!("{:?}", args.operation());
1257        let paths = AppPaths::resolve(args.db.as_deref())?;
1258        let queue_path = crate::paths::sidecar_path(&paths.db, ".enrich-queue.sqlite");
1259        let queue_conn = open_queue_db(&queue_path)?;
1260        // GAP-SG-66: prune orphan dead rows (memory gone) — needs the main DB to
1261        // confirm the referenced memory is truly absent before deleting.
1262        if args.prune_dead_orphans {
1263            ensure_db_ready(&paths)?;
1264            let main_conn = open_rw(&paths.db)?;
1265            let pruned = prune_dead_orphans(&queue_conn, &main_conn, &op_label, &namespace)?;
1266            let dead_total: i64 = queue_conn
1267                .query_row(
1268                    "SELECT COUNT(*) FROM queue WHERE status='dead' \
1269                     AND (operation = ?1 OR operation IS NULL)",
1270                    rusqlite::params![op_label],
1271                    |r| r.get(0),
1272                )
1273                .unwrap_or(0);
1274            emit_json(&DeadSummary {
1275                summary: true,
1276                operation: op_label,
1277                namespace,
1278                action: "prune-dead-orphans",
1279                dead_total,
1280                requeued: 0,
1281                pruned,
1282            });
1283            return Ok(());
1284        }
1285        if args.list_dead {
1286            let mut stmt = queue_conn.prepare(
1287                "SELECT item_key, item_type, attempt, error_class, error FROM queue \
1288                 WHERE status='dead' AND (operation = ?1 OR operation IS NULL) ORDER BY id",
1289            )?;
1290            let rows = stmt
1291                .query_map(rusqlite::params![op_label], |r| {
1292                    Ok(DeadItem {
1293                        dead_item: true,
1294                        item_key: r.get(0)?,
1295                        item_type: r.get(1)?,
1296                        attempt: r.get(2)?,
1297                        error_class: r.get(3)?,
1298                        error: r.get(4)?,
1299                    })
1300                })?
1301                .collect::<Result<Vec<_>, _>>()?;
1302            let dead_total = rows.len() as i64;
1303            for item in &rows {
1304                emit_json(item);
1305            }
1306            emit_json(&DeadSummary {
1307                summary: true,
1308                operation: op_label,
1309                namespace,
1310                action: "list-dead",
1311                dead_total,
1312                requeued: 0,
1313                pruned: 0,
1314            });
1315            return Ok(());
1316        }
1317        // --requeue-dead: move dead -> pending, clearing the failure bookkeeping.
1318        let dead_total: i64 = queue_conn
1319            .query_row(
1320                "SELECT COUNT(*) FROM queue WHERE status='dead' \
1321                 AND (operation = ?1 OR operation IS NULL)",
1322                rusqlite::params![op_label],
1323                |r| r.get(0),
1324            )
1325            .unwrap_or(0);
1326        let requeued = queue_conn
1327            .execute(
1328                "UPDATE queue SET status='pending', attempt=0, next_retry_at=NULL, \
1329                 error=NULL, error_class=NULL \
1330                 WHERE status='dead' AND (operation = ?1 OR operation IS NULL)",
1331                rusqlite::params![op_label],
1332            )
1333            .map_err(|e| AppError::Validation(format!("requeue-dead failed: {e}")))?
1334            as i64;
1335        let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1336        emit_json(&DeadSummary {
1337            summary: true,
1338            operation: op_label,
1339            namespace,
1340            action: "requeue-dead",
1341            dead_total,
1342            requeued,
1343            pruned: 0,
1344        });
1345        return Ok(());
1346    }
1347
1348    if args.status {
1349        let paths = AppPaths::resolve(args.db.as_deref())?;
1350        ensure_db_ready(&paths)?;
1351        let conn = open_rw(&paths.db)?;
1352        let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1353        let unbound_backlog = scan_unbound_memories(&conn, &namespace, None, &[])?.len();
1354        let queue_path = crate::paths::sidecar_path(&paths.db, ".enrich-queue.sqlite");
1355        let queue_conn = open_queue_db(&queue_path)?;
1356        let op_label = format!("{:?}", args.operation());
1357        // GAP-SG-42: scope every count to the current operation. Rows migrated
1358        // before the `operation` column (NULL) are still counted so a legacy
1359        // queue is never reported as spuriously empty.
1360        let count_status = |st: &str, op: &str| -> i64 {
1361            queue_conn
1362                .query_row(
1363                    "SELECT COUNT(*) FROM queue WHERE status=?1 \
1364                     AND (operation = ?2 OR operation IS NULL)",
1365                    rusqlite::params![st, op],
1366                    |r| r.get(0),
1367                )
1368                .unwrap_or(0)
1369        };
1370        let eligible_now: i64 = queue_conn
1371            .query_row(
1372                "SELECT COUNT(*) FROM queue WHERE status='pending' \
1373                 AND (operation = ?1 OR operation IS NULL) \
1374                 AND (next_retry_at IS NULL OR next_retry_at <= datetime('now'))",
1375                rusqlite::params![op_label],
1376                |r| r.get(0),
1377            )
1378            .unwrap_or(0);
1379        let waiting: i64 = queue_conn
1380            .query_row(
1381                "SELECT COUNT(*) FROM queue WHERE status='pending' \
1382                 AND (operation = ?1 OR operation IS NULL) \
1383                 AND next_retry_at IS NOT NULL AND next_retry_at > datetime('now')",
1384                rusqlite::params![op_label],
1385                |r| r.get(0),
1386            )
1387            .unwrap_or(0);
1388        // GAP-SG-16: enumerate the items currently in backoff with their ETA.
1389        let waiting_items = {
1390            let mut stmt = queue_conn.prepare(
1391                "SELECT item_key, attempt, next_retry_at, error_class FROM queue \
1392                 WHERE status='pending' AND (operation = ?1 OR operation IS NULL) \
1393                 AND next_retry_at IS NOT NULL AND next_retry_at > datetime('now') \
1394                 ORDER BY next_retry_at",
1395            )?;
1396            let items: Vec<WaitingItem> = stmt
1397                .query_map(rusqlite::params![op_label], |r| {
1398                    Ok(WaitingItem {
1399                        item_key: r.get(0)?,
1400                        attempt: r.get(1)?,
1401                        next_retry_at: r.get(2)?,
1402                        error_class: r.get(3)?,
1403                    })
1404                })?
1405                .collect::<Result<Vec<_>, _>>()?;
1406            items
1407        };
1408        let queue_pending = count_status("pending", &op_label);
1409        let queue_processing = count_status("processing", &op_label);
1410        let queue_done = count_status("done", &op_label);
1411        let queue_failed = count_status("failed", &op_label);
1412        let queue_skipped = count_status("skipped", &op_label);
1413        let queue_dead = count_status("dead", &op_label);
1414        // GAP-SG-15/46: distinguish empty from cooldown from not-yet-scanned.
1415        let state = if eligible_now > 0 {
1416            "draining"
1417        } else if waiting > 0 {
1418            "cooldown"
1419        } else if queue_pending == 0 && unbound_backlog > 0 {
1420            "pending-scan"
1421        } else {
1422            "empty"
1423        };
1424        emit_json(&EnrichStatus {
1425            status_report: true,
1426            operation: op_label,
1427            namespace,
1428            unbound_backlog,
1429            queue_pending,
1430            queue_processing,
1431            queue_done,
1432            queue_failed,
1433            queue_skipped,
1434            queue_dead,
1435            eligible_now,
1436            waiting,
1437            state,
1438            waiting_items,
1439        });
1440        return Ok(());
1441    }
1442
1443    // v1.0.95 (ADR-0054): when the JUDGE is OpenRouter the model is mandatory
1444    // (no default) and the API key must resolve BEFORE any network or DB work.
1445    // The chat client singleton is initialised here so every per-item dispatch
1446    // fetches it without re-threading the key.
1447    if args.mode() == EnrichMode::OpenRouter {
1448        let model = args.openrouter_model.as_deref().ok_or_else(|| {
1449            AppError::Validation(
1450                "--mode openrouter requires --openrouter-model (no default model is allowed)"
1451                    .into(),
1452            )
1453        })?;
1454        let resolved =
1455            crate::config::resolve_api_key("openrouter", args.openrouter_api_key.as_deref())
1456                .ok_or_else(|| {
1457                    AppError::Validation(
1458                        "OPENROUTER_API_KEY not found; set the env var, store it via \
1459                         `config add-key --provider openrouter`, or pass --openrouter-api-key"
1460                            .into(),
1461                    )
1462                })?;
1463        crate::embedder::get_openrouter_chat_client(
1464            resolved.value,
1465            model,
1466            args.openrouter_timeout,
1467        )?;
1468    }
1469
1470    let started = Instant::now();
1471
1472    let paths = AppPaths::resolve(args.db.as_deref())?;
1473    ensure_db_ready(&paths)?;
1474    let conn = open_rw(&paths.db)?;
1475    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1476
1477    // G28-B (v1.0.68) + G30 (v1.0.69): enforce singleton per
1478    // (job_type, namespace, db_hash) so two parallel `enrich` invocations
1479    // on the same DB cannot co-exist, but concurrent enrich on different
1480    // databases works as expected. The force flag (--force) breaks a
1481    // stale lock from a previously crashed invocation.
1482    let wait_secs = args.wait_job_singleton;
1483    let force_flag = args.force_job_singleton;
1484    let _singleton = crate::lock::acquire_job_singleton(
1485        crate::lock::JobType::Enrich,
1486        &namespace,
1487        &paths.db,
1488        wait_secs,
1489        force_flag,
1490    )?;
1491
1492    // Validate provider binary upfront only for LLM-backed operations.
1493    let provider_binary = if matches!(args.operation(), EnrichOperation::ReEmbed) {
1494        None
1495    } else {
1496        Some(match args.mode() {
1497            EnrichMode::ClaudeCode => {
1498                let bin = find_claude_binary(args.claude_binary.as_deref())?;
1499                let version = super::claude_runner::validate_claude_version(&bin)?;
1500                tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
1501                emit_json(&PhaseEvent {
1502                    phase: "validate",
1503                    binary_path: bin.to_str(),
1504                    version: Some(&version),
1505                    items_total: None,
1506                    items_pending: None,
1507                    llm_parallelism: None,
1508                });
1509                bin
1510            }
1511            EnrichMode::Codex => {
1512                let bin = find_codex_binary(args.codex_binary.as_deref())?;
1513                emit_json(&PhaseEvent {
1514                    phase: "validate",
1515                    binary_path: bin.to_str(),
1516                    version: None,
1517                    items_total: None,
1518                    items_pending: None,
1519                    llm_parallelism: None,
1520                });
1521                bin
1522            }
1523            EnrichMode::Opencode => {
1524                let bin = super::opencode_runner::find_opencode_binary_with_override(
1525                    args.opencode_binary.as_deref(),
1526                )?;
1527                emit_json(&PhaseEvent {
1528                    phase: "validate",
1529                    binary_path: bin.to_str(),
1530                    version: None,
1531                    items_total: None,
1532                    items_pending: None,
1533                    llm_parallelism: None,
1534                });
1535                bin
1536            }
1537            EnrichMode::OpenRouter => {
1538                // v1.0.95: the OpenRouter JUDGE is a REST call, not a spawned
1539                // binary. The chat client singleton was initialised at the top
1540                // of run(); this placeholder path threads through the dispatch
1541                // but is never dereferenced by the OpenRouter arm.
1542                emit_json(&PhaseEvent {
1543                    phase: "validate",
1544                    binary_path: None,
1545                    version: None,
1546                    items_total: None,
1547                    items_pending: None,
1548                    llm_parallelism: None,
1549                });
1550                PathBuf::new()
1551            }
1552        })
1553    };
1554
1555    // G28-D: refuse to start when the system is saturated. This check
1556    // is BEFORE preflight so we never spend an OAuth turn on a host
1557    // that is already at the limit.
1558    if args.max_load_check && !args.dry_run && crate::system_load::is_system_saturated() {
1559        let load = crate::system_load::load_average_one();
1560        let n = crate::system_load::ncpus();
1561        return Err(AppError::Validation(format!(
1562            "system load average {load:.2} exceeds 2x ncpus ({n}); \
1563             pass --no-max-load-check to override (not recommended)"
1564        )));
1565    }
1566
1567    // G35: preflight probe — issue a single ping turn to verify the
1568    // provider is healthy before scanning N candidates. If the probe
1569    // reports a rate limit, the run aborts here, before any scan or
1570    // queue work; when --fallback-mode names a different provider the
1571    // error says which `--mode` to re-invoke with. The probe consumes 1 OAuth turn, so it stays
1572    // opt-in (default off) to keep --dry-run and CI flows zero-cost.
1573    if args.preflight_check
1574        && !args.dry_run
1575        && !matches!(args.operation(), EnrichOperation::ReEmbed)
1576    {
1577        let preflight_result = run_preflight_probe(args);
1578        match preflight_result {
1579            PreflightOutcome::Healthy => {
1580                tracing::info!(target: "enrich", mode = ?args.mode(), "preflight probe healthy");
1581            }
1582            PreflightOutcome::RateLimited { reason, suggestion } => {
1583                if let Some(fallback) = args.fallback_mode.clone() {
1584                    if fallback != args.mode() {
1585                        // G35 (v1.0.69): the mid-batch mode switch is
1586                        // intentionally NOT applied because it would
1587                        // desynchronise the per-item rate-limit wait
1588                        // state (rate-limited items in the worker are
1589                        // timed against the original provider). Instead
1590                        // we abort cleanly so the operator can re-invoke
1591                        // with `--mode {fallback:?}`. This guarantees no
1592                        // OAuth window is wasted and no partial state
1593                        // is left in the queue.
1594                        return Err(AppError::Validation(format!(
1595                            "preflight detected rate limit on {mode:?}: {reason}; \
1596                             re-invoke with `--mode {fallback:?}` to use the fallback provider",
1597                            mode = args.mode()
1598                        )));
1599                    }
1600                    return Err(AppError::Validation(format!(
1601                        "preflight detected rate limit on {mode:?}: {reason}; \
1602                         --fallback-mode matches --mode, no recovery possible",
1603                        mode = args.mode()
1604                    )));
1605                }
1606                return Err(AppError::Validation(format!(
1607                    "preflight detected rate limit on {mode:?}: {reason}; \
1608                     {suggestion}; pass --fallback-mode codex to recover",
1609                    mode = args.mode()
1610                )));
1611            }
1612            PreflightOutcome::Error(e) => {
1613                return Err(e);
1614            }
1615        }
1616    }
1617
1618    // SCAN phase
1619    let mut scan_result = scan_operation(&conn, &namespace, args)?;
1620    // GAP-SG-69: body-enrich candidates are scanned purely by `LENGTH(body) <
1621    // min_output_chars`, so a short body whose rewrite the preservation guard
1622    // keeps rejecting is re-scanned every pass — items_total never reaches 0 and
1623    // `--until-empty` never converges (the detached worker reported a stuck
1624    // backlog for 30+ min). Exclude memories already vetoed `status='skipped'`
1625    // for this operation in the sidecar queue; `cleanup_queue_entry`
1626    // (remember/edit/forget/purge) clears the veto when the body actually
1627    // changes, so a genuinely updated memory is reconsidered automatically.
1628    if matches!(args.operation(), EnrichOperation::BodyEnrich) {
1629        let q_path = crate::paths::sidecar_path(&paths.db, ".enrich-queue.sqlite");
1630        if let Ok(q) = open_queue_db(&q_path) {
1631            if let Ok(vetoed) = skipped_item_keys(&q, &format!("{:?}", args.operation())) {
1632                scan_result.retain(|k| !vetoed.contains(k));
1633            }
1634        }
1635    }
1636    let total = scan_result.len();
1637
1638    emit_json(&PhaseEvent {
1639        phase: "scan",
1640        binary_path: None,
1641        version: None,
1642        items_total: Some(total),
1643        items_pending: Some(total),
1644        llm_parallelism: Some(args.llm_parallelism),
1645    });
1646
1647    // Dry-run: emit preview events and summary without calling LLM
1648    if args.dry_run {
1649        for (idx, key) in scan_result.iter().enumerate() {
1650            emit_json(&ItemEvent {
1651                item: key,
1652                status: "preview",
1653                memory_id: None,
1654                entity_id: None,
1655                entities: None,
1656                rels: None,
1657                chars_before: None,
1658                chars_after: None,
1659                cost_usd: None,
1660                elapsed_ms: None,
1661                error: None,
1662                index: idx,
1663                total,
1664            });
1665        }
1666        emit_json(&EnrichSummary {
1667            summary: true,
1668            operation: format!("{:?}", args.operation()),
1669            items_total: total,
1670            completed: 0,
1671            failed: 0,
1672            skipped: 0,
1673            cost_usd: 0.0,
1674            elapsed_ms: started.elapsed().as_millis() as u64,
1675            backend_invoked: take_enrich_backend(),
1676            waiting: 0,
1677            dead: 0,
1678        });
1679        return Ok(());
1680    }
1681
1682    // All operations in this enum have an execution path.
1683
1684    // Queue setup for resume/retry (GAP-SG-64: sidecar alongside --db)
1685    let queue_path = crate::paths::sidecar_path(&paths.db, ".enrich-queue.sqlite");
1686    let queue_conn = open_queue_db(&queue_path)?;
1687
1688    if args.resume {
1689        let reset = queue_conn
1690            .execute(
1691                "UPDATE queue SET status='pending' WHERE status='processing'",
1692                [],
1693            )
1694            .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
1695        if reset > 0 {
1696            tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
1697        }
1698    }
1699
1700    if args.retry_failed {
1701        let count = queue_conn
1702            .execute(
1703                "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
1704                [],
1705            )
1706            .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
1707        tracing::info!(target: "enrich", count, "retrying failed items");
1708    }
1709
1710    if !args.resume && !args.retry_failed && !args.until_empty {
1711        queue_conn
1712            .execute("DELETE FROM queue", [])
1713            .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
1714    }
1715
1716    // Populate queue (GAP-SG-12: tag rows with the operation + link memory_id).
1717    let op_label = format!("{:?}", args.operation());
1718    let item_type = item_type_for(&args.operation());
1719    for key in scan_result.iter() {
1720        enqueue_candidate(&queue_conn, &conn, &namespace, key, item_type, &op_label);
1721    }
1722
1723    // G19: parallel LLM processing via std::thread::scope when parallelism > 1.
1724    // Clamp enforces the range even if the caller bypasses clap validation.
1725    let parallelism = if args.mode() == EnrichMode::OpenRouter {
1726        let rest = args.rest_concurrency.unwrap_or(8).clamp(1, 16) as usize;
1727        tracing::info!(
1728            target: "enrich",
1729            concurrency = rest,
1730            source = "rest_concurrency",
1731            "OpenRouter REST concurrency (clamp 1..=16)"
1732        );
1733        rest
1734    } else {
1735        let p = args.llm_parallelism.clamp(1, 32) as usize;
1736        tracing::info!(
1737            target: "enrich",
1738            concurrency = p,
1739            source = "llm_parallelism",
1740            "LLM subprocess parallelism (clamp 1..=32)"
1741        );
1742        p
1743    };
1744    if parallelism > 1 {
1745        tracing::info!(
1746            target: "enrich",
1747            llm_parallelism = parallelism,
1748            "parallel LLM processing with bounded thread pool"
1749        );
1750    }
1751    // G28-D (v1.0.68) + G34 (v1.0.69): warn above the recommended parallelism
1752    // ceiling. The threshold and message depend on the LLM mode because
1753    // Claude Code spawns MCP children (G28-A) while Codex does not.
1754    if parallelism > 4 {
1755        match args.mode() {
1756            EnrichMode::ClaudeCode => {
1757                tracing::warn!(
1758                    target: "enrich",
1759                    llm_parallelism = parallelism,
1760                    recommended_max = 4,
1761                    mode = "claude-code",
1762                    "llm_parallelism above 4 multiplies Claude Code subprocess fan-out; \
1763                     consider combining with SQLITE_GRAPHRAG_CLAUDE_EMPTY_CONFIG_DIR \
1764                     to cut MCP children (G28-A)"
1765                );
1766            }
1767            EnrichMode::Codex if parallelism > 16 => {
1768                tracing::warn!(
1769                    target: "enrich",
1770                    llm_parallelism = parallelism,
1771                    recommended_max = 16,
1772                    mode = "codex",
1773                    "llm_parallelism above 16 risks OAuth rate-limit on Codex; \
1774                     consider --llm-parallelism 8 for safer concurrency"
1775                );
1776            }
1777            EnrichMode::Codex => {
1778                // No warning: codex does not spawn MCP children and was
1779                // validated at parallelism 8 in production (1161 items,
1780                // 0 failures) per the 2026-06-04 session audit.
1781            }
1782            EnrichMode::Opencode if parallelism > 16 => {
1783                tracing::warn!(
1784                    target: "enrich",
1785                    llm_parallelism = parallelism,
1786                    recommended_max = 16,
1787                    mode = "opencode",
1788                    "llm_parallelism above 16 risks OAuth rate-limit on OpenCode; \
1789                     consider --llm-parallelism 8 for safer concurrency"
1790                );
1791            }
1792            EnrichMode::Opencode => {
1793                // No warning: opencode does not spawn MCP children.
1794            }
1795            EnrichMode::OpenRouter => {
1796                // No warning: OpenRouter is a bounded HTTP fan-out (no subprocess);
1797                // its width is `rest_concurrency` clamped to 1..=16 above, not --llm-parallelism.
1798            }
1799        }
1800    }
1801
1802    let mut completed = 0usize;
1803    let mut failed = 0usize;
1804    let mut skipped = 0usize;
1805    let mut cost_total = 0.0f64;
1806    let mut oauth_detected = false;
1807    let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1808    let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1809    let enrich_started = std::time::Instant::now();
1810
1811    let provider_timeout = match args.mode() {
1812        EnrichMode::ClaudeCode => args.claude_timeout,
1813        EnrichMode::Codex => args.codex_timeout,
1814        EnrichMode::Opencode => args.opencode_timeout,
1815        EnrichMode::OpenRouter => args.openrouter_timeout,
1816    };
1817
1818    let provider_model: Option<&str> = match args.mode() {
1819        EnrichMode::ClaudeCode => args.claude_model.as_deref(),
1820        EnrichMode::Codex => args.codex_model.as_deref(),
1821        EnrichMode::Opencode => args.opencode_model.as_deref(),
1822        EnrichMode::OpenRouter => args.openrouter_model.as_deref(),
1823    };
1824
1825    // GAP-SG-16: when --ignore-backoff is set, drop the per-item cooldown filter
1826    // from candidate selection so items parked on `next_retry_at` are eligible
1827    // immediately. Shared by the parallel workers and the serial loop.
1828    let backoff_clause: &str = if args.ignore_backoff {
1829        ""
1830    } else {
1831        "AND (next_retry_at IS NULL OR next_retry_at <= datetime('now'))"
1832    };
1833
1834    // GAP-SG-45: announce the scan-vs-drain concurrency split (scan is always
1835    // serial; drain uses `parallelism` workers).
1836    emit_json(&ConcurrencyEvent {
1837        phase: "concurrency",
1838        scan_parallelism: 1,
1839        drain_parallelism: parallelism as u32,
1840    });
1841
1842    // GAP-ENRICH-BACKLOG-CONVERGE: --until-empty wraps the scan→populate→drain
1843    // cycle in an internal loop so the external bash retry loop is unnecessary.
1844    // Without --until-empty the loop body runs exactly once (legacy behaviour).
1845    let until_deadline = std::time::Instant::now()
1846        + std::time::Duration::from_secs(args.max_runtime.unwrap_or(3600));
1847    loop {
1848        if args.until_empty {
1849            // Re-scan and re-enqueue eligible candidates each iteration.
1850            // INSERT OR IGNORE never resurrects a dead-letter row (item_key is
1851            // UNIQUE), so the backlog converges instead of looping forever.
1852            let mut rescan = scan_operation(&conn, &namespace, args)?;
1853            // GAP-SG-69: drop memories already vetoed `status='skipped'` so the
1854            // re-scan converges instead of re-enqueuing a non-expandable short
1855            // body every iteration (body-enrich only; the verdict persists in
1856            // the sidecar queue and is cleared by cleanup_queue_entry on edit).
1857            if matches!(args.operation(), EnrichOperation::BodyEnrich) {
1858                if let Ok(vetoed) = skipped_item_keys(&queue_conn, &op_label) {
1859                    rescan.retain(|k| !vetoed.contains(k));
1860                }
1861            }
1862            for key in &rescan {
1863                enqueue_candidate(&queue_conn, &conn, &namespace, key, item_type, &op_label);
1864            }
1865        }
1866        let completed_before = completed;
1867
1868        // G19: when parallelism > 1, spawn bounded worker threads.
1869        // Each worker opens its own DB connections (WAL supports concurrent readers + serialized writers).
1870        // The queue DB claim is atomic via UPDATE...RETURNING — no external lock needed.
1871        if parallelism > 1 {
1872            let stdout_mu = parking_lot::Mutex::new(());
1873            let budget = args.max_cost_usd;
1874            let operation = args.operation().clone();
1875            let mode = args.mode().clone();
1876            let min_oc = args.min_output_chars;
1877            let max_oc = args.max_output_chars;
1878            let prompt_tpl = args.prompt_template.as_deref().map(|p| p.to_path_buf());
1879
1880            struct WorkerResult {
1881                completed: usize,
1882                failed: usize,
1883                skipped: usize,
1884                cost: f64,
1885                oauth: bool,
1886            }
1887
1888            let results: Vec<WorkerResult> = std::thread::scope(|s| {
1889                let handles: Vec<_> = (0..parallelism)
1890                .map(|worker_id| {
1891                    let stdout_mu = &stdout_mu;
1892                    let paths = &paths;
1893                    let queue_path = &queue_path;
1894                    let namespace = &namespace;
1895                    let provider_binary = provider_binary.as_deref();
1896                    let operation = &operation;
1897                    let mode = &mode;
1898                    let prompt_tpl = prompt_tpl.as_deref();
1899                    s.spawn(move || {
1900                        let w_conn = match open_rw(&paths.db) {
1901                            Ok(c) => c,
1902                            Err(e) => {
1903                                tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open DB");
1904                                return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1905                            }
1906                        };
1907                        let w_queue = match open_queue_db(queue_path) {
1908                            Ok(c) => c,
1909                            Err(e) => {
1910                                tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open queue DB");
1911                                return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1912                            }
1913                        };
1914                        let mut w_completed = 0usize;
1915                        let mut w_failed = 0usize;
1916                        let mut w_skipped = 0usize;
1917                        let mut w_cost = 0.0f64;
1918                        let mut w_oauth = false;
1919                        let mut w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1920                        let w_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1921                        // G28-D: per-worker circuit breaker that aborts the
1922                        // loop after `circuit_breaker_threshold` consecutive
1923                        // HardFailure outcomes (transient/rate-limited errors
1924                        // do NOT count, so a recovering provider is not
1925                        // penalised).
1926                        let mut w_breaker = crate::retry::CircuitBreaker::new(
1927                            args.circuit_breaker_threshold.max(1),
1928                            std::time::Duration::from_secs(60),
1929                        );
1930
1931                        loop {
1932                            if crate::shutdown_requested() {
1933                                tracing::info!(target: "enrich", "shutdown requested, worker stopping");
1934                                break;
1935                            }
1936                            if let Some(b) = budget {
1937                                if !w_oauth && w_cost >= b {
1938                                    break;
1939                                }
1940                            }
1941                            // GAP-SG-16: --ignore-backoff drops the next_retry_at
1942                            // cooldown filter so items waiting on backoff are
1943                            // claimed immediately.
1944                            let dequeue_sql = format!(
1945                                "UPDATE queue SET status='processing', attempt=attempt+1 \
1946                                 WHERE id = (SELECT id FROM queue WHERE status='pending' {backoff_clause} \
1947                                             ORDER BY id LIMIT 1) \
1948                                 RETURNING id, item_key, item_type, attempt"
1949                            );
1950                            let pending: Option<(i64, String, String, i64)> = w_queue
1951                                .query_row(
1952                                    &dequeue_sql,
1953                                    [],
1954                                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
1955                                )
1956                                .ok();
1957                            let (queue_id, item_key, _item_type, attempt_current) = match pending {
1958                                Some(p) => p,
1959                                None => break,
1960                            };
1961                            let item_started = Instant::now();
1962                            let current_index = w_completed + w_failed + w_skipped;
1963
1964                            // provider_binary is validated upfront (Some for every
1965                            // LLM-backed op; None only for ReEmbed, which ignores
1966                            // it). The empty-path fallback below is hit solely in
1967                            // that unread case, so a broken invariant surfaces as a
1968                            // recoverable per-item error instead of a panic.
1969                            let provider_bin = provider_binary.unwrap_or_else(|| std::path::Path::new(""));
1970                            let call_result = match operation {
1971                                EnrichOperation::MemoryBindings | EnrichOperation::AugmentBindings => call_memory_bindings(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
1972                                EnrichOperation::EntityDescriptions => call_entity_description(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
1973                                EnrichOperation::BodyEnrich => call_body_enrich(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode, min_oc, max_oc, prompt_tpl, args.preserve_threshold, paths, llm_backend, embedding_backend),
1974                                EnrichOperation::ReEmbed => call_reembed(&w_conn, namespace, &item_key, paths, llm_backend, embedding_backend),
1975                                EnrichOperation::WeightCalibrate => call_weight_calibrate(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
1976                                EnrichOperation::RelationReclassify => call_relation_reclassify(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
1977                                EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => call_entity_connect(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
1978                                EnrichOperation::EntityTypeValidate => call_entity_type_validate(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
1979                                EnrichOperation::DescriptionEnrich => call_description_enrich(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
1980                                EnrichOperation::DomainClassify => call_domain_classify(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
1981                                EnrichOperation::GraphAudit => call_graph_audit(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
1982                                EnrichOperation::DeepResearchSynth => call_deep_research_synth(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
1983                                EnrichOperation::BodyExtract => call_body_extract(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode, args.body_extract_graph_only),
1984                            };
1985
1986                            match call_result {
1987                                Ok(EnrichItemResult::Done { cost, is_oauth, memory_id, entity_id, entities, rels, chars_before, chars_after }) => {
1988                                    if is_oauth { w_oauth = true; }
1989                                    w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1990                                    let _ = w_queue.execute(
1991                                        "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1992                                        rusqlite::params![memory_id, entity_id, entities as i64, rels as i64, cost, item_started.elapsed().as_millis() as i64, queue_id],
1993                                    );
1994                                    w_completed += 1;
1995                                    if !is_oauth { w_cost += cost; }
1996                                    // G28-D: count success; resets breaker.
1997                                    let _ = w_breaker
1998                                        .record(crate::retry::AttemptOutcome::Success);
1999                                    let _guard = stdout_mu.lock();
2000                                    emit_json(&ItemEvent { item: &item_key, status: "done", memory_id, entity_id, entities: Some(entities), rels: Some(rels), chars_before, chars_after, cost_usd: if is_oauth { None } else { Some(cost) }, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
2001                                }
2002                                Ok(EnrichItemResult::Skipped { reason }) => {
2003                                    w_skipped += 1;
2004                                    let _ = w_queue.execute("UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![reason, queue_id]);
2005                                    let _guard = stdout_mu.lock();
2006                                    emit_json(&ItemEvent { item: &item_key, status: "skipped", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
2007                                }
2008                                Ok(EnrichItemResult::PreservationFailed { score, threshold, chars_before, chars_after }) => {
2009                                    // G29 Passo 4: worker mirror of the
2010                                    // serial path. Counted as a soft
2011                                    // skip so the queue surface shows
2012                                    // a quality issue rather than a
2013                                    // transport failure.
2014                                    w_skipped += 1;
2015                                    let reason = format!(
2016                                        "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2017                                    );
2018                                    let _ = w_queue.execute(
2019                                        "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2020                                        rusqlite::params![reason, queue_id],
2021                                    );
2022                                    let _guard = stdout_mu.lock();
2023                                    emit_json(&ItemEvent {
2024                                        item: &item_key,
2025                                        status: "preservation_failed",
2026                                        memory_id: None,
2027                                        entity_id: None,
2028                                        entities: None,
2029                                        rels: None,
2030                                        chars_before: Some(chars_before),
2031                                        chars_after: Some(chars_after),
2032                                        cost_usd: None,
2033                                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2034                                        error: Some(reason),
2035                                        index: current_index,
2036                                        total,
2037                                    });
2038                                }
2039                                Err(e) => {
2040                                    let err_str = format!("{e}");
2041                                    if matches!(e, AppError::RateLimited { .. }) {
2042                                        if crate::retry::is_kill_switch_active() {
2043                                            tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2044                                        } else if std::time::Instant::now() >= w_deadline {
2045                                            tracing::error!(target: "enrich", "rate-limit retry deadline (1h) exhausted in worker");
2046                                        } else {
2047                                            let half = w_backoff / 2;
2048                                            let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2049                                            let actual_wait = half + jitter;
2050                                            tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited in worker, backing off");
2051                                            let _ = w_queue.execute("UPDATE queue SET status='pending' WHERE id=?1", rusqlite::params![queue_id]);
2052                                            std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2053                                            w_backoff = (w_backoff * 2).min(900);
2054                                            continue;
2055                                        }
2056                                    }
2057                                    w_failed += 1;
2058                                    let outcome = record_item_failure(&w_queue, queue_id, attempt_current, args.max_attempts, &e);
2059                                    let _guard = stdout_mu.lock();
2060                                    emit_json(&ItemEvent { item: &item_key, status: "failed", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: Some(err_str), index: current_index, total });
2061                                    // G28-D: feed the classified outcome to the breaker (transient
2062                                    // failures do not count toward opening it).
2063                                    let breaker_opened = w_breaker.record(outcome);
2064                                    if breaker_opened {
2065                                        tracing::error!(target: "enrich",
2066                                            consecutive_failures = w_breaker.consecutive_failures(),
2067                                            "circuit breaker opened — aborting worker"
2068                                        );
2069                                        break;
2070                                    }
2071                                }
2072                            }
2073                        }
2074                        WorkerResult { completed: w_completed, failed: w_failed, skipped: w_skipped, cost: w_cost, oauth: w_oauth }
2075                    })
2076                })
2077                .collect();
2078                handles
2079                    .into_iter()
2080                    .map(|h| {
2081                        h.join().unwrap_or(WorkerResult {
2082                            completed: 0,
2083                            failed: 0,
2084                            skipped: 0,
2085                            cost: 0.0,
2086                            oauth: false,
2087                        })
2088                    })
2089                    .collect()
2090            });
2091
2092            for r in &results {
2093                completed += r.completed;
2094                failed += r.failed;
2095                skipped += r.skipped;
2096                cost_total += r.cost;
2097                if r.oauth && !oauth_detected {
2098                    oauth_detected = true;
2099                }
2100            }
2101        } else {
2102            // Serial path (parallelism == 1) — original loop
2103            loop {
2104                if crate::shutdown_requested() {
2105                    tracing::info!(target: "enrich", "shutdown requested, stopping enrichment");
2106                    break;
2107                }
2108
2109                // Budget check
2110                if let Some(budget) = args.max_cost_usd {
2111                    if !oauth_detected && cost_total >= budget {
2112                        tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
2113                        break;
2114                    }
2115                }
2116
2117                // Dequeue next pending item (GAP-SG-16: --ignore-backoff drops
2118                // the next_retry_at cooldown filter).
2119                let dequeue_sql = format!(
2120                    "UPDATE queue SET status='processing', attempt=attempt+1 \
2121                     WHERE id = (SELECT id FROM queue WHERE status='pending' {backoff_clause} \
2122                                 ORDER BY id LIMIT 1) \
2123                     RETURNING id, item_key, item_type, attempt"
2124                );
2125                let pending: Option<(i64, String, String, i64)> = queue_conn
2126                    .query_row(&dequeue_sql, [], |row| {
2127                        Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
2128                    })
2129                    .ok();
2130
2131                let (queue_id, item_key, item_type, attempt_current) = match pending {
2132                    Some(p) => p,
2133                    None => break,
2134                };
2135
2136                let item_started = Instant::now();
2137                let current_index = completed + failed + skipped;
2138
2139                // See worker note: provider_binary is Some for every LLM-backed
2140                // op; "" here only for ReEmbed, which never reads it.
2141                let provider_bin = provider_binary
2142                    .as_deref()
2143                    .unwrap_or_else(|| std::path::Path::new(""));
2144                let call_result = match args.operation() {
2145                    EnrichOperation::MemoryBindings | EnrichOperation::AugmentBindings => {
2146                        call_memory_bindings(
2147                            &conn,
2148                            &namespace,
2149                            &item_key,
2150                            provider_bin,
2151                            provider_model,
2152                            provider_timeout,
2153                            &args.mode(),
2154                        )
2155                    }
2156                    EnrichOperation::EntityDescriptions => call_entity_description(
2157                        &conn,
2158                        &namespace,
2159                        &item_key,
2160                        provider_bin,
2161                        provider_model,
2162                        provider_timeout,
2163                        &args.mode(),
2164                    ),
2165                    EnrichOperation::BodyEnrich => call_body_enrich(
2166                        &conn,
2167                        &namespace,
2168                        &item_key,
2169                        provider_bin,
2170                        provider_model,
2171                        provider_timeout,
2172                        &args.mode(),
2173                        args.min_output_chars,
2174                        args.max_output_chars,
2175                        args.prompt_template.as_deref(),
2176                        args.preserve_threshold,
2177                        &paths,
2178                        llm_backend,
2179                        embedding_backend,
2180                    ),
2181                    EnrichOperation::ReEmbed => call_reembed(
2182                        &conn,
2183                        &namespace,
2184                        &item_key,
2185                        &paths,
2186                        llm_backend,
2187                        embedding_backend,
2188                    ),
2189                    EnrichOperation::WeightCalibrate => call_weight_calibrate(
2190                        &conn,
2191                        &namespace,
2192                        &item_key,
2193                        provider_bin,
2194                        provider_model,
2195                        provider_timeout,
2196                        &args.mode(),
2197                    ),
2198                    EnrichOperation::RelationReclassify => call_relation_reclassify(
2199                        &conn,
2200                        &namespace,
2201                        &item_key,
2202                        provider_bin,
2203                        provider_model,
2204                        provider_timeout,
2205                        &args.mode(),
2206                    ),
2207                    EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2208                        call_entity_connect(
2209                            &conn,
2210                            &namespace,
2211                            &item_key,
2212                            provider_bin,
2213                            provider_model,
2214                            provider_timeout,
2215                            &args.mode(),
2216                        )
2217                    }
2218                    EnrichOperation::EntityTypeValidate => call_entity_type_validate(
2219                        &conn,
2220                        &namespace,
2221                        &item_key,
2222                        provider_bin,
2223                        provider_model,
2224                        provider_timeout,
2225                        &args.mode(),
2226                    ),
2227                    EnrichOperation::DescriptionEnrich => call_description_enrich(
2228                        &conn,
2229                        &namespace,
2230                        &item_key,
2231                        provider_bin,
2232                        provider_model,
2233                        provider_timeout,
2234                        &args.mode(),
2235                    ),
2236                    EnrichOperation::DomainClassify => call_domain_classify(
2237                        &conn,
2238                        &namespace,
2239                        &item_key,
2240                        provider_bin,
2241                        provider_model,
2242                        provider_timeout,
2243                        &args.mode(),
2244                    ),
2245                    EnrichOperation::GraphAudit => call_graph_audit(
2246                        &conn,
2247                        &namespace,
2248                        &item_key,
2249                        provider_bin,
2250                        provider_model,
2251                        provider_timeout,
2252                        &args.mode(),
2253                    ),
2254                    EnrichOperation::DeepResearchSynth => call_deep_research_synth(
2255                        &conn,
2256                        &namespace,
2257                        &item_key,
2258                        provider_bin,
2259                        provider_model,
2260                        provider_timeout,
2261                        &args.mode(),
2262                    ),
2263                    EnrichOperation::BodyExtract => call_body_extract(
2264                        &conn,
2265                        &namespace,
2266                        &item_key,
2267                        provider_bin,
2268                        provider_model,
2269                        provider_timeout,
2270                        &args.mode(),
2271                        args.body_extract_graph_only,
2272                    ),
2273                };
2274
2275                match call_result {
2276                    Ok(EnrichItemResult::Done {
2277                        memory_id,
2278                        entity_id,
2279                        entities,
2280                        rels,
2281                        chars_before,
2282                        chars_after,
2283                        cost,
2284                        is_oauth,
2285                    }) => {
2286                        if is_oauth && !oauth_detected {
2287                            oauth_detected = true;
2288                            tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
2289                        }
2290                        backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
2291
2292                        // Persist depends on the operation
2293                        let persist_err: Option<String> = match args.operation() {
2294                            EnrichOperation::MemoryBindings => {
2295                                // Bindings already persisted inside call_memory_bindings
2296                                None
2297                            }
2298                            EnrichOperation::EntityDescriptions => {
2299                                // Description already persisted inside call_entity_description
2300                                None
2301                            }
2302                            EnrichOperation::BodyEnrich => {
2303                                // Body already persisted inside call_body_enrich
2304                                None
2305                            }
2306                            _ => {
2307                                // All G27 operations persist inside their call_* function
2308                                None
2309                            }
2310                        };
2311
2312                        if let Err(e) = queue_conn.execute(
2313                    "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2314                    rusqlite::params![
2315                        memory_id,
2316                        entity_id,
2317                        entities as i64,
2318                        rels as i64,
2319                        cost,
2320                        item_started.elapsed().as_millis() as i64,
2321                        queue_id
2322                    ],
2323                ) {
2324                        tracing::warn!(target: "enrich", error = %e, "queue done update failed");
2325                    }
2326
2327                        if persist_err.is_none() {
2328                            completed += 1;
2329                            if !is_oauth {
2330                                cost_total += cost;
2331                            }
2332                            emit_json(&ItemEvent {
2333                                item: &item_key,
2334                                status: "done",
2335                                memory_id,
2336                                entity_id,
2337                                entities: Some(entities),
2338                                rels: Some(rels),
2339                                chars_before,
2340                                chars_after,
2341                                cost_usd: if is_oauth { None } else { Some(cost) },
2342                                elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2343                                error: None,
2344                                index: current_index,
2345                                total,
2346                            });
2347                        } else {
2348                            failed += 1;
2349                            emit_json(&ItemEvent {
2350                                item: &item_key,
2351                                status: "failed",
2352                                memory_id: None,
2353                                entity_id: None,
2354                                entities: None,
2355                                rels: None,
2356                                chars_before: None,
2357                                chars_after: None,
2358                                cost_usd: None,
2359                                elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2360                                error: persist_err,
2361                                index: current_index,
2362                                total,
2363                            });
2364                        }
2365                    }
2366                    Ok(EnrichItemResult::Skipped { reason }) => {
2367                        skipped += 1;
2368                        if let Err(e) = queue_conn.execute(
2369                    "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2370                    rusqlite::params![reason, queue_id],
2371                ) {
2372                        tracing::warn!(target: "enrich", error = %e, "queue skipped update failed");
2373                    }
2374                        emit_json(&ItemEvent {
2375                            item: &item_key,
2376                            status: "skipped",
2377                            memory_id: None,
2378                            entity_id: None,
2379                            entities: None,
2380                            rels: None,
2381                            chars_before: None,
2382                            chars_after: None,
2383                            cost_usd: None,
2384                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2385                            error: None,
2386                            index: current_index,
2387                            total,
2388                        });
2389                    }
2390                    Ok(EnrichItemResult::PreservationFailed {
2391                        score,
2392                        threshold,
2393                        chars_before,
2394                        chars_after,
2395                    }) => {
2396                        // G29 Passo 4: the LLM rewrite diverged too far from
2397                        // the original body. Count as a soft failure (not
2398                        // `failed`) so the queue surfaces it as a quality
2399                        // issue, not a transport error. The reason is
2400                        // structured so the operator can audit why a body
2401                        // was rejected.
2402                        skipped += 1;
2403                        let reason = format!(
2404                        "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2405                    );
2406                        if let Err(qe) = queue_conn.execute(
2407                        "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2408                        rusqlite::params![reason, queue_id],
2409                    ) {
2410                        tracing::warn!(target: "enrich", error = %qe, "queue preservation_failed update failed");
2411                    }
2412                        emit_json(&ItemEvent {
2413                            item: &item_key,
2414                            status: "preservation_failed",
2415                            memory_id: None,
2416                            entity_id: None,
2417                            entities: None,
2418                            rels: None,
2419                            chars_before: Some(chars_before),
2420                            chars_after: Some(chars_after),
2421                            cost_usd: None,
2422                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2423                            error: Some(reason),
2424                            index: current_index,
2425                            total,
2426                        });
2427                    }
2428                    Err(e) => {
2429                        let err_str = format!("{e}");
2430                        if matches!(e, AppError::RateLimited { .. }) {
2431                            if crate::retry::is_kill_switch_active() {
2432                                tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2433                            } else if std::time::Instant::now() >= rate_limit_deadline {
2434                                tracing::error!(target: "enrich", total_elapsed_secs = enrich_started.elapsed().as_secs(), "rate-limit retry deadline (1h) exhausted");
2435                            } else {
2436                                let half = backoff_secs / 2;
2437                                let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2438                                let actual_wait = half + jitter;
2439                                tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
2440                                if let Err(qe) = queue_conn.execute(
2441                                    "UPDATE queue SET status='pending' WHERE id=?1",
2442                                    rusqlite::params![queue_id],
2443                                ) {
2444                                    tracing::warn!(target: "enrich", error = %qe, "queue pending update failed");
2445                                }
2446                                std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2447                                backoff_secs = (backoff_secs * 2).min(900);
2448                                continue;
2449                            }
2450                        }
2451
2452                        failed += 1;
2453                        let _outcome = record_item_failure(
2454                            &queue_conn,
2455                            queue_id,
2456                            attempt_current,
2457                            args.max_attempts,
2458                            &e,
2459                        );
2460                        emit_json(&ItemEvent {
2461                            item: &item_key,
2462                            status: "failed",
2463                            memory_id: None,
2464                            entity_id: None,
2465                            entities: None,
2466                            rels: None,
2467                            chars_before: None,
2468                            chars_after: None,
2469                            cost_usd: None,
2470                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2471                            error: Some(err_str),
2472                            index: current_index,
2473                            total,
2474                        });
2475                    }
2476                }
2477
2478                let _ = item_type; // used via queue schema only
2479            }
2480        } // end else (serial path)
2481
2482        if !args.until_empty {
2483            break;
2484        }
2485        let eligible_remaining: i64 = queue_conn
2486            .query_row(
2487                &format!("SELECT COUNT(*) FROM queue WHERE status='pending' {backoff_clause}"),
2488                [],
2489                |r| r.get(0),
2490            )
2491            .unwrap_or(0);
2492        let progressed = completed > completed_before;
2493        if std::time::Instant::now() >= until_deadline {
2494            tracing::info!(target: "enrich", "until-empty: max-runtime reached, stopping");
2495            break;
2496        }
2497        if !progressed && eligible_remaining == 0 {
2498            tracing::info!(target: "enrich", "until-empty: converged (no eligible items remain)");
2499            break;
2500        }
2501        if eligible_remaining == 0 {
2502            // Remaining pending items are waiting on backoff; nap and re-check.
2503            std::thread::sleep(std::time::Duration::from_secs(1));
2504        }
2505    } // end until-empty loop
2506
2507    let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2508    let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2509
2510    // GAP-SG-15: report items still in cooldown (waiting) and dead-lettered
2511    // alongside completed, so `--until-empty` makes the convergence state
2512    // explicit (cooldown vs. dead vs. truly empty) instead of just "done".
2513    let waiting_final: i64 = queue_conn
2514        .query_row(
2515            "SELECT COUNT(*) FROM queue WHERE status='pending' \
2516             AND (operation = ?1 OR operation IS NULL) \
2517             AND next_retry_at IS NOT NULL AND next_retry_at > datetime('now')",
2518            rusqlite::params![op_label],
2519            |r| r.get(0),
2520        )
2521        .unwrap_or(0);
2522    let dead_final: i64 = queue_conn
2523        .query_row(
2524            "SELECT COUNT(*) FROM queue WHERE status='dead' \
2525             AND (operation = ?1 OR operation IS NULL)",
2526            rusqlite::params![op_label],
2527            |r| r.get(0),
2528        )
2529        .unwrap_or(0);
2530
2531    emit_json(&EnrichSummary {
2532        summary: true,
2533        operation: format!("{:?}", args.operation()),
2534        items_total: total,
2535        completed,
2536        failed,
2537        skipped,
2538        cost_usd: cost_total,
2539        elapsed_ms: started.elapsed().as_millis() as u64,
2540        backend_invoked: take_enrich_backend(),
2541        waiting: waiting_final,
2542        dead: dead_final,
2543    });
2544
2545    if failed == 0 {
2546        // GAP-ENRICH-BACKLOG-CONVERGE: keep the queue file when dead-letter rows
2547        // exist so `enrich --status` can still report them on the next run.
2548        let dead: i64 = queue_conn
2549            .query_row("SELECT COUNT(*) FROM queue WHERE status='dead'", [], |r| {
2550                r.get(0)
2551            })
2552            .unwrap_or(0);
2553        // GAP-SG-69: keep the sidecar queue while it still holds `skipped`
2554        // verdicts. Those rows tell the next scan which short bodies are
2555        // non-expandable; removing the file would lose the veto and the
2556        // body-enrich backlog would never converge. cleanup_queue_entry clears
2557        // a row when its memory is edited/forgotten, so the veto is not permanent.
2558        let skipped_remaining: i64 = queue_conn
2559            .query_row(
2560                "SELECT COUNT(*) FROM queue WHERE status='skipped'",
2561                [],
2562                |r| r.get(0),
2563            )
2564            .unwrap_or(0);
2565        if dead == 0 && skipped_remaining == 0 {
2566            let _ = std::fs::remove_file(&queue_path);
2567        }
2568    }
2569
2570    Ok(())
2571}
2572
2573// EnrichItemResult + call_* functions moved to extraction.rs
2574
2575// ---------------------------------------------------------------------------
2576// Tests
2577// ---------------------------------------------------------------------------
2578
#[cfg(test)]
mod tests {
    use super::*;

    // Every test below hands one schema constant to `serde_json::from_str`
    // and panics, naming the constant, when the text fails to parse.
    // Only syntactic validity is checked; the parsed value is discarded.

    /// `BINDINGS_SCHEMA` must deserialize into a `serde_json::Value`.
    #[test]
    fn bindings_schema_is_valid_json() {
        serde_json::from_str::<serde_json::Value>(BINDINGS_SCHEMA)
            .expect("BINDINGS_SCHEMA must be valid JSON");
    }

    /// `ENTITY_DESCRIPTION_SCHEMA` must deserialize into a `serde_json::Value`.
    #[test]
    fn entity_description_schema_is_valid_json() {
        serde_json::from_str::<serde_json::Value>(ENTITY_DESCRIPTION_SCHEMA)
            .expect("ENTITY_DESCRIPTION_SCHEMA must be valid JSON");
    }

    /// `BODY_ENRICH_SCHEMA` must deserialize into a `serde_json::Value`.
    #[test]
    fn body_enrich_schema_is_valid_json() {
        serde_json::from_str::<serde_json::Value>(BODY_ENRICH_SCHEMA)
            .expect("BODY_ENRICH_SCHEMA must be valid JSON");
    }
}