Skip to main content

sqlite_graphrag/commands/
enrich.rs

// TODO v1.0.89: this file has 4116 lines — modularization is planned.
// See ADR-0046, section "Known Tech Debt (v1.0.89+)".
3
4//! Handler for the `enrich` CLI subcommand (GAP-14 + GAP-18).
5//!
6//! Enriches the knowledge graph by running LLM-powered analysis over memories
7//! and entities that are missing key structural data. Operations are:
8//!
9//! - `memory-bindings`: memories without `memory_entities` rows get entity extraction
10//! - `entity-descriptions`: entities with NULL/empty descriptions get LLM descriptions
11//! - `body-enrich`: memories with short bodies get expanded by the LLM (GAP-18)
12//! - `re-embed`: memories without a vector row get re-embedded without rewriting body
13//!
14//! Architecture mirrors `ingest_claude.rs`: SCAN → JUDGE (LLM) → PERSIST, with a
15//! SQLite queue DB (`.enrich-queue.sqlite`) for resume/retry support.
16// Workload: Subprocess I/O-bound (claude/codex API calls with network wait)
17//!
18//! # DRY opportunity
19//!
20//! `extract_with_claude`, `parse_claude_output`, `emit_json`, and the `open_queue_db`
21//! queue schema in `ingest_claude.rs` are private functions that duplicate patterns used
22//! here verbatim. A future refactoring could extract them into a shared
23//! `src/commands/llm_runner.rs` module (or `src/llm_runner.rs`) without changing any
24//! public APIs. That extraction requires editing `ingest_claude.rs`, which is outside
25//! this stream's boundary — flagged here for the Integration stream to evaluate.
26
27use crate::commands::ingest_claude::find_claude_binary;
28use crate::constants::MAX_MEMORY_BODY_LEN;
29use crate::entity_type::EntityType;
30use crate::errors::AppError;
31use crate::paths::AppPaths;
32use crate::storage::connection::{ensure_db_ready, open_rw};
33use crate::storage::entities::{self, NewEntity, NewRelationship};
34use crate::storage::memories;
35
36use rusqlite::Connection;
37use serde::{Deserialize, Serialize};
38use std::io::Write;
39use std::path::{Path, PathBuf};
40use std::time::Instant;
41
42// ---------------------------------------------------------------------------
43// Constants
44// ---------------------------------------------------------------------------
45
/// File name of the SQLite queue database that backs resume/retry for `enrich`.
const DEFAULT_QUEUE_DB: &str = ".enrich-queue.sqlite";
/// Default wait applied after a provider rate limit.
// NOTE(review): presumably seconds; the use site is outside this chunk — confirm.
const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
/// Default for `--min-output-chars` (body-enrich).
const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;
/// Default for `--max-output-chars` (body-enrich).
const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2000;
50
51// ---------------------------------------------------------------------------
52// JSON schema used for memory-bindings and body-enrich extraction
53// ---------------------------------------------------------------------------
54
/// JSON Schema for entity/relationship extraction output.
///
/// The top-level object requires `entities` and `relationships`. Entity types
/// and relation names are restricted to closed enums, `strength` is bounded to
/// `[0, 1]`, and every object level rejects unknown keys.
const BINDINGS_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["entities","relationships"],
  "additionalProperties": false
}"#;

/// JSON Schema for an entity-description response: a single required
/// `description` string and no other keys.
const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" }
  },
  "required": ["description"],
  "additionalProperties": false
}"#;

/// JSON Schema for a body-enrich response: a single required `enriched_body`
/// string and no other keys.
const BODY_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "enriched_body": { "type": "string" }
  },
  "required": ["enriched_body"],
  "additionalProperties": false
}"#;
112
// G27 P1: weight-calibrate
/// Instruction text for the weight-calibrate operation; defines the
/// 0.9 / 0.7 / 0.5 / 0.3 weight scale the model is asked to apply.
const WEIGHT_CALIBRATE_PROMPT: &str = "You are a knowledge graph quality auditor. Evaluate whether this relationship weight is correctly calibrated.\n\n\
Scale:\n\
- 0.9 = vital hard dependency (A cannot function without B)\n\
- 0.7 = important design relationship (A strongly supports/enables B)\n\
- 0.5 = useful contextual link (A and B share relevant context)\n\
- 0.3 = weak reference (A mentions B without strong coupling)\n\n\
Respond with the calibrated weight and brief reasoning.";

/// JSON Schema for the weight-calibrate response: `calibrated_weight` in
/// `[0.0, 1.0]` plus free-text `reasoning`, both required.
const WEIGHT_CALIBRATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "calibrated_weight": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["calibrated_weight", "reasoning"],
  "additionalProperties": false
}"#;

// G27 P1: relation-reclassify
/// Instruction text for the relation-reclassify operation; lists the eleven
/// canonical relation names with a one-line meaning for each.
const RELATION_RECLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. The relationship between these entities uses a generic type. Determine the REAL semantic relationship.\n\n\
Valid canonical relations (pick exactly one):\n\
- depends-on: A cannot function without B\n\
- uses: A utilizes B but could substitute it\n\
- supports: A reinforces or enables B\n\
- causes: A triggers or produces B\n\
- fixes: A resolves a problem in B\n\
- contradicts: A conflicts with or invalidates B\n\
- applies-to: A is relevant to or scoped within B\n\
- follows: A comes after B in sequence\n\
- replaces: A substitutes B\n\
- tracked-in: A is monitored in B\n\
- related: A and B share context (use sparingly)\n\n\
Respond with the correct relation, strength, and reasoning.";

/// JSON Schema for the relation-reclassify response.
///
/// `relation` is an unconstrained string here (no enum), so the canonical
/// relation list is enforced only by the prompt text, not by this schema.
const RELATION_RECLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": { "type": "string" },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;

// G27 P2: entity-connect — suggest relationships between isolated entities
/// Instruction text for the entity-connect operation; the model answers with a
/// canonical relation or the literal `"none"` when no relationship exists.
const ENTITY_CONNECT_PROMPT: &str = "You are a knowledge graph quality auditor. Two entities exist in the same graph but have no relationship between them. Determine if a meaningful relationship exists.\n\n\
Valid canonical relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, replaces, tracked-in, related.\n\n\
If NO meaningful relationship exists, set relation to \"none\".\n\
Respond with the relation (or \"none\"), strength, and reasoning.";

/// JSON Schema for the entity-connect response; same shape as
/// `RELATION_RECLASSIFY_SCHEMA` (`relation`, `strength`, `reasoning`).
const ENTITY_CONNECT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": { "type": "string" },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;

// G27 P2: entity-type-validate — verify entity type assignments
/// Instruction text for the entity-type-validate operation; lists the ten
/// valid entity type names.
const ENTITY_TYPE_VALIDATE_PROMPT: &str = "You are a knowledge graph quality auditor. Verify whether this entity's type is correct.\n\n\
Valid entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\n\
If the current type is correct, keep it. If wrong, suggest the correct type.\n\
Respond with the validated type and reasoning.";

/// JSON Schema for the entity-type-validate response: `validated_type`
/// (unconstrained string), `was_correct` flag, and `reasoning`.
const ENTITY_TYPE_VALIDATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "validated_type": { "type": "string" },
    "was_correct": { "type": "boolean" },
    "reasoning": { "type": "string" }
  },
  "required": ["validated_type", "was_correct", "reasoning"],
  "additionalProperties": false
}"#;

// G27 P2: description-enrich — improve generic memory descriptions
/// Instruction text for the description-enrich operation; asks for a 10-20
/// word semantic description and shows one bad and one good example.
const DESCRIPTION_ENRICH_PROMPT: &str = "You are a knowledge graph quality auditor. This memory has a generic or auto-generated description. Write a concise, semantic description (10-20 words) that captures WHAT this memory is about and WHY it matters.\n\n\
BAD: 'ingested from docs/auth.md'\n\
GOOD: 'JWT token rotation strategy with 15-min expiry and refresh flow'\n\n\
Respond with the improved description and reasoning.";

/// JSON Schema for the description-enrich response: `description` and
/// `reasoning`, both required strings.
const DESCRIPTION_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" },
    "reasoning": { "type": "string" }
  },
  "required": ["description", "reasoning"],
  "additionalProperties": false
}"#;

// G27 P2: domain-classify — classify memory into domain category
/// Instruction text for the domain-classify operation; asks for a kebab-case
/// domain name of 2-4 words.
const DOMAIN_CLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. Classify this memory into its primary domain category.\n\n\
Respond with the domain name (kebab-case, 2-4 words) and reasoning.";

/// JSON Schema for the domain-classify response: `domain`, `confidence` in
/// `[0.0, 1.0]`, and `reasoning`, all required.
const DOMAIN_CLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "domain": { "type": "string" },
    "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["domain", "confidence", "reasoning"],
  "additionalProperties": false
}"#;
223
// G27 P2: graph-audit — audit graph for quality issues
/// Instruction text for the graph-audit operation; enumerates the issue
/// categories the model should look for and asks for an overall score.
const GRAPH_AUDIT_PROMPT: &str = "You are a knowledge graph quality auditor. Analyze this memory and its entity bindings for quality issues.\n\n\
Check for: missing entities, wrong entity types, redundant relationships, orphaned entities, generic descriptions, low-signal relationships.\n\n\
Respond with a list of issues found (or empty if none) and an overall quality score.";

/// JSON Schema for the graph-audit response: `quality_score` in `[0.0, 1.0]`,
/// an `issues` array of `{kind, detail}` objects, and `reasoning`.
///
/// Each `issues` item is closed with `"additionalProperties": false`, matching
/// `BINDINGS_SCHEMA`; strict structured-output modes require this on every
/// object level, and it stops the model from emitting unexpected keys.
const GRAPH_AUDIT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "quality_score": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "issues": { "type": "array", "items": { "type": "object", "properties": { "kind": { "type": "string" }, "detail": { "type": "string" } }, "required": ["kind", "detail"], "additionalProperties": false } },
    "reasoning": { "type": "string" }
  },
  "required": ["quality_score", "issues", "reasoning"],
  "additionalProperties": false
}"#;

// G27 P2: deep-research-synth — synthesize research findings into graph
/// Instruction text for the deep-research-synth operation; asks for entities
/// (lowercase kebab-case names), relationships drawn from the canonical
/// relation list, and a synthesis summary.
const DEEP_RESEARCH_SYNTH_PROMPT: &str = "You are a knowledge graph synthesizer. Given this memory body, extract key findings and synthesize them into structured entities and relationships.\n\n\
Entity names: lowercase kebab-case, domain-specific.\n\
Relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, related, replaces, tracked-in.\n\n\
Respond with extracted entities, relationships, and a synthesis summary.";

/// JSON Schema for the deep-research-synth response: `entities`,
/// `relationships`, and `summary`, all required.
///
/// The entity and relationship item objects are closed with
/// `"additionalProperties": false` for the same reason as in
/// `GRAPH_AUDIT_SCHEMA`. `entity_type` and `relation` remain unconstrained
/// strings here; only the prompt text names the allowed values.
const DEEP_RESEARCH_SYNTH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": { "type": "array", "items": { "type": "object", "properties": { "name": { "type": "string" }, "entity_type": { "type": "string" } }, "required": ["name", "entity_type"], "additionalProperties": false } },
    "relationships": { "type": "array", "items": { "type": "object", "properties": { "source": { "type": "string" }, "target": { "type": "string" }, "relation": { "type": "string" }, "strength": { "type": "number" } }, "required": ["source", "target", "relation", "strength"], "additionalProperties": false } },
    "summary": { "type": "string" }
  },
  "required": ["entities", "relationships", "summary"],
  "additionalProperties": false
}"#;
256
// G27 P2: body-extract — extract structured content from unstructured text
/// Instruction text for the body-extract operation; asks the model to
/// restructure the body into organized markdown while preserving all facts.
const BODY_EXTRACT_PROMPT: &str = "You are a structured data extractor. Given this memory body (which may be unstructured text, raw notes, or a transcript), extract and restructure the content into a clean, well-organized markdown body.\n\n\
Preserve all factual content. Remove noise, fix formatting, add section headers where appropriate.\n\
Respond with the restructured body and a brief summary of changes.";

/// JSON Schema for the body-extract response: `restructured_body` and
/// `changes_summary`, both required strings.
const BODY_EXTRACT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "restructured_body": { "type": "string" },
    "changes_summary": { "type": "string" }
  },
  "required": ["restructured_body", "changes_summary"],
  "additionalProperties": false
}"#;
271
272// ---------------------------------------------------------------------------
273// Prompts
274// ---------------------------------------------------------------------------
275
/// Instruction text for entity/relationship extraction. Its relation list and
/// strength scale line up with the enum and bounds in `BINDINGS_SCHEMA`.
const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. Given a memory body, extract:\n\
1. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
2. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones";

/// Leading part of the entity-description prompt. It ends with
/// `"Entity name: "`, so the caller is expected to append the entity details.
// NOTE(review): how the entity type is appended is not visible in this chunk.
const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. Given an entity name and type, write a concise one-sentence description (10-20 words) that explains what this entity IS and WHY it matters in the context of software/system design.\n\nEntity name: ";

/// Leading part of the body-enrich prompt. It ends with a blank line after
/// "Memory body to enrich:", so the memory body is expected to be appended.
const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. Given a short or sparse memory body, expand it into a richer, more complete and useful description. Preserve all existing facts. Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\nConstraints:\n- Output only the enriched body text (no metadata, no headers)\n- Preserve the original meaning exactly\n- Target length is provided in the system context\n\nMemory body to enrich:\n\n";
290
291// ---------------------------------------------------------------------------
292// CLI args
293// ---------------------------------------------------------------------------
294
/// Operation to perform in the `enrich` command.
// The `///` comments on the variants are picked up by `clap::ValueEnum` as the
// per-value help text, so they are user-facing strings — edit with care.
// `rename_all = "kebab-case"` makes serde spell variants like `memory-bindings`.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum EnrichOperation {
    /// Add missing entity/relationship bindings to memories (fully implemented).
    MemoryBindings,
    /// Fill NULL/empty entity descriptions with LLM-generated summaries (fully implemented).
    EntityDescriptions,
    /// Expand short memory bodies into richer content (fully implemented, GAP-18).
    BodyEnrich,
    /// Rebuild missing memory embeddings without rewriting the memory body.
    ReEmbed,
    /// Calibrate relationship weights using LLM analysis (scan only).
    WeightCalibrate,
    /// Reclassify relationship types using LLM judgment (scan only).
    RelationReclassify,
    /// Connect isolated entities by suggesting new relationships (scan only).
    EntityConnect,
    /// Validate entity type assignments using LLM judgment (scan only).
    EntityTypeValidate,
    /// Enrich memory descriptions that are generic/auto-generated (scan only).
    DescriptionEnrich,
    /// Identify cross-domain bridges between disconnected subgraphs (scan only).
    // NOTE(review): no prompt/schema constant for this operation is visible in
    // this part of the file — confirm where it is defined.
    CrossDomainBridges,
    /// Classify memories into domain categories (scan only).
    DomainClassify,
    /// Audit the graph for quality issues (scan only).
    GraphAudit,
    /// Synthesize deep-research findings into graph memories (scan only).
    DeepResearchSynth,
    /// Extract structured body from unstructured text (scan only).
    BodyExtract,
}
328
329/// LLM provider for enrichment.
330#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
331pub enum EnrichMode {
332    /// Use locally installed Claude Code CLI (OAuth-first).
333    ClaudeCode,
334    /// Use locally installed OpenAI Codex CLI.
335    Codex,
336    /// Use locally installed OpenCode CLI.
337    #[value(name = "opencode")]
338    Opencode,
339    /// Use the OpenRouter chat-completions REST API (no local CLI; v1.0.95).
340    #[value(name = "openrouter")]
341    OpenRouter,
342}
343
344impl std::fmt::Display for EnrichMode {
345    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
346        match self {
347            EnrichMode::ClaudeCode => write!(f, "claude-code"),
348            EnrichMode::Codex => write!(f, "codex"),
349            EnrichMode::Opencode => write!(f, "opencode"),
350            EnrichMode::OpenRouter => write!(f, "openrouter"),
351        }
352    }
353}
354
355/// Arguments for the `enrich` subcommand.
356#[derive(clap::Args)]
357#[command(
358    about = "Enrich graph memories and entities using an LLM provider",
359    after_long_help = "EXAMPLES:\n  \
360    # Add missing entity bindings to all unbound memories\n  \
361    sqlite-graphrag enrich --operation memory-bindings --mode codex --codex-model gpt-5.4-mini\n\n  \
362    # Fill entity descriptions (dry-run preview, no tokens spent)\n  \
363    sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n  \
364    # Expand short memory bodies (GAP-18)\n  \
365    sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n  \
366    # Rebuild only missing memory embeddings without rewriting bodies\n  \
367    sqlite-graphrag enrich --operation re-embed --limit 100\n\n  \
368    # Resume an interrupted body-enrich run\n  \
369    sqlite-graphrag enrich --operation body-enrich --resume --json\n\n  \
370    # Retry only failed items from a previous run\n  \
371    sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n\
372    EXIT CODES:\n  \
373    0  success\n  \
374    1  validation error (bad args, binary not found)\n  \
375    14 I/O error"
376)]
377pub struct EnrichArgs {
378    /// Enrichment operation to run.
379    #[arg(long, short = 'o', value_enum, value_name = "OPERATION")]
380    pub operation: EnrichOperation,
381
382    /// LLM provider to use. Required (no default).
383    #[arg(long, value_enum)]
384    pub mode: EnrichMode,
385
386    /// Maximum number of items to process in this run. Omit for all.
387    #[arg(long, value_name = "N")]
388    pub limit: Option<usize>,
389
390    /// Preview items without calling the LLM (zero tokens consumed).
391    #[arg(long)]
392    pub dry_run: bool,
393
394    /// Namespace to operate on. Default: global.
395    #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
396    pub namespace: Option<String>,
397
398    // -- Provider flags (Claude) --
399    /// Path to the Claude Code binary. Default: auto-detect from PATH.
400    #[arg(long, value_name = "PATH")]
401    pub claude_binary: Option<PathBuf>,
402
403    /// Claude model to use (e.g. claude-sonnet-4-6).
404    #[arg(long, value_name = "MODEL")]
405    pub claude_model: Option<String>,
406
407    /// Timeout per item in seconds when using Claude Code. Default: 300.
408    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
409    pub claude_timeout: u64,
410
411    // -- Provider flags (Codex) --
412    /// Path to the Codex CLI binary. Default: auto-detect from PATH.
413    #[arg(long, value_name = "PATH")]
414    pub codex_binary: Option<PathBuf>,
415
416    /// Codex model to use (e.g. o4-mini).
417    #[arg(long, value_name = "MODEL")]
418    pub codex_model: Option<String>,
419
420    /// Timeout per item in seconds when using Codex. Default: 300.
421    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
422    pub codex_timeout: u64,
423
424    // -- Provider flags (OpenCode) --
425    /// Path to the OpenCode binary. Default: auto-detect from PATH.
426    #[arg(long, value_name = "PATH", env = "SQLITE_GRAPHRAG_OPENCODE_BINARY")]
427    pub opencode_binary: Option<PathBuf>,
428
429    /// OpenCode model to use.
430    #[arg(long, value_name = "MODEL", env = "SQLITE_GRAPHRAG_OPENCODE_MODEL")]
431    pub opencode_model: Option<String>,
432
433    /// Timeout per item in seconds when using OpenCode. Default: 300.
434    #[arg(
435        long,
436        value_name = "SECONDS",
437        env = "SQLITE_GRAPHRAG_OPENCODE_TIMEOUT",
438        default_value_t = 300
439    )]
440    pub opencode_timeout: u64,
441
442    // -- Provider flags (OpenRouter, v1.0.95) --
443    /// OpenRouter text model to use (REQUIRED with --mode openrouter; no default).
444    #[arg(long, value_name = "MODEL")]
445    pub openrouter_model: Option<String>,
446
447    /// OpenRouter API key. Falls back to OPENROUTER_API_KEY env or stored config.
448    #[arg(long, value_name = "KEY", env = "OPENROUTER_API_KEY")]
449    pub openrouter_api_key: Option<String>,
450
451    /// Timeout per item in seconds when using OpenRouter. Default: 300.
452    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
453    pub openrouter_timeout: u64,
454
455    /// Optional OpenRouter base URL override (reserved; defaults to the public API).
456    #[arg(long, value_name = "URL")]
457    pub openrouter_base_url: Option<String>,
458
459    // -- Cost controls --
460    /// Abort when cumulative cost exceeds this USD budget (API key only; ignored for OAuth).
461    #[arg(long, value_name = "USD")]
462    pub max_cost_usd: Option<f64>,
463
464    // -- Queue controls --
465    /// Resume a previously interrupted run (skip already-done items).
466    #[arg(long)]
467    pub resume: bool,
468
469    /// Retry only items that failed in a previous run.
470    #[arg(long)]
471    pub retry_failed: bool,
472
473    /// GAP-ENRICH-BACKLOG-CONVERGE: loop scan→drain internally until the queue
474    /// empties of eligible items or --max-runtime elapses; removes the need for
475    /// an external bash retry loop.
476    #[arg(long)]
477    pub until_empty: bool,
478
479    /// GAP-ENRICH-BACKLOG-CONVERGE: wall-clock ceiling in seconds for
480    /// --until-empty. Defaults to 3600 when omitted.
481    #[arg(long, value_name = "SECONDS")]
482    pub max_runtime: Option<u64>,
483
484    /// GAP-ENRICH-BACKLOG-CONVERGE: attempts per item before it becomes a
485    /// dead-letter (status='dead'). Range 1..=20. Default 5.
486    #[arg(long, value_name = "N", default_value_t = 5, value_parser = clap::value_parser!(u32).range(1..=20))]
487    pub max_attempts: u32,
488
489    /// GAP-ENRICH-BACKLOG-CONVERGE: read-only mode — report queue and backlog
490    /// counts without calling the LLM or acquiring the singleton.
491    #[arg(long)]
492    pub status: bool,
493
494    /// GAP-ENRICH-BACKLOG-CONVERGE: REST concurrency for --mode openrouter
495    /// (clamp 1..=16, default 8). Distinct from the legacy --llm-parallelism.
496    #[arg(long, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=16))]
497    pub rest_concurrency: Option<u32>,
498
499    // -- body-enrich specific flags (GAP-18) --
500    /// Minimum output character count for body-enrich. Default: 500.
501    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
502    pub min_output_chars: usize,
503
504    /// Maximum output character count for body-enrich. Default: 2000.
505    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
506    pub max_output_chars: usize,
507
508    /// Check that enriched body preserves all facts from the original (LLM judge). Default: true.
509    #[arg(long, default_value_t = true)]
510    pub preserve_check: bool,
511
512    /// Path to a custom prompt template file for body-enrich.
513    #[arg(long, value_name = "PATH")]
514    pub prompt_template: Option<PathBuf>,
515
516    /// Number of parallel LLM workers (default 1 = serial).
517    /// Each worker claims items atomically from the queue DB via UPDATE...RETURNING.
518    /// Range: 1–32. For 2321 entities, --llm-parallelism 4 reduces wall time ~4×.
519    #[arg(long, default_value_t = 1, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=32))]
520    pub llm_parallelism: u32,
521
522    // -- Output / infra --
523    /// Emit NDJSON output. Always true; flag accepted for compatibility.
524    #[arg(long)]
525    pub json: bool,
526
527    /// Database path override.
528    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
529    pub db: Option<String>,
530
531    /// G30: poll for the job singleton every second for up to N seconds
532    /// when another invocation holds the lock. Default: 0 (fail fast).
533    #[arg(long, value_name = "SECONDS")]
534    pub wait_job_singleton: Option<u64>,
535
536    /// G30: force acquisition of the singleton lock by removing a stale
537    /// lock file from a previously crashed invocation. Use only when you
538    /// are certain no other `enrich`/`ingest` is running.
539    #[arg(long, default_value_t = false)]
540    pub force_job_singleton: bool,
541
542    /// G37: select a specific subset of memory names to enrich instead of
543    /// the full candidate set. Comma-separated, e.g. `--names a,b,c`.
544    /// Empty when omitted (processes all candidates).
545    #[arg(long, value_name = "NAMES", value_delimiter = ',')]
546    pub names: Vec<String>,
547
548    /// G37: read the subset of memory names from a file (one per line).
549    /// Lines starting with `#` and empty lines are ignored. Combined with
550    /// `--names` (union) when both are set.
551    #[arg(long, value_name = "PATH")]
552    pub names_file: Option<PathBuf>,
553
554    /// G35: probe the LLM provider with a 1-turn ping before processing
555    /// the batch. Aborts with a clear error if the rate-limit window is
556    /// closed (avoids burning N turns only to fail on item 1).
557    #[arg(long, default_value_t = false)]
558    pub preflight_check: bool,
559
560    /// G35: if a preflight probe or in-flight call hits the Claude rate
561    /// limit, fall back to `--fallback-mode` (typically `codex`) instead
562    /// of failing the batch. Ignored when `--mode` is already `codex`.
563    #[arg(long, value_enum)]
564    pub fallback_mode: Option<EnrichMode>,
565
566    /// G35: number of seconds before the OAuth rate-limit reset at which
567    /// the preflight probe should refuse to start. Default 300 (5 min).
568    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
569    pub rate_limit_buffer: u64,
570
571    /// G28-D: refuse to start when the 1-minute load average exceeds
572    /// `2 × ncpus` (or `SQLITE_GRAPHRAG_MAX_LOAD_PER_NCPU` if set).
573    /// Set to false to skip the check on contended CI runners.
574    #[arg(long, default_value_t = true)]
575    pub max_load_check: bool,
576
577    /// G28-D: when the system is saturated, abort the job after this
578    /// many consecutive HardFailure outcomes. Default 5.
579    #[arg(long, value_name = "N", default_value_t = 5)]
580    pub circuit_breaker_threshold: u32,
581
582    /// G29 Passo 4: minimum trigram-Jaccard similarity between the
583    /// original body and the LLM-rewritten body for the rewrite to be
584    /// accepted. Scores below the threshold are rejected and emitted as
585    /// `EnrichItemResult::PreservationFailed`. Default 0.7 (per the G29
586    /// gap specification). Ignored when `--operation` is not
587    /// `body-enrich`.
588    #[arg(long, value_name = "FLOAT", default_value_t = 0.7)]
589    pub preserve_threshold: f64,
590
591    /// G33 Passo 3: when set, validate `--codex-model` against the
592    /// ChatGPT Pro OAuth accepted-model list and abort with a
593    /// suggestion when the value is unknown. Default true (fail fast
594    /// to avoid burning OAuth turns). Set to false to opt out.
595    #[arg(long, default_value_t = true)]
596    pub codex_model_validate: bool,
597
598    /// G33 Passo 3: when set together with an invalid `--codex-model`,
599    /// automatically substitute the supplied default (e.g. `gpt-5.5`)
600    /// instead of aborting. The substitution is recorded in the NDJSON
601    /// stream as `provider_substituted: true` for traceability.
602    #[arg(long, value_name = "MODEL")]
603    pub codex_model_fallback: Option<String>,
604}
605
606// ---------------------------------------------------------------------------
607// Internal types — raw LLM output structs
608// ---------------------------------------------------------------------------
609
610// ---------------------------------------------------------------------------
611// NDJSON event types emitted to stdout
612// ---------------------------------------------------------------------------
613
/// NDJSON event describing a pipeline phase.
///
/// Only `phase` is always serialized; every `Option` field is omitted from the
/// JSON when it is `None`.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Phase label (the `llm_parallelism` field doc below mentions a "scan" phase).
    phase: &'a str,
    // NOTE(review): presumably the resolved provider binary path — confirm at the emit site.
    #[serde(skip_serializing_if = "Option::is_none")]
    binary_path: Option<&'a str>,
    // NOTE(review): presumably the provider binary's version string — confirm at the emit site.
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Total item count reported for the phase, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_total: Option<usize>,
    /// Count of items still pending, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_pending: Option<usize>,
    /// Active parallel LLM worker count (1 = serial). Present only on the "scan" phase event.
    #[serde(skip_serializing_if = "Option::is_none")]
    llm_parallelism: Option<u32>,
}
629
/// NDJSON event describing the outcome of a single processed item.
///
/// `item`, `status`, `index`, and `total` are always serialized; every
/// `Option` field is omitted from the JSON when it is `None`.
#[derive(Debug, Serialize)]
struct ItemEvent<'a> {
    /// Item identifier (memory name or entity name).
    item: &'a str,
    /// Outcome label for the item (the set of values is defined at the emit sites).
    status: &'a str,
    /// Database id of the memory, for memory-scoped items.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Database id of the entity, for entity-scoped items.
    #[serde(skip_serializing_if = "Option::is_none")]
    entity_id: Option<i64>,
    /// Number of entities associated with the item's result.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Number of relationships associated with the item's result.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    // NOTE(review): presumably the body length before/after body-enrich — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_before: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_after: Option<usize>,
    /// Cost attributed to this item, in USD.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    /// Time spent on this item, in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Error message when the item did not succeed.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Position of this item within the run (indexing base is set by the caller).
    index: usize,
    /// Number of items in the run.
    total: usize,
}
656
/// Final NDJSON record summarizing an `enrich` run.
#[derive(Debug, Serialize)]
struct EnrichSummary {
    // NOTE(review): looks like a marker that lets consumers tell the summary
    // line apart from item events — confirm it is always emitted as `true`.
    summary: bool,
    /// Name of the operation that was run.
    operation: String,
    /// Number of items considered in the run.
    items_total: usize,
    /// Number of items that completed.
    completed: usize,
    /// Number of items that failed.
    failed: usize,
    /// Number of items that were skipped.
    skipped: usize,
    /// Cumulative cost in USD.
    cost_usd: f64,
    /// Elapsed time in milliseconds.
    elapsed_ms: u64,
    /// v1.0.84 (ADR-0042): discriminator for the LLM backend that actually
    /// performed the re-embedding during enrich. `"claude" | "codex" | "none"`.
    /// Absent on the wire when `None` (kept for happy-path envelope cleanliness,
    /// or when the operation did not involve re-embed).
    #[serde(skip_serializing_if = "Option::is_none")]
    backend_invoked: Option<&'static str>,
}
674
675use crate::output::emit_json_line as emit_json;
676
677// ---------------------------------------------------------------------------
678// Queue DB
679// ---------------------------------------------------------------------------
680
681/// Opens or creates the enrichment queue database.
682///
683/// The queue schema mirrors `ingest_claude` for resume/retry parity.
684/// Uses a different filename (`.enrich-queue.sqlite`) to avoid collision.
685///
686/// # DRY note
687///
688/// This is a near-verbatim copy of `open_queue_db` in `ingest_claude.rs`.
689/// Both should be unified in a shared `llm_runner.rs` module by the
690/// Integration stream.
691fn open_queue_db(path: &str) -> Result<Connection, AppError> {
692    let conn = Connection::open(path)?;
693    conn.pragma_update(None, "journal_mode", "wal")?;
694    conn.execute_batch(
695        "CREATE TABLE IF NOT EXISTS queue (
696            id          INTEGER PRIMARY KEY AUTOINCREMENT,
697            item_key    TEXT NOT NULL UNIQUE,
698            item_type   TEXT NOT NULL DEFAULT 'memory',
699            status      TEXT NOT NULL DEFAULT 'pending',
700            memory_id   INTEGER,
701            entity_id   INTEGER,
702            entities    INTEGER DEFAULT 0,
703            rels        INTEGER DEFAULT 0,
704            error       TEXT,
705            cost_usd    REAL DEFAULT 0.0,
706            attempt     INTEGER DEFAULT 0,
707            elapsed_ms  INTEGER,
708            created_at  TEXT DEFAULT (datetime('now')),
709            done_at     TEXT
710        );
711        CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
712    )?;
713    // GAP-ENRICH-BACKLOG-CONVERGE (v1.0.96): dead-letter columns. The legacy
714    // `.enrich-queue.sqlite` predates these columns and `CREATE TABLE IF NOT
715    // EXISTS` never alters an existing table, so add them idempotently here.
716    let mut has_error_class = false;
717    let mut has_next_retry_at = false;
718    {
719        let mut stmt = conn.prepare("PRAGMA table_info(queue)")?;
720        let names = stmt.query_map([], |r| r.get::<_, String>(1))?;
721        for name in names {
722            match name?.as_str() {
723                "error_class" => has_error_class = true,
724                "next_retry_at" => has_next_retry_at = true,
725                _ => {}
726            }
727        }
728    }
729    if !has_error_class {
730        conn.execute_batch("ALTER TABLE queue ADD COLUMN error_class TEXT")?;
731    }
732    if !has_next_retry_at {
733        conn.execute_batch("ALTER TABLE queue ADD COLUMN next_retry_at TEXT")?;
734    }
735    conn.execute_batch(
736        "CREATE INDEX IF NOT EXISTS idx_enrich_queue_eligible ON queue(status, next_retry_at)",
737    )?;
738    Ok(conn)
739}
740
741// ---------------------------------------------------------------------------
742// GAP-ENRICH-BACKLOG-CONVERGE — dead-letter classification + queue failure sink
743// ---------------------------------------------------------------------------
744
745/// Read-only `enrich --status` report (no LLM, no singleton).
746#[derive(Debug, Serialize, schemars::JsonSchema)]
747pub struct EnrichStatus {
748    status_report: bool,
749    operation: String,
750    namespace: String,
751    unbound_backlog: usize,
752    queue_pending: i64,
753    queue_processing: i64,
754    queue_done: i64,
755    queue_failed: i64,
756    queue_skipped: i64,
757    queue_dead: i64,
758    eligible_now: i64,
759    waiting: i64,
760}
761
762/// Classifies an enrich item failure into a retry/dead-letter outcome.
763///
764/// Transient errors (rate-limit, timeout, db-busy, or a message that smells
765/// like a recoverable network/5xx hiccup) are rescheduled with backoff.
766/// Everything else — validation, parse, invalid body, unknown — is a permanent
767/// `HardFailure` routed to the dead-letter sink so the backlog can converge.
768fn classify_enrich_outcome(e: &AppError) -> crate::retry::AttemptOutcome {
769    use crate::retry::AttemptOutcome;
770    match e {
771        AppError::RateLimited { .. } | AppError::Timeout { .. } | AppError::DbBusy(_) => {
772            AttemptOutcome::Transient
773        }
774        _ => {
775            let msg = format!("{e}").to_lowercase();
776            if msg.contains("server error")
777                || msg.contains("timed out")
778                || msg.contains("timeout")
779                || msg.contains("connection")
780                || msg.contains("5xx")
781                || msg.contains("502")
782                || msg.contains("503")
783                || msg.contains("504")
784            {
785                AttemptOutcome::Transient
786            } else {
787                AttemptOutcome::HardFailure
788            }
789        }
790    }
791}
792
793/// Applies a failure outcome to a single queue row. Shared by the parallel
794/// worker and the serial loop (DRY). A `HardFailure`, or a transient failure
795/// whose attempt count reached `max_attempts`, lands in the dead-letter status
796/// (`status='dead'`) so it is never re-selected. A transient failure below the
797/// cap is rescheduled to `pending` with an exponential-backoff `next_retry_at`.
798/// Returns the [`crate::retry::AttemptOutcome`] so the caller can feed the
799/// existing circuit breaker.
800fn record_item_failure(
801    queue_conn: &rusqlite::Connection,
802    queue_id: i64,
803    attempt: i64,
804    max_attempts: u32,
805    err: &AppError,
806) -> crate::retry::AttemptOutcome {
807    use crate::retry::AttemptOutcome;
808    let outcome = classify_enrich_outcome(err);
809    let err_str = format!("{err}");
810    let error_class = match outcome {
811        AttemptOutcome::Transient => "transient",
812        AttemptOutcome::HardFailure => "permanent",
813        AttemptOutcome::Success => "success",
814    };
815
816    let terminal = matches!(outcome, AttemptOutcome::HardFailure) || attempt >= max_attempts as i64;
817    if terminal {
818        let _ = queue_conn.execute(
819            "UPDATE queue SET status='dead', error=?1, error_class=?2, done_at=datetime('now') WHERE id=?3",
820            rusqlite::params![err_str, error_class, queue_id],
821        );
822    } else {
823        let delay = crate::retry::compute_delay(
824            &crate::retry::RetryConfig::llm_rate_limit(),
825            attempt.max(0) as u32,
826        );
827        let secs = delay.as_secs().max(1);
828        let modifier = format!("+{secs} seconds");
829        let _ = queue_conn.execute(
830            "UPDATE queue SET status='pending', error=?1, error_class=?2, next_retry_at=datetime('now', ?3) WHERE id=?4",
831            rusqlite::params![err_str, error_class, modifier, queue_id],
832        );
833    }
834    outcome
835}
836
837// ---------------------------------------------------------------------------
838// LLM invocation — Claude Code
839// ---------------------------------------------------------------------------
840
841/// Calls `claude -p` via the shared `claude_runner` module (G02).
842///
843/// Returns `(output_value, cost_usd, is_oauth)`.
844fn call_claude(
845    binary: &Path,
846    prompt: &str,
847    json_schema: &str,
848    input_text: &str,
849    model: Option<&str>,
850    timeout_secs: u64,
851) -> Result<(serde_json::Value, f64, bool), AppError> {
852    let result = crate::commands::claude_runner::run_claude(
853        binary,
854        prompt,
855        json_schema,
856        input_text,
857        model,
858        timeout_secs,
859        7,
860    )?;
861    Ok((result.value, result.cost_usd, result.is_oauth))
862}
863
864/// v1.0.95 (ADR-0054): route a single JUDGE turn through the OpenRouter
865/// chat-completions REST API. Unlike the subprocess runners there is no
866/// `binary` argument: the process-wide chat client (initialised in `run()`
867/// before scan) is fetched from the singleton and driven synchronously via
868/// the shared tokio runtime. Returns `(value, cost_usd, is_oauth=false)`
869/// where `cost_usd` is read from the response `usage.cost`.
870fn call_openrouter(
871    prompt: &str,
872    json_schema: &str,
873    input_text: &str,
874    model: Option<&str>,
875    timeout_secs: u64,
876) -> Result<(serde_json::Value, f64, bool), AppError> {
877    // `model` is bound into the client singleton at init; `timeout_secs` is
878    // enforced by the reqwest builder. Both remain in the signature for
879    // parity with the subprocess runners.
880    let _ = (model, timeout_secs);
881    let client = crate::embedder::openrouter_chat_client().ok_or_else(|| {
882        AppError::Validation(
883            "OpenRouter chat client not initialised before dispatch (internal error)".into(),
884        )
885    })?;
886    let runtime = crate::embedder::shared_runtime()?;
887    runtime.block_on(client.complete(prompt, input_text, json_schema, None))
888}
889
890// ---------------------------------------------------------------------------
891// Preflight probe (G35) — single-turn ping to verify the LLM provider
892// ---------------------------------------------------------------------------
893
894/// Result of a single preflight ping (G35).
895enum PreflightOutcome {
896    /// The provider accepted the ping without rate-limit or other errors.
897    Healthy,
898    /// The provider rejected the ping due to OAuth rate limit. The
899    /// `suggestion` field is a human hint that callers can embed in the
900    /// user-facing error.
901    RateLimited {
902        reason: String,
903        suggestion: &'static str,
904    },
905    /// Any other provider error (binary missing, auth failure, etc.).
906    Error(AppError),
907}
908
909/// Probes the configured LLM provider with a 1-turn ping.
910///
911/// - Claude: `claude -p "ping" --max-turns 1 --strict-mcp-config --mcp-config '{}'`
912/// - Codex:  `codex exec -c mcp_servers='{}' "ping" --json`
913///
914/// The probe intentionally avoids spawning any MCP server children (G28-A)
915/// to keep its own process footprint at the minimum.
916fn run_preflight_probe(args: &EnrichArgs) -> PreflightOutcome {
917    let timeout = std::time::Duration::from_secs(args.rate_limit_buffer.max(60));
918
919    match args.mode {
920        EnrichMode::ClaudeCode => {
921            let bin = match find_claude_binary(args.claude_binary.as_deref()) {
922                Ok(b) => b,
923                Err(e) => return PreflightOutcome::Error(e),
924            };
925            // v1.0.88 (BUG-3 fix, ADR-0046): write the empty MCP config to a
926            // tempfile (Claude Code 2.1.177 rejects the inline `{}`
927            // form) and run the preflight gate before spawn, mirroring
928            // the canonical pattern in `claude_runner::build_claude_command`.
929            let mcp_config_path = match crate::spawn::preflight::write_empty_mcp_config_tempfile() {
930                Ok(p) => p,
931                Err(e) => {
932                    return PreflightOutcome::Error(AppError::Io(e));
933                }
934            };
935            let mut cmd = std::process::Command::new(&bin);
936            crate::spawn::env_whitelist::apply_env_whitelist(
937                &mut cmd,
938                crate::spawn::env_whitelist::is_strict_env_clear(),
939            );
940            if let Err(e) = crate::spawn::apply_cwd_isolation(&mut cmd) {
941                return PreflightOutcome::Error(e);
942            }
943            cmd.arg("-p")
944                .arg("ping")
945                .arg("--max-turns")
946                .arg("1")
947                .arg("--strict-mcp-config")
948                .arg("--mcp-config")
949                .arg(mcp_config_path.as_os_str())
950                .arg("--dangerously-skip-permissions")
951                .arg("--settings")
952                .arg("{\"hooks\":{}}")
953                .arg("--output-format")
954                .arg("json")
955                .stdin(std::process::Stdio::null())
956                .stdout(std::process::Stdio::piped())
957                .stderr(std::process::Stdio::piped());
958
959            let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
960                Ok(c) => c,
961                Err(e) => {
962                    return PreflightOutcome::Error(AppError::Io(e));
963                }
964            };
965            let output = match wait_with_timeout(child, timeout) {
966                Ok(out) => out,
967                Err(e) => return PreflightOutcome::Error(e),
968            };
969            if !output.status.success() {
970                let stderr = String::from_utf8_lossy(&output.stderr);
971                if stderr.contains("hit your session limit")
972                    || stderr.contains("rate_limit")
973                    || stderr.contains("429")
974                {
975                    return PreflightOutcome::RateLimited {
976                        reason: stderr.trim().to_string(),
977                        suggestion:
978                            "wait for the OAuth window to reset or use --fallback-mode codex",
979                    };
980                }
981                return PreflightOutcome::Error(AppError::Validation(format!(
982                    "preflight probe failed: {stderr}",
983                    stderr = stderr.trim()
984                )));
985            }
986            PreflightOutcome::Healthy
987        }
988        EnrichMode::Codex => {
989            let bin = match find_codex_binary(args.codex_binary.as_deref()) {
990                Ok(b) => b,
991                Err(e) => return PreflightOutcome::Error(e),
992            };
993            super::codex_spawn::validate_codex_model(args.codex_model.as_deref())
994                .map_err(PreflightOutcome::Error)
995                .ok();
996            let schema = "{}";
997            let schema_path = match super::codex_spawn::trusted_schema_path() {
998                Ok(p) => p,
999                Err(e) => return PreflightOutcome::Error(e),
1000            };
1001            let spawn_args = super::codex_spawn::CodexSpawnArgs {
1002                binary: &bin,
1003                prompt: "ping",
1004                json_schema: schema,
1005                input_text: "",
1006                model: args.codex_model.as_deref(),
1007                timeout_secs: args.rate_limit_buffer.max(60),
1008                schema_path: schema_path.clone(),
1009            };
1010            let mut cmd = match super::codex_spawn::build_codex_command(&spawn_args) {
1011                Ok(c) => c,
1012                Err(e) => return PreflightOutcome::Error(e),
1013            };
1014            let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
1015                Ok(c) => c,
1016                Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
1017            };
1018            let output = match wait_with_timeout(child, timeout) {
1019                Ok(out) => out,
1020                Err(e) => return PreflightOutcome::Error(e),
1021            };
1022            let _ = std::fs::remove_file(&schema_path);
1023            if !output.status.success() {
1024                let stderr = String::from_utf8_lossy(&output.stderr);
1025                if stderr.contains("rate_limit")
1026                    || stderr.contains("429")
1027                    || stderr.contains("Too Many Requests")
1028                {
1029                    return PreflightOutcome::RateLimited {
1030                        reason: stderr.trim().to_string(),
1031                        suggestion: "wait for the rate-limit window to reset",
1032                    };
1033                }
1034                return PreflightOutcome::Error(AppError::Validation(format!(
1035                    "preflight probe failed: {stderr}",
1036                    stderr = stderr.trim()
1037                )));
1038            }
1039            PreflightOutcome::Healthy
1040        }
1041        EnrichMode::Opencode => {
1042            let bin = match super::opencode_runner::find_opencode_binary_with_override(
1043                args.opencode_binary.as_deref(),
1044            ) {
1045                Ok(b) => b,
1046                Err(e) => return PreflightOutcome::Error(e),
1047            };
1048            let model =
1049                super::opencode_runner::resolve_opencode_model(args.opencode_model.as_deref());
1050            let mut cmd =
1051                match super::opencode_runner::build_opencode_command_sync(&bin, &model, "ping", "")
1052                {
1053                    Ok(c) => c,
1054                    Err(e) => return PreflightOutcome::Error(e),
1055                };
1056            let child = match super::opencode_runner::spawn_opencode(&mut cmd) {
1057                Ok(c) => c,
1058                Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
1059            };
1060            let output = match wait_with_timeout(child, timeout) {
1061                Ok(out) => out,
1062                Err(e) => return PreflightOutcome::Error(e),
1063            };
1064            if !output.status.success() {
1065                let stderr = String::from_utf8_lossy(&output.stderr);
1066                if stderr.contains("rate_limit")
1067                    || stderr.contains("429")
1068                    || stderr.contains("Too Many Requests")
1069                {
1070                    return PreflightOutcome::RateLimited {
1071                        reason: stderr.trim().to_string(),
1072                        suggestion: "wait for the rate-limit window to reset",
1073                    };
1074                }
1075                return PreflightOutcome::Error(AppError::Validation(format!(
1076                    "preflight probe failed: {stderr}",
1077                    stderr = stderr.trim()
1078                )));
1079            }
1080            PreflightOutcome::Healthy
1081        }
1082        EnrichMode::OpenRouter => {
1083            // v1.0.95: the OpenRouter JUDGE has no subprocess to ping; the
1084            // preflight only confirms a usable API key resolves. The chat
1085            // client singleton is initialised in run() before scan.
1086            match crate::config::resolve_api_key("openrouter", args.openrouter_api_key.as_deref()) {
1087                Some(_) => PreflightOutcome::Healthy,
1088                None => PreflightOutcome::Error(AppError::Validation(
1089                    "OPENROUTER_API_KEY not found for --mode openrouter preflight".into(),
1090                )),
1091            }
1092        }
1093    }
1094}
1095
1096/// Cross-platform wait with timeout (no extra crate dependency).
1097fn wait_with_timeout(
1098    mut child: std::process::Child,
1099    timeout: std::time::Duration,
1100) -> Result<std::process::Output, AppError> {
1101    use wait_timeout::ChildExt;
1102    let start = std::time::Instant::now();
1103    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
1104    if status.is_none() {
1105        let _ = child.kill();
1106        let _ = child.wait();
1107        return Err(AppError::Validation(format!(
1108            "preflight probe timed out after {}s",
1109            start.elapsed().as_secs()
1110        )));
1111    }
1112    let mut stdout = Vec::new();
1113    if let Some(mut out) = child.stdout.take() {
1114        std::io::Read::read_to_end(&mut out, &mut stdout).map_err(AppError::Io)?;
1115    }
1116    let mut stderr = Vec::new();
1117    if let Some(mut err) = child.stderr.take() {
1118        std::io::Read::read_to_end(&mut err, &mut stderr).map_err(AppError::Io)?;
1119    }
1120    let exit = status.unwrap();
1121    Ok(std::process::Output {
1122        status: exit,
1123        stdout,
1124        stderr,
1125    })
1126}
1127
1128// ---------------------------------------------------------------------------
1129// SCAN helpers — SQL queries that find items needing enrichment
1130// ---------------------------------------------------------------------------
1131
1132/// Returns memories without any `memory_entities` binding.
1133///
1134/// These are the targets for `memory-bindings` enrichment. When `name_filter`
1135/// is non-empty, restricts the scan to the given names (G37); unknown names
1136/// are silently skipped (the caller can detect them by comparing
1137/// requested vs. returned).
1138fn scan_unbound_memories(
1139    conn: &Connection,
1140    namespace: &str,
1141    limit: Option<usize>,
1142    name_filter: &[String],
1143) -> Result<Vec<(i64, String, String)>, AppError> {
1144    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1145
1146    if name_filter.is_empty() {
1147        let sql = format!(
1148            "SELECT m.id, m.name, m.body
1149             FROM memories m
1150             WHERE m.namespace = ?1
1151               AND m.deleted_at IS NULL
1152               AND NOT EXISTS (
1153                   SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
1154               )
1155             ORDER BY m.id
1156             {limit_clause}"
1157        );
1158        let mut stmt = conn.prepare(&sql)?;
1159        let rows = stmt
1160            .query_map(rusqlite::params![namespace], |r| {
1161                Ok((
1162                    r.get::<_, i64>(0)?,
1163                    r.get::<_, String>(1)?,
1164                    r.get::<_, String>(2)?,
1165                ))
1166            })?
1167            .collect::<Result<Vec<_>, _>>()?;
1168        Ok(rows)
1169    } else {
1170        // Build a parameterised IN clause: ?2, ?3, ..., ?{1+n}
1171        let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1172            .map(|i| format!("?{i}"))
1173            .collect();
1174        let in_clause = placeholders.join(", ");
1175        let sql = format!(
1176            "SELECT m.id, m.name, m.body
1177             FROM memories m
1178             WHERE m.namespace = ?1
1179               AND m.deleted_at IS NULL
1180               AND m.name IN ({in_clause})
1181               AND NOT EXISTS (
1182                   SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
1183               )
1184             ORDER BY m.id
1185             {limit_clause}"
1186        );
1187        let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1188        params_vec.push(&namespace);
1189        for n in name_filter {
1190            params_vec.push(n);
1191        }
1192        let mut stmt = conn.prepare(&sql)?;
1193        let rows = stmt
1194            .query_map(
1195                rusqlite::params_from_iter(params_vec.iter().copied()),
1196                |r| {
1197                    Ok((
1198                        r.get::<_, i64>(0)?,
1199                        r.get::<_, String>(1)?,
1200                        r.get::<_, String>(2)?,
1201                    ))
1202                },
1203            )?
1204            .collect::<Result<Vec<_>, _>>()?;
1205        Ok(rows)
1206    }
1207}
1208
1209/// Reads a list of memory names from a UTF-8 text file (G37).
1210///
1211/// Empty lines and lines beginning with `#` are skipped. Returns a
1212/// de-duplicated, order-preserving list of trimmed names.
1213fn read_names_file(path: &Path) -> Result<Vec<String>, AppError> {
1214    let content = std::fs::read_to_string(path).map_err(|e| {
1215        AppError::Validation(format!("failed to read names file {}: {e}", path.display()))
1216    })?;
1217    let mut seen = std::collections::HashSet::new();
1218    let mut out = Vec::new();
1219    for line in content.lines() {
1220        let trimmed = line.trim();
1221        if trimmed.is_empty() || trimmed.starts_with('#') {
1222            continue;
1223        }
1224        if seen.insert(trimmed.to_string()) {
1225            out.push(trimmed.to_string());
1226        }
1227    }
1228    Ok(out)
1229}
1230
1231/// Resolves the union of `--names` and `--names-file` (G37).
1232fn resolve_name_filter(args: &EnrichArgs) -> Result<Vec<String>, AppError> {
1233    let mut combined: Vec<String> = args.names.clone();
1234    if let Some(p) = &args.names_file {
1235        let from_file = read_names_file(p)?;
1236        for n in from_file {
1237            if !combined.contains(&n) {
1238                combined.push(n);
1239            }
1240        }
1241    }
1242    Ok(combined)
1243}
1244
1245/// Returns entities with NULL or empty description.
1246///
1247/// These are the targets for `entity-descriptions` enrichment.
1248fn scan_entities_without_description(
1249    conn: &Connection,
1250    namespace: &str,
1251    limit: Option<usize>,
1252    name_filter: &[String],
1253) -> Result<Vec<(i64, String, String)>, AppError> {
1254    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1255
1256    if name_filter.is_empty() {
1257        let sql = format!(
1258            "SELECT id, name, type
1259             FROM entities
1260             WHERE namespace = ?1
1261               AND (description IS NULL OR description = '')
1262             ORDER BY id
1263             {limit_clause}"
1264        );
1265        let mut stmt = conn.prepare(&sql)?;
1266        let rows = stmt
1267            .query_map(rusqlite::params![namespace], |r| {
1268                Ok((
1269                    r.get::<_, i64>(0)?,
1270                    r.get::<_, String>(1)?,
1271                    r.get::<_, String>(2)?,
1272                ))
1273            })?
1274            .collect::<Result<Vec<_>, _>>()?;
1275        Ok(rows)
1276    } else {
1277        let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1278            .map(|i| format!("?{i}"))
1279            .collect();
1280        let in_clause = placeholders.join(", ");
1281        let sql = format!(
1282            "SELECT id, name, type
1283             FROM entities
1284             WHERE namespace = ?1
1285               AND name IN ({in_clause})
1286               AND (description IS NULL OR description = '')
1287             ORDER BY id
1288             {limit_clause}"
1289        );
1290        let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1291        params_vec.push(&namespace);
1292        for n in name_filter {
1293            params_vec.push(n);
1294        }
1295        let mut stmt = conn.prepare(&sql)?;
1296        let rows = stmt
1297            .query_map(
1298                rusqlite::params_from_iter(params_vec.iter().copied()),
1299                |r| {
1300                    Ok((
1301                        r.get::<_, i64>(0)?,
1302                        r.get::<_, String>(1)?,
1303                        r.get::<_, String>(2)?,
1304                    ))
1305                },
1306            )?
1307            .collect::<Result<Vec<_>, _>>()?;
1308        Ok(rows)
1309    }
1310}
1311
1312/// Returns memories whose body length is below the configured minimum.
1313///
1314/// These are the targets for `body-enrich` (GAP-18).
1315fn scan_short_body_memories(
1316    conn: &Connection,
1317    namespace: &str,
1318    min_chars: usize,
1319    limit: Option<usize>,
1320    name_filter: &[String],
1321) -> Result<Vec<(i64, String, String)>, AppError> {
1322    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1323
1324    if name_filter.is_empty() {
1325        let sql = format!(
1326            "SELECT m.id, m.name, m.body
1327             FROM memories m
1328             WHERE m.namespace = ?1
1329               AND m.deleted_at IS NULL
1330               AND LENGTH(COALESCE(m.body,'')) < ?2
1331             ORDER BY m.id
1332             {limit_clause}"
1333        );
1334        let mut stmt = conn.prepare(&sql)?;
1335        let rows = stmt
1336            .query_map(rusqlite::params![namespace, min_chars as i64], |r| {
1337                Ok((
1338                    r.get::<_, i64>(0)?,
1339                    r.get::<_, String>(1)?,
1340                    r.get::<_, String>(2)?,
1341                ))
1342            })?
1343            .collect::<Result<Vec<_>, _>>()?;
1344        Ok(rows)
1345    } else {
1346        let placeholders: Vec<String> = (3..=name_filter.len() + 2)
1347            .map(|i| format!("?{i}"))
1348            .collect();
1349        let in_clause = placeholders.join(", ");
1350        let sql = format!(
1351            "SELECT m.id, m.name, m.body
1352             FROM memories m
1353             WHERE m.namespace = ?1
1354               AND m.deleted_at IS NULL
1355               AND m.name IN ({in_clause})
1356               AND LENGTH(COALESCE(m.body,'')) < ?2
1357             ORDER BY m.id
1358             {limit_clause}"
1359        );
1360        let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(2 + name_filter.len());
1361        let min_chars_i64 = min_chars as i64;
1362        params_vec.push(&namespace);
1363        params_vec.push(&min_chars_i64);
1364        for n in name_filter {
1365            params_vec.push(n);
1366        }
1367        let mut stmt = conn.prepare(&sql)?;
1368        let rows = stmt
1369            .query_map(
1370                rusqlite::params_from_iter(params_vec.iter().copied()),
1371                |r| {
1372                    Ok((
1373                        r.get::<_, i64>(0)?,
1374                        r.get::<_, String>(1)?,
1375                        r.get::<_, String>(2)?,
1376                    ))
1377                },
1378            )?
1379            .collect::<Result<Vec<_>, _>>()?;
1380        Ok(rows)
1381    }
1382}
1383
1384/// Returns live memories that still have no row in `memory_embeddings`.
1385///
1386/// These are the targets for `re-embed`.
1387fn scan_memories_without_embeddings(
1388    conn: &Connection,
1389    namespace: &str,
1390    limit: Option<usize>,
1391    name_filter: &[String],
1392) -> Result<Vec<(i64, String, String)>, AppError> {
1393    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1394
1395    if name_filter.is_empty() {
1396        let sql = format!(
1397            "SELECT m.id, m.name, COALESCE(m.body,'')
1398             FROM memories m
1399             LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1400             WHERE m.namespace = ?1
1401               AND m.deleted_at IS NULL
1402               AND me.memory_id IS NULL
1403             ORDER BY m.id
1404             {limit_clause}"
1405        );
1406        let mut stmt = conn.prepare(&sql)?;
1407        let rows = stmt
1408            .query_map(rusqlite::params![namespace], |r| {
1409                Ok((
1410                    r.get::<_, i64>(0)?,
1411                    r.get::<_, String>(1)?,
1412                    r.get::<_, String>(2)?,
1413                ))
1414            })?
1415            .collect::<Result<Vec<_>, _>>()?;
1416        Ok(rows)
1417    } else {
1418        let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1419            .map(|i| format!("?{i}"))
1420            .collect();
1421        let in_clause = placeholders.join(", ");
1422        let sql = format!(
1423            "SELECT m.id, m.name, COALESCE(m.body,'')
1424             FROM memories m
1425             LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1426             WHERE m.namespace = ?1
1427               AND m.deleted_at IS NULL
1428               AND m.name IN ({in_clause})
1429               AND me.memory_id IS NULL
1430             ORDER BY m.id
1431             {limit_clause}"
1432        );
1433        let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1434        params_vec.push(&namespace);
1435        for n in name_filter {
1436            params_vec.push(n);
1437        }
1438        let mut stmt = conn.prepare(&sql)?;
1439        let rows = stmt
1440            .query_map(
1441                rusqlite::params_from_iter(params_vec.iter().copied()),
1442                |r| {
1443                    Ok((
1444                        r.get::<_, i64>(0)?,
1445                        r.get::<_, String>(1)?,
1446                        r.get::<_, String>(2)?,
1447                    ))
1448                },
1449            )?
1450            .collect::<Result<Vec<_>, _>>()?;
1451        Ok(rows)
1452    }
1453}
1454
1455/// G27: Returns relationships with weight >= 0.7 that may need recalibration.
1456#[allow(clippy::type_complexity)]
1457fn scan_weight_candidates(
1458    conn: &Connection,
1459    namespace: &str,
1460    limit: Option<usize>,
1461) -> Result<Vec<(i64, String, String, String, f64)>, AppError> {
1462    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1463    let sql = format!(
1464        "SELECT r.id, e1.name, e2.name, r.relation, r.weight \
1465         FROM relationships r \
1466         JOIN entities e1 ON e1.id = r.source_id \
1467         JOIN entities e2 ON e2.id = r.target_id \
1468         WHERE r.weight >= 0.7 AND e1.namespace = ?1 \
1469         ORDER BY r.weight DESC {limit_clause}"
1470    );
1471    let mut stmt = conn.prepare(&sql)?;
1472    let rows = stmt
1473        .query_map(rusqlite::params![namespace], |r| {
1474            Ok((
1475                r.get::<_, i64>(0)?,
1476                r.get::<_, String>(1)?,
1477                r.get::<_, String>(2)?,
1478                r.get::<_, String>(3)?,
1479                r.get::<_, f64>(4)?,
1480            ))
1481        })?
1482        .collect::<Result<Vec<_>, _>>()?;
1483    Ok(rows)
1484}
1485
1486/// G27: Returns relationships with generic relation types (applies_to).
1487fn scan_generic_relations(
1488    conn: &Connection,
1489    namespace: &str,
1490    limit: Option<usize>,
1491) -> Result<Vec<(i64, String, String, String)>, AppError> {
1492    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1493    let sql = format!(
1494        "SELECT r.id, e1.name, e2.name, r.relation \
1495         FROM relationships r \
1496         JOIN entities e1 ON e1.id = r.source_id \
1497         JOIN entities e2 ON e2.id = r.target_id \
1498         WHERE r.relation = 'applies_to' AND e1.namespace = ?1 \
1499         ORDER BY r.id {limit_clause}"
1500    );
1501    let mut stmt = conn.prepare(&sql)?;
1502    let rows = stmt
1503        .query_map(rusqlite::params![namespace], |r| {
1504            Ok((
1505                r.get::<_, i64>(0)?,
1506                r.get::<_, String>(1)?,
1507                r.get::<_, String>(2)?,
1508                r.get::<_, String>(3)?,
1509            ))
1510        })?
1511        .collect::<Result<Vec<_>, _>>()?;
1512    Ok(rows)
1513}
1514
1515// ---------------------------------------------------------------------------
1516// PERSIST helpers for fully-implemented operations
1517// ---------------------------------------------------------------------------
1518
1519/// Persists entity bindings extracted by the LLM for a memory.
1520///
1521/// Creates entities via `upsert_entity`, links them to the memory via
1522/// `link_memory_entity`, and upserts relationships found between entities.
1523fn persist_memory_bindings(
1524    conn: &Connection,
1525    namespace: &str,
1526    memory_id: i64,
1527    entities_json: &serde_json::Value,
1528    rels_json: &serde_json::Value,
1529) -> Result<(usize, usize), AppError> {
1530    #[derive(Deserialize)]
1531    struct EntityItem {
1532        name: String,
1533        entity_type: String,
1534    }
1535    #[derive(Deserialize)]
1536    struct RelItem {
1537        source: String,
1538        target: String,
1539        relation: String,
1540        strength: f64,
1541    }
1542
1543    let extracted_entities: Vec<EntityItem> = serde_json::from_value(entities_json.clone())
1544        .map_err(|e| AppError::Validation(format!("failed to parse entities array: {e}")))?;
1545
1546    let extracted_rels: Vec<RelItem> = serde_json::from_value(rels_json.clone())
1547        .map_err(|e| AppError::Validation(format!("failed to parse relationships array: {e}")))?;
1548
1549    let mut ent_count = 0usize;
1550    let mut rel_count = 0usize;
1551
1552    for item in &extracted_entities {
1553        let entity_type = match item.entity_type.parse::<EntityType>() {
1554            Ok(et) => et,
1555            Err(_) => {
1556                tracing::warn!(
1557                    target: "enrich",
1558                    entity = %item.name,
1559                    entity_type = %item.entity_type,
1560                    "entity type not recognized, skipping"
1561                );
1562                continue;
1563            }
1564        };
1565        match entities::upsert_entity(
1566            conn,
1567            namespace,
1568            &NewEntity {
1569                name: item.name.clone(),
1570                entity_type,
1571                description: None,
1572            },
1573        ) {
1574            Ok(eid) => {
1575                let _ = entities::link_memory_entity(conn, memory_id, eid);
1576                ent_count += 1;
1577            }
1578            Err(e) => {
1579                tracing::warn!(
1580                    target: "enrich",
1581                    entity = %item.name,
1582                    error = %e,
1583                    "entity upsert skipped"
1584                );
1585            }
1586        }
1587    }
1588
1589    for rel in &extracted_rels {
1590        let normalized = crate::parsers::normalize_relation(&rel.relation);
1591        crate::parsers::warn_if_non_canonical(&normalized);
1592
1593        // Normalize entity names before lookup: upsert_entity normalizes on write,
1594        // so the lookup must use the same normalized form to find the row.
1595        let src_name = crate::parsers::normalize_entity_name(&rel.source);
1596        let tgt_name = crate::parsers::normalize_entity_name(&rel.target);
1597        let src_id = entities::find_entity_id(conn, namespace, &src_name);
1598        let tgt_id = entities::find_entity_id(conn, namespace, &tgt_name);
1599        if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1600            let new_rel = NewRelationship {
1601                source: rel.source.clone(),
1602                target: rel.target.clone(),
1603                relation: normalized,
1604                strength: rel.strength,
1605                description: None,
1606            };
1607            if entities::upsert_relationship(conn, namespace, sid, tid, &new_rel).is_ok() {
1608                rel_count += 1;
1609            }
1610        }
1611    }
1612
1613    Ok((ent_count, rel_count))
1614}
1615
1616/// Updates an entity's description directly in the `entities` table.
1617fn persist_entity_description(
1618    conn: &Connection,
1619    entity_id: i64,
1620    description: &str,
1621) -> Result<(), AppError> {
1622    conn.execute(
1623        "UPDATE entities SET description = ?1, updated_at = unixepoch() WHERE id = ?2",
1624        rusqlite::params![description, entity_id],
1625    )?;
1626    Ok(())
1627}
1628
1629/// v1.0.84 (ADR-0042): on successful re-embed, records the active backend
1630/// into the shared accumulator (`ENRICH_LAST_BACKEND`) so the final
1631/// `EnrichSummary` can expose `backend_invoked` without changing every
1632/// caller's signature. Best-effort observability — concurrent enrich runs
1633/// may race, but `Mutex` keeps the mutation safe.
1634#[allow(clippy::too_many_arguments)]
1635fn reembed_memory_vector(
1636    conn: &Connection,
1637    namespace: &str,
1638    memory_id: i64,
1639    memory_name: &str,
1640    memory_type: &str,
1641    body: &str,
1642    paths: &crate::paths::AppPaths,
1643    llm_backend: crate::cli::LlmBackendChoice,
1644    embedding_backend: crate::cli::EmbeddingBackendChoice,
1645) -> Result<(), AppError> {
1646    let snippet: String = body.chars().take(200).collect();
1647    // v1.0.82 (GAP-003): forward --llm-backend to embed_with_fallback.
1648    // v1.0.84 (ADR-0042): tuple (Vec<f32>, LlmBackendKind) — extrai o
1649    // backend que efetivamente rodou e popula o accumulator para o
1650    // EnrichSummary agregado.
1651    // v1.0.93 (GAP-OR-PROPAGATION): honour --embedding-backend openrouter.
1652    let (embedding, backend_kind) = crate::embedder::embed_passage_with_embedding_choice(
1653        &paths.models,
1654        body,
1655        embedding_backend,
1656        llm_backend,
1657    )?;
1658    record_enrich_backend(backend_kind.as_str());
1659    memories::upsert_vec(
1660        conn,
1661        memory_id,
1662        namespace,
1663        memory_type,
1664        &embedding,
1665        memory_name,
1666        &snippet,
1667    )?;
1668    Ok(())
1669}
1670
/// v1.0.84 (ADR-0042): process-local accumulator holding the name of the
/// backend most recently recorded via `record_enrich_backend`.
///
/// The slot holds at most one value; each record overwrites the previous one
/// and `take_enrich_backend` empties it.
static ENRICH_LAST_BACKEND: std::sync::Mutex<Option<&'static str>> = std::sync::Mutex::new(None);

/// Stores `backend` in the accumulator, replacing any earlier value.
///
/// If the lock cannot be acquired because it is poisoned, the value is
/// dropped without any error being reported.
fn record_enrich_backend(backend: &'static str) {
    let Ok(mut slot) = ENRICH_LAST_BACKEND.lock() else {
        return;
    };
    *slot = Some(backend);
}

/// Removes and returns the recorded backend, leaving the accumulator empty.
///
/// Returns `None` when nothing was recorded or when the lock is poisoned.
fn take_enrich_backend() -> Option<&'static str> {
    match ENRICH_LAST_BACKEND.lock() {
        Ok(mut slot) => slot.take(),
        Err(_) => None,
    }
}
1687
1688/// Persists an enriched memory body (body-enrich, GAP-18).
1689///
1690/// Uses `memories::update` to set the new body and `sync_fts_after_update`
1691/// to keep FTS5 in sync. Also re-embeds the memory for recall accuracy.
1692#[allow(clippy::too_many_arguments)]
1693fn persist_enriched_body(
1694    conn: &Connection,
1695    namespace: &str,
1696    memory_id: i64,
1697    memory_name: &str,
1698    new_body: &str,
1699    paths: &crate::paths::AppPaths,
1700    llm_backend: crate::cli::LlmBackendChoice,
1701    embedding_backend: crate::cli::EmbeddingBackendChoice,
1702) -> Result<(), AppError> {
1703    // Read current values for FTS sync
1704    let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
1705        "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
1706        rusqlite::params![memory_id],
1707        |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1708    )?;
1709
1710    let memory_type: String = conn.query_row(
1711        "SELECT type FROM memories WHERE id=?1",
1712        rusqlite::params![memory_id],
1713        |r| r.get(0),
1714    )?;
1715
1716    let description: String = conn.query_row(
1717        "SELECT COALESCE(description,'') FROM memories WHERE id=?1",
1718        rusqlite::params![memory_id],
1719        |r| r.get(0),
1720    )?;
1721
1722    let body_hash = blake3::hash(new_body.as_bytes()).to_hex().to_string();
1723
1724    let new_memory = memories::NewMemory {
1725        namespace: namespace.to_string(),
1726        name: memory_name.to_string(),
1727        memory_type: memory_type.clone(),
1728        description: description.clone(),
1729        body: new_body.to_string(),
1730        body_hash,
1731        session_id: None,
1732        source: "agent".to_string(),
1733        metadata: serde_json::json!({
1734            "operation": "body-enrich",
1735            "orig_chars": old_body.chars().count(),
1736            "new_chars": new_body.chars().count(),
1737        }),
1738    };
1739
1740    // G29 audit: insert a new immutable version BEFORE the update so the
1741    // enriched body is reachable through `history --name <X>` and
1742    // `restore --version N` can roll back to the pre-enrich state.
1743    let next_version = crate::storage::versions::next_version(conn, memory_id)?;
1744    let version_metadata = serde_json::json!({
1745        "operation": "body-enrich",
1746        "orig_chars": old_body.chars().count(),
1747        "new_chars": new_body.chars().count(),
1748    })
1749    .to_string();
1750    crate::storage::versions::insert_version(
1751        conn,
1752        memory_id,
1753        next_version,
1754        memory_name,
1755        &memory_type,
1756        &description,
1757        new_body,
1758        &version_metadata,
1759        Some("enrich"),
1760        "edit",
1761    )?;
1762
1763    memories::update(conn, memory_id, &new_memory, None)?;
1764    memories::sync_fts_after_update(
1765        conn,
1766        memory_id,
1767        &old_name,
1768        &old_desc,
1769        &old_body,
1770        &new_memory.name,
1771        &new_memory.description,
1772        &new_memory.body,
1773    )?;
1774
1775    // Re-embed for recall accuracy
1776    if let Err(e) = reembed_memory_vector(
1777        conn,
1778        namespace,
1779        memory_id,
1780        memory_name,
1781        &memory_type,
1782        new_body,
1783        paths,
1784        llm_backend,
1785        embedding_backend,
1786    ) {
1787        tracing::warn!(target: "enrich", memory = %memory_name, error = %e, "vec upsert failed after body-enrich");
1788    }
1789
1790    Ok(())
1791}
1792
1793// ---------------------------------------------------------------------------
1794// Main entry point
1795// ---------------------------------------------------------------------------
1796
1797// ---------------------------------------------------------------------------
1798// G20: mode-conditional flag validation
1799// ---------------------------------------------------------------------------
1800
/// Reports whether `value` equals its declared `default`.
///
/// Used to tell "operator passed an explicit override" apart from "clap
/// filled in the default" for flags declared with `default_value_t`.
fn is_at_default<T>(value: T, default: T) -> bool
where
    T: PartialEq,
{
    PartialEq::eq(&value, &default)
}
1807
1808/// G20: validate that flags for one LLM provider were not passed when
1809/// the operator selected a different provider. Flags silently discarded
1810/// by the wrong mode are surfaced as AppError::Validation BEFORE any
1811/// DB work, so the operator gets an actionable error instead of a
1812/// surprise at runtime.
1813///
1814/// Detection rules:
1815/// - For Option<PathBuf> / Option<String>: is_some() means explicit
1816/// - For scalar fields with default_value_t: value != default means explicit
1817/// - For boolean fields: true means explicit (default is false)
1818///
1819/// Mode-specific matrices:
1820/// - mode=claude-code rejects: codex_binary, codex_model, codex_timeout != 300
1821/// - mode=codex rejects: claude_binary, claude_model, claude_timeout != 300, max_cost_usd
1822fn validate_mode_conditional_flags_enrich(args: &EnrichArgs) -> Result<(), AppError> {
1823    const DEFAULT_TIMEOUT: u64 = 300;
1824
1825    let mut conflicts: Vec<String> = Vec::new();
1826
1827    match args.mode {
1828        EnrichMode::ClaudeCode => {
1829            if args.codex_binary.is_some() {
1830                conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1831            }
1832            if args.codex_model.is_some() {
1833                conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1834            }
1835            if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1836                conflicts.push(format!(
1837                    "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1838                    args.codex_timeout
1839                ));
1840            }
1841        }
1842        EnrichMode::Codex => {
1843            if args.claude_binary.is_some() {
1844                conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1845            }
1846            if args.claude_model.is_some() {
1847                conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1848            }
1849            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1850                conflicts.push(format!(
1851                    "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1852                    args.claude_timeout
1853                ));
1854            }
1855            if args.max_cost_usd.is_some() {
1856                conflicts.push(
1857                    "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription, not the call)"
1858                        .to_string(),
1859                );
1860            }
1861        }
1862        EnrichMode::Opencode => {
1863            if args.claude_binary.is_some() {
1864                conflicts.push("--claude-binary is ignored when --mode=opencode".to_string());
1865            }
1866            if args.claude_model.is_some() {
1867                conflicts.push("--claude-model is ignored when --mode=opencode".to_string());
1868            }
1869            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1870                conflicts.push(format!(
1871                    "--claude-timeout={} is ignored when --mode=opencode (remove the flag to use the default 300s)",
1872                    args.claude_timeout
1873                ));
1874            }
1875            if args.max_cost_usd.is_some() {
1876                conflicts.push(
1877                    "--max-cost-usd is ignored when --mode=opencode (OAuth-first; cost is metered by your subscription, not the call)"
1878                        .to_string(),
1879                );
1880            }
1881        }
1882        EnrichMode::OpenRouter => {
1883            if args.claude_binary.is_some() {
1884                conflicts.push("--claude-binary is ignored when --mode=openrouter".to_string());
1885            }
1886            if args.claude_model.is_some() {
1887                conflicts.push("--claude-model is ignored when --mode=openrouter".to_string());
1888            }
1889            if args.codex_binary.is_some() {
1890                conflicts.push("--codex-binary is ignored when --mode=openrouter".to_string());
1891            }
1892            if args.codex_model.is_some() {
1893                conflicts.push("--codex-model is ignored when --mode=openrouter".to_string());
1894            }
1895            if args.opencode_binary.is_some() {
1896                conflicts.push("--opencode-binary is ignored when --mode=openrouter".to_string());
1897            }
1898            if args.opencode_model.is_some() {
1899                conflicts.push("--opencode-model is ignored when --mode=openrouter".to_string());
1900            }
1901            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1902                conflicts.push(format!(
1903                    "--claude-timeout={} is ignored when --mode=openrouter (remove the flag to use the default 300s)",
1904                    args.claude_timeout
1905                ));
1906            }
1907            if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1908                conflicts.push(format!(
1909                    "--codex-timeout={} is ignored when --mode=openrouter (remove the flag to use the default 300s)",
1910                    args.codex_timeout
1911                ));
1912            }
1913            if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
1914                conflicts.push(format!(
1915                    "--opencode-timeout={} is ignored when --mode=openrouter (remove the flag to use the default 300s)",
1916                    args.opencode_timeout
1917                ));
1918            }
1919        }
1920    }
1921
1922    if !conflicts.is_empty() {
1923        return Err(AppError::Validation(format!(
1924            "G20: mode-conditional flag conflicts detected for --mode={}:\n  - {}",
1925            args.mode,
1926            conflicts.join("\n  - ")
1927        )));
1928    }
1929
1930    Ok(())
1931}
1932
1933// ---------------------------------------------------------------------------
1934
1935/// Main entry point for the `enrich` command.
1936pub fn run(
1937    args: &EnrichArgs,
1938    llm_backend: crate::cli::LlmBackendChoice,
1939    embedding_backend: crate::cli::EmbeddingBackendChoice,
1940) -> Result<(), AppError> {
1941    // G20: mode-conditional flag validation BEFORE any DB access.
1942    // Surfaces flags that the wrong mode would silently discard.
1943    validate_mode_conditional_flags_enrich(args)?;
1944
1945    // GAP-ENRICH-BACKLOG-CONVERGE: --status is a read-only report. It never
1946    // calls the LLM, never initialises the OpenRouter client, and never
1947    // acquires the job singleton, so it is safe to run while a real enrich is
1948    // in flight (it only reads the queue DB and the unbound backlog).
1949    if args.status {
1950        let paths = AppPaths::resolve(args.db.as_deref())?;
1951        ensure_db_ready(&paths)?;
1952        let conn = open_rw(&paths.db)?;
1953        let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1954        let unbound_backlog = scan_unbound_memories(&conn, &namespace, None, &[])?.len();
1955        let queue_conn = open_queue_db(DEFAULT_QUEUE_DB)?;
1956        let count_status = |st: &str| -> i64 {
1957            queue_conn
1958                .query_row(
1959                    "SELECT COUNT(*) FROM queue WHERE status=?1",
1960                    rusqlite::params![st],
1961                    |r| r.get(0),
1962                )
1963                .unwrap_or(0)
1964        };
1965        let eligible_now: i64 = queue_conn
1966            .query_row(
1967                "SELECT COUNT(*) FROM queue WHERE status='pending' \
1968                 AND (next_retry_at IS NULL OR next_retry_at <= datetime('now'))",
1969                [],
1970                |r| r.get(0),
1971            )
1972            .unwrap_or(0);
1973        let waiting: i64 = queue_conn
1974            .query_row(
1975                "SELECT COUNT(*) FROM queue WHERE status='pending' \
1976                 AND next_retry_at IS NOT NULL AND next_retry_at > datetime('now')",
1977                [],
1978                |r| r.get(0),
1979            )
1980            .unwrap_or(0);
1981        emit_json(&EnrichStatus {
1982            status_report: true,
1983            operation: format!("{:?}", args.operation),
1984            namespace: namespace.clone(),
1985            unbound_backlog,
1986            queue_pending: count_status("pending"),
1987            queue_processing: count_status("processing"),
1988            queue_done: count_status("done"),
1989            queue_failed: count_status("failed"),
1990            queue_skipped: count_status("skipped"),
1991            queue_dead: count_status("dead"),
1992            eligible_now,
1993            waiting,
1994        });
1995        return Ok(());
1996    }
1997
1998    // v1.0.95 (ADR-0054): when the JUDGE is OpenRouter the model is mandatory
1999    // (no default) and the API key must resolve BEFORE any network or DB work.
2000    // The chat client singleton is initialised here so every per-item dispatch
2001    // fetches it without re-threading the key.
2002    if args.mode == EnrichMode::OpenRouter {
2003        let model = args.openrouter_model.as_deref().ok_or_else(|| {
2004            AppError::Validation(
2005                "--mode openrouter requires --openrouter-model (no default model is allowed)"
2006                    .into(),
2007            )
2008        })?;
2009        let resolved =
2010            crate::config::resolve_api_key("openrouter", args.openrouter_api_key.as_deref())
2011                .ok_or_else(|| {
2012                    AppError::Validation(
2013                        "OPENROUTER_API_KEY not found; set the env var, store it via \
2014                         `config add-key --provider openrouter`, or pass --openrouter-api-key"
2015                            .into(),
2016                    )
2017                })?;
2018        crate::embedder::get_openrouter_chat_client(
2019            resolved.value,
2020            model,
2021            args.openrouter_timeout,
2022        )?;
2023    }
2024
2025    let started = Instant::now();
2026
2027    let paths = AppPaths::resolve(args.db.as_deref())?;
2028    ensure_db_ready(&paths)?;
2029    let conn = open_rw(&paths.db)?;
2030    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
2031
2032    // G28-B (v1.0.68) + G30 (v1.0.69): enforce singleton per
2033    // (job_type, namespace, db_hash) so two parallel `enrich` invocations
2034    // on the same DB cannot co-exist, but concurrent enrich on different
2035    // databases works as expected. The force flag (--force) breaks a
2036    // stale lock from a previously crashed invocation.
2037    let wait_secs = args.wait_job_singleton;
2038    let force_flag = args.force_job_singleton;
2039    let _singleton = crate::lock::acquire_job_singleton(
2040        crate::lock::JobType::Enrich,
2041        &namespace,
2042        &paths.db,
2043        wait_secs,
2044        force_flag,
2045    )?;
2046
2047    // Validate provider binary upfront only for LLM-backed operations.
2048    let provider_binary = if matches!(args.operation, EnrichOperation::ReEmbed) {
2049        None
2050    } else {
2051        Some(match args.mode {
2052            EnrichMode::ClaudeCode => {
2053                let bin = find_claude_binary(args.claude_binary.as_deref())?;
2054                let version = super::claude_runner::validate_claude_version(&bin)?;
2055                tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
2056                emit_json(&PhaseEvent {
2057                    phase: "validate",
2058                    binary_path: bin.to_str(),
2059                    version: Some(&version),
2060                    items_total: None,
2061                    items_pending: None,
2062                    llm_parallelism: None,
2063                });
2064                bin
2065            }
2066            EnrichMode::Codex => {
2067                let bin = find_codex_binary(args.codex_binary.as_deref())?;
2068                emit_json(&PhaseEvent {
2069                    phase: "validate",
2070                    binary_path: bin.to_str(),
2071                    version: None,
2072                    items_total: None,
2073                    items_pending: None,
2074                    llm_parallelism: None,
2075                });
2076                bin
2077            }
2078            EnrichMode::Opencode => {
2079                let bin = super::opencode_runner::find_opencode_binary_with_override(
2080                    args.opencode_binary.as_deref(),
2081                )?;
2082                emit_json(&PhaseEvent {
2083                    phase: "validate",
2084                    binary_path: bin.to_str(),
2085                    version: None,
2086                    items_total: None,
2087                    items_pending: None,
2088                    llm_parallelism: None,
2089                });
2090                bin
2091            }
2092            EnrichMode::OpenRouter => {
2093                // v1.0.95: the OpenRouter JUDGE is a REST call, not a spawned
2094                // binary. The chat client singleton was initialised at the top
2095                // of run(); this placeholder path threads through the dispatch
2096                // but is never dereferenced by the OpenRouter arm.
2097                emit_json(&PhaseEvent {
2098                    phase: "validate",
2099                    binary_path: None,
2100                    version: None,
2101                    items_total: None,
2102                    items_pending: None,
2103                    llm_parallelism: None,
2104                });
2105                PathBuf::new()
2106            }
2107        })
2108    };
2109
2110    // G28-D: refuse to start when the system is saturated. This check
2111    // is BEFORE preflight so we never spend an OAuth turn on a host
2112    // that is already at the limit.
2113    if args.max_load_check && !args.dry_run && crate::system_load::is_system_saturated() {
2114        let load = crate::system_load::load_average_one();
2115        let n = crate::system_load::ncpus();
2116        return Err(AppError::Validation(format!(
2117            "system load average {load:.2} exceeds 2x ncpus ({n}); \
2118             pass --no-max-load-check to override (not recommended)"
2119        )));
2120    }
2121
2122    // G35: preflight probe — issue a single ping turn to verify the
2123    // provider is healthy before scanning N candidates. If the probe
2124    // fails with a rate-limit error, optionally fall back to a
2125    // different mode (typically codex) instead of failing the entire
2126    // batch. The probe itself consumes 1 OAuth turn, so it stays
2127    // opt-in (default off) to keep --dry-run and CI flows zero-cost.
2128    if args.preflight_check && !args.dry_run && !matches!(args.operation, EnrichOperation::ReEmbed)
2129    {
2130        let preflight_result = run_preflight_probe(args);
2131        match preflight_result {
2132            PreflightOutcome::Healthy => {
2133                tracing::info!(target: "enrich", mode = ?args.mode, "preflight probe healthy");
2134            }
2135            PreflightOutcome::RateLimited { reason, suggestion } => {
2136                if let Some(fallback) = args.fallback_mode.clone() {
2137                    if fallback != args.mode {
2138                        // G35 (v1.0.69): the mid-batch mode switch is
2139                        // intentionally NOT applied because it would
2140                        // desynchronise the per-item rate-limit wait
2141                        // state (rate-limited items in the worker are
2142                        // timed against the original provider). Instead
2143                        // we abort cleanly so the operator can re-invoke
2144                        // with `--mode {fallback:?}`. This guarantees no
2145                        // OAuth window is wasted and no partial state
2146                        // is left in the queue.
2147                        return Err(AppError::Validation(format!(
2148                            "preflight detected rate limit on {mode:?}: {reason}; \
2149                             re-invoke with `--mode {fallback:?}` to use the fallback provider",
2150                            mode = args.mode
2151                        )));
2152                    }
2153                    return Err(AppError::Validation(format!(
2154                        "preflight detected rate limit on {mode:?}: {reason}; \
2155                         --fallback-mode matches --mode, no recovery possible",
2156                        mode = args.mode
2157                    )));
2158                }
2159                return Err(AppError::Validation(format!(
2160                    "preflight detected rate limit on {mode:?}: {reason}; \
2161                     {suggestion}; pass --fallback-mode codex to recover",
2162                    mode = args.mode
2163                )));
2164            }
2165            PreflightOutcome::Error(e) => {
2166                return Err(e);
2167            }
2168        }
2169    }
2170
2171    // SCAN phase
2172    let scan_result = scan_operation(&conn, &namespace, args)?;
2173    let total = scan_result.len();
2174
2175    emit_json(&PhaseEvent {
2176        phase: "scan",
2177        binary_path: None,
2178        version: None,
2179        items_total: Some(total),
2180        items_pending: Some(total),
2181        llm_parallelism: Some(args.llm_parallelism),
2182    });
2183
2184    // Dry-run: emit preview events and summary without calling LLM
2185    if args.dry_run {
2186        for (idx, key) in scan_result.iter().enumerate() {
2187            emit_json(&ItemEvent {
2188                item: key,
2189                status: "preview",
2190                memory_id: None,
2191                entity_id: None,
2192                entities: None,
2193                rels: None,
2194                chars_before: None,
2195                chars_after: None,
2196                cost_usd: None,
2197                elapsed_ms: None,
2198                error: None,
2199                index: idx,
2200                total,
2201            });
2202        }
2203        emit_json(&EnrichSummary {
2204            summary: true,
2205            operation: format!("{:?}", args.operation),
2206            items_total: total,
2207            completed: 0,
2208            failed: 0,
2209            skipped: 0,
2210            cost_usd: 0.0,
2211            elapsed_ms: started.elapsed().as_millis() as u64,
2212            backend_invoked: take_enrich_backend(),
2213        });
2214        return Ok(());
2215    }
2216
2217    // Every `EnrichOperation` variant has an execution path below.
2218
2219    // Queue setup for resume/retry
2220    let queue_conn = open_queue_db(DEFAULT_QUEUE_DB)?;
2221
2222    if args.resume {
2223        let reset = queue_conn
2224            .execute(
2225                "UPDATE queue SET status='pending' WHERE status='processing'",
2226                [],
2227            )
2228            .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
2229        if reset > 0 {
2230            tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
2231        }
2232    }
2233
2234    if args.retry_failed {
2235        let count = queue_conn
2236            .execute(
2237                "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
2238                [],
2239            )
2240            .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
2241        tracing::info!(target: "enrich", count, "retrying failed items");
2242    }
2243
2244    if !args.resume && !args.retry_failed && !args.until_empty {
2245        queue_conn
2246            .execute("DELETE FROM queue", [])
2247            .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
2248    }
2249
2250    // Populate queue
2251    for (idx, key) in scan_result.iter().enumerate() {
2252        let item_type = match args.operation {
2253            EnrichOperation::EntityDescriptions => "entity",
2254            _ => "memory",
2255        };
2256        if let Err(e) = queue_conn.execute(
2257            "INSERT OR IGNORE INTO queue (item_key, item_type, status) VALUES (?1, ?2, 'pending')",
2258            rusqlite::params![key, item_type],
2259        ) {
2260            tracing::warn!(target: "enrich", error = %e, "queue insert failed");
2261        }
2262        let _ = idx; // suppress unused warning
2263    }
2264
2265    // G19: parallel LLM processing via std::thread::scope when parallelism > 1.
2266    // Clamp enforces the range even if the caller bypasses clap validation.
2267    let parallelism = if args.mode == EnrichMode::OpenRouter {
2268        let rest = args.rest_concurrency.unwrap_or(8).clamp(1, 16) as usize;
2269        tracing::info!(
2270            target: "enrich",
2271            concurrency = rest,
2272            source = "rest_concurrency",
2273            "OpenRouter REST concurrency (clamp 1..=16)"
2274        );
2275        rest
2276    } else {
2277        let p = args.llm_parallelism.clamp(1, 32) as usize;
2278        tracing::info!(
2279            target: "enrich",
2280            concurrency = p,
2281            source = "llm_parallelism",
2282            "LLM subprocess parallelism (clamp 1..=32)"
2283        );
2284        p
2285    };
2286    if parallelism > 1 {
2287        tracing::info!(
2288            target: "enrich",
2289            llm_parallelism = parallelism,
2290            "parallel LLM processing with bounded thread pool"
2291        );
2292    }
2293    // G28-D (v1.0.68) + G34 (v1.0.69): warn above the recommended parallelism
2294    // ceiling. The threshold and message depend on the LLM mode because
2295    // Claude Code spawns MCP children (G28-A) while Codex does not.
2296    if parallelism > 4 {
2297        match args.mode {
2298            EnrichMode::ClaudeCode => {
2299                tracing::warn!(
2300                    target: "enrich",
2301                    llm_parallelism = parallelism,
2302                    recommended_max = 4,
2303                    mode = "claude-code",
2304                    "llm_parallelism above 4 multiplies Claude Code subprocess fan-out; \
2305                     consider combining with SQLITE_GRAPHRAG_CLAUDE_EMPTY_CONFIG_DIR \
2306                     to cut MCP children (G28-A)"
2307                );
2308            }
2309            EnrichMode::Codex if parallelism > 16 => {
2310                tracing::warn!(
2311                    target: "enrich",
2312                    llm_parallelism = parallelism,
2313                    recommended_max = 16,
2314                    mode = "codex",
2315                    "llm_parallelism above 16 risks OAuth rate-limit on Codex; \
2316                     consider --llm-parallelism 8 for safer concurrency"
2317                );
2318            }
2319            EnrichMode::Codex => {
2320                // No warning: codex does not spawn MCP children and was
2321                // validated at parallelism 8 in production (1161 items,
2322                // 0 failures) per the 2026-06-04 session audit.
2323            }
2324            EnrichMode::Opencode if parallelism > 16 => {
2325                tracing::warn!(
2326                    target: "enrich",
2327                    llm_parallelism = parallelism,
2328                    recommended_max = 16,
2329                    mode = "opencode",
2330                    "llm_parallelism above 16 risks OAuth rate-limit on OpenCode; \
2331                     consider --llm-parallelism 8 for safer concurrency"
2332                );
2333            }
2334            EnrichMode::Opencode => {
2335                // No warning: opencode does not spawn MCP children.
2336            }
2337            EnrichMode::OpenRouter => {
2338                // No warning: OpenRouter is a bounded HTTP fan-out (no
2339                // subprocess); its width is `rest_concurrency`, clamped to 1..=16 above.
2340            }
2341        }
2342    }
2343
2344    let mut completed = 0usize;
2345    let mut failed = 0usize;
2346    let mut skipped = 0usize;
2347    let mut cost_total = 0.0f64;
2348    let mut oauth_detected = false;
2349    let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
2350    let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
2351    let enrich_started = std::time::Instant::now();
2352
2353    let provider_timeout = match args.mode {
2354        EnrichMode::ClaudeCode => args.claude_timeout,
2355        EnrichMode::Codex => args.codex_timeout,
2356        EnrichMode::Opencode => args.opencode_timeout,
2357        EnrichMode::OpenRouter => args.openrouter_timeout,
2358    };
2359
2360    let provider_model: Option<&str> = match args.mode {
2361        EnrichMode::ClaudeCode => args.claude_model.as_deref(),
2362        EnrichMode::Codex => args.codex_model.as_deref(),
2363        EnrichMode::Opencode => args.opencode_model.as_deref(),
2364        EnrichMode::OpenRouter => args.openrouter_model.as_deref(),
2365    };
2366
2367    // GAP-ENRICH-BACKLOG-CONVERGE: --until-empty wraps the scan→populate→drain
2368    // cycle in an internal loop so the external bash retry loop is unnecessary.
2369    // Without --until-empty the loop body runs exactly once (legacy behaviour).
2370    let until_deadline = std::time::Instant::now()
2371        + std::time::Duration::from_secs(args.max_runtime.unwrap_or(3600));
2372    loop {
2373        if args.until_empty {
2374            // Re-scan and re-enqueue eligible candidates each iteration.
2375            // INSERT OR IGNORE never resurrects a dead-letter row (item_key is
2376            // UNIQUE), so the backlog converges instead of looping forever.
2377            let rescan = scan_operation(&conn, &namespace, args)?;
2378            let rescan_item_type = match args.operation {
2379                EnrichOperation::EntityDescriptions => "entity",
2380                _ => "memory",
2381            };
2382            for key in &rescan {
2383                let _ = queue_conn.execute(
2384                    "INSERT OR IGNORE INTO queue (item_key, item_type, status) VALUES (?1, ?2, 'pending')",
2385                    rusqlite::params![key, rescan_item_type],
2386                );
2387            }
2388        }
2389        let completed_before = completed;
2390
2391        // G19: when parallelism > 1, spawn bounded worker threads.
2392        // Each worker opens its own DB connections (WAL supports concurrent readers + serialized writers).
2393        // The queue DB claim is atomic via UPDATE...RETURNING — no external lock needed.
2394        if parallelism > 1 {
2395            let stdout_mu = parking_lot::Mutex::new(());
2396            let budget = args.max_cost_usd;
2397            let operation = args.operation.clone();
2398            let mode = args.mode.clone();
2399            let min_oc = args.min_output_chars;
2400            let max_oc = args.max_output_chars;
2401            let prompt_tpl = args.prompt_template.as_deref().map(|p| p.to_path_buf());
2402
2403            struct WorkerResult { // per-worker tallies, summed into the run totals after join
2404                completed: usize, // items whose call returned `Done`
2405                failed: usize, // errors not retried via the rate-limit backoff path
2406                skipped: usize, // `Skipped` and `PreservationFailed` outcomes
2407                cost: f64, // summed cost of `Done` items not flagged as OAuth
2408                oauth: bool, // true once any `Done` item reported `is_oauth`
2409            }
2410
2411            let results: Vec<WorkerResult> = std::thread::scope(|s| {
2412                let handles: Vec<_> = (0..parallelism)
2413                .map(|worker_id| {
2414                    let stdout_mu = &stdout_mu;
2415                    let paths = &paths;
2416                    let namespace = &namespace;
2417                    let provider_binary = provider_binary.as_deref();
2418                    let operation = &operation;
2419                    let mode = &mode;
2420                    let prompt_tpl = prompt_tpl.as_deref();
2421                    s.spawn(move || {
2422                        let w_conn = match open_rw(&paths.db) {
2423                            Ok(c) => c,
2424                            Err(e) => {
2425                                tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open DB");
2426                                return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
2427                            }
2428                        };
2429                        let w_queue = match open_queue_db(DEFAULT_QUEUE_DB) {
2430                            Ok(c) => c,
2431                            Err(e) => {
2432                                tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open queue DB");
2433                                return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
2434                            }
2435                        };
2436                        let mut w_completed = 0usize;
2437                        let mut w_failed = 0usize;
2438                        let mut w_skipped = 0usize;
2439                        let mut w_cost = 0.0f64;
2440                        let mut w_oauth = false;
2441                        let mut w_backoff = DEFAULT_RATE_LIMIT_WAIT;
2442                        let w_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
2443                        // G28-D: per-worker circuit breaker that aborts the
2444                        // loop after `circuit_breaker_threshold` consecutive
2445                        // HardFailure outcomes (transient/rate-limited errors
2446                        // do NOT count, so a recovering provider is not
2447                        // penalised).
2448                        let mut w_breaker = crate::retry::CircuitBreaker::new(
2449                            args.circuit_breaker_threshold.max(1),
2450                            std::time::Duration::from_secs(60),
2451                        );
2452
2453                        loop {
2454                            if crate::shutdown_requested() {
2455                                tracing::info!(target: "enrich", "shutdown requested, worker stopping");
2456                                break;
2457                            }
2458                            if let Some(b) = budget {
2459                                if !w_oauth && w_cost >= b {
2460                                    break;
2461                                }
2462                            }
2463                            let pending: Option<(i64, String, String, i64)> = w_queue
2464                                .query_row(
2465                                    "UPDATE queue SET status='processing', attempt=attempt+1 \
2466                                     WHERE id = (SELECT id FROM queue WHERE status='pending' \
2467                                                   AND (next_retry_at IS NULL OR next_retry_at <= datetime('now')) \
2468                                                 ORDER BY id LIMIT 1) \
2469                                     RETURNING id, item_key, item_type, attempt",
2470                                    [],
2471                                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
2472                                )
2473                                .ok();
2474                            let (queue_id, item_key, _item_type, attempt_current) = match pending {
2475                                Some(p) => p,
2476                                None => break,
2477                            };
2478                            let item_started = Instant::now();
2479                            let current_index = w_completed + w_failed + w_skipped;
2480
2481                            let call_result = match operation {
2482                                EnrichOperation::MemoryBindings => call_memory_bindings(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2483                                EnrichOperation::EntityDescriptions => call_entity_description(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2484                                EnrichOperation::BodyEnrich => call_body_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode, min_oc, max_oc, prompt_tpl, args.preserve_threshold, paths, llm_backend, embedding_backend),
2485                                EnrichOperation::ReEmbed => call_reembed(&w_conn, namespace, &item_key, paths, llm_backend, embedding_backend),
2486                                EnrichOperation::WeightCalibrate => call_weight_calibrate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2487                                EnrichOperation::RelationReclassify => call_relation_reclassify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2488                                EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => call_entity_connect(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2489                                EnrichOperation::EntityTypeValidate => call_entity_type_validate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2490                                EnrichOperation::DescriptionEnrich => call_description_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2491                                EnrichOperation::DomainClassify => call_domain_classify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2492                                EnrichOperation::GraphAudit => call_graph_audit(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2493                                EnrichOperation::DeepResearchSynth => call_deep_research_synth(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2494                                EnrichOperation::BodyExtract => call_body_extract(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2495                            };
2496
2497                            match call_result {
2498                                Ok(EnrichItemResult::Done { cost, is_oauth, memory_id, entity_id, entities, rels, chars_before, chars_after }) => {
2499                                    if is_oauth { w_oauth = true; }
2500                                    w_backoff = DEFAULT_RATE_LIMIT_WAIT;
2501                                    let _ = w_queue.execute(
2502                                        "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2503                                        rusqlite::params![memory_id, entity_id, entities as i64, rels as i64, cost, item_started.elapsed().as_millis() as i64, queue_id],
2504                                    );
2505                                    w_completed += 1;
2506                                    if !is_oauth { w_cost += cost; }
2507                                    // G28-D: count success; resets breaker.
2508                                    let _ = w_breaker
2509                                        .record(crate::retry::AttemptOutcome::Success);
2510                                    let _guard = stdout_mu.lock();
2511                                    emit_json(&ItemEvent { item: &item_key, status: "done", memory_id, entity_id, entities: Some(entities), rels: Some(rels), chars_before, chars_after, cost_usd: if is_oauth { None } else { Some(cost) }, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
2512                                }
2513                                Ok(EnrichItemResult::Skipped { reason }) => {
2514                                    w_skipped += 1;
2515                                    let _ = w_queue.execute("UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![reason, queue_id]);
2516                                    let _guard = stdout_mu.lock();
2517                                    emit_json(&ItemEvent { item: &item_key, status: "skipped", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
2518                                }
2519                                Ok(EnrichItemResult::PreservationFailed { score, threshold, chars_before, chars_after }) => {
2520                                    // G29 Step 4: worker mirror of the
2521                                    // serial path. Counted as a soft
2522                                    // skip so the queue surface shows
2523                                    // a quality issue rather than a
2524                                    // transport failure.
2525                                    w_skipped += 1;
2526                                    let reason = format!(
2527                                        "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2528                                    );
2529                                    let _ = w_queue.execute(
2530                                        "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2531                                        rusqlite::params![reason, queue_id],
2532                                    );
2533                                    let _guard = stdout_mu.lock();
2534                                    emit_json(&ItemEvent {
2535                                        item: &item_key,
2536                                        status: "preservation_failed",
2537                                        memory_id: None,
2538                                        entity_id: None,
2539                                        entities: None,
2540                                        rels: None,
2541                                        chars_before: Some(chars_before),
2542                                        chars_after: Some(chars_after),
2543                                        cost_usd: None,
2544                                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2545                                        error: Some(reason),
2546                                        index: current_index,
2547                                        total,
2548                                    });
2549                                }
2550                                Err(e) => {
2551                                    let err_str = format!("{e}");
2552                                    if matches!(e, AppError::RateLimited { .. }) {
2553                                        if crate::retry::is_kill_switch_active() {
2554                                            tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2555                                        } else if std::time::Instant::now() >= w_deadline {
2556                                            tracing::error!(target: "enrich", "rate-limit retry deadline (1h) exhausted in worker");
2557                                        } else {
2558                                            let half = w_backoff / 2;
2559                                            let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2560                                            let actual_wait = half + jitter;
2561                                            tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited in worker, backing off");
2562                                            let _ = w_queue.execute("UPDATE queue SET status='pending' WHERE id=?1", rusqlite::params![queue_id]);
2563                                            std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2564                                            w_backoff = (w_backoff * 2).min(900);
2565                                            continue;
2566                                        }
2567                                    }
2568                                    w_failed += 1;
2569                                    let outcome = record_item_failure(&w_queue, queue_id, attempt_current, args.max_attempts, &e);
2570                                    let _guard = stdout_mu.lock();
2571                                    emit_json(&ItemEvent { item: &item_key, status: "failed", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: Some(err_str), index: current_index, total });
2572                                    // G28-D: feed the classified outcome to the breaker (transient
2573                                    // failures do not count toward opening it).
2574                                    let breaker_opened = w_breaker.record(outcome);
2575                                    if breaker_opened {
2576                                        tracing::error!(target: "enrich",
2577                                            consecutive_failures = w_breaker.consecutive_failures(),
2578                                            "circuit breaker opened — aborting worker"
2579                                        );
2580                                        break;
2581                                    }
2582                                }
2583                            }
2584                        }
2585                        WorkerResult { completed: w_completed, failed: w_failed, skipped: w_skipped, cost: w_cost, oauth: w_oauth }
2586                    })
2587                })
2588                .collect();
2589                handles
2590                    .into_iter()
2591                    .map(|h| {
2592                        h.join().unwrap_or(WorkerResult {
2593                            completed: 0,
2594                            failed: 0,
2595                            skipped: 0,
2596                            cost: 0.0,
2597                            oauth: false,
2598                        })
2599                    })
2600                    .collect()
2601            });
2602
2603            for r in &results {
2604                completed += r.completed;
2605                failed += r.failed;
2606                skipped += r.skipped;
2607                cost_total += r.cost;
2608                if r.oauth && !oauth_detected {
2609                    oauth_detected = true;
2610                }
2611            }
2612        } else {
2613            // Serial path (parallelism == 1) — original loop
2614            loop {
2615                if crate::shutdown_requested() {
2616                    tracing::info!(target: "enrich", "shutdown requested, stopping enrichment");
2617                    break;
2618                }
2619
2620                // Budget check
2621                if let Some(budget) = args.max_cost_usd {
2622                    if !oauth_detected && cost_total >= budget {
2623                        tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
2624                        break;
2625                    }
2626                }
2627
2628                // Dequeue next pending item
2629                let pending: Option<(i64, String, String, i64)> = queue_conn
2630                    .query_row(
2631                        "UPDATE queue SET status='processing', attempt=attempt+1 \
2632                 WHERE id = (SELECT id FROM queue WHERE status='pending' \
2633                               AND (next_retry_at IS NULL OR next_retry_at <= datetime('now')) \
2634                             ORDER BY id LIMIT 1) \
2635                 RETURNING id, item_key, item_type, attempt",
2636                        [],
2637                        |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
2638                    )
2639                    .ok();
2640
2641                let (queue_id, item_key, item_type, attempt_current) = match pending {
2642                    Some(p) => p,
2643                    None => break,
2644                };
2645
2646                let item_started = Instant::now();
2647                let current_index = completed + failed + skipped;
2648
2649                let call_result = match args.operation {
2650                    EnrichOperation::MemoryBindings => call_memory_bindings(
2651                        &conn,
2652                        &namespace,
2653                        &item_key,
2654                        provider_binary
2655                            .as_deref()
2656                            .expect("provider binary required"),
2657                        provider_model,
2658                        provider_timeout,
2659                        &args.mode,
2660                    ),
2661                    EnrichOperation::EntityDescriptions => call_entity_description(
2662                        &conn,
2663                        &namespace,
2664                        &item_key,
2665                        provider_binary
2666                            .as_deref()
2667                            .expect("provider binary required"),
2668                        provider_model,
2669                        provider_timeout,
2670                        &args.mode,
2671                    ),
2672                    EnrichOperation::BodyEnrich => call_body_enrich(
2673                        &conn,
2674                        &namespace,
2675                        &item_key,
2676                        provider_binary
2677                            .as_deref()
2678                            .expect("provider binary required"),
2679                        provider_model,
2680                        provider_timeout,
2681                        &args.mode,
2682                        args.min_output_chars,
2683                        args.max_output_chars,
2684                        args.prompt_template.as_deref(),
2685                        args.preserve_threshold,
2686                        &paths,
2687                        llm_backend,
2688                        embedding_backend,
2689                    ),
2690                    EnrichOperation::ReEmbed => call_reembed(
2691                        &conn,
2692                        &namespace,
2693                        &item_key,
2694                        &paths,
2695                        llm_backend,
2696                        embedding_backend,
2697                    ),
2698                    EnrichOperation::WeightCalibrate => call_weight_calibrate(
2699                        &conn,
2700                        &namespace,
2701                        &item_key,
2702                        provider_binary
2703                            .as_deref()
2704                            .expect("provider binary required"),
2705                        provider_model,
2706                        provider_timeout,
2707                        &args.mode,
2708                    ),
2709                    EnrichOperation::RelationReclassify => call_relation_reclassify(
2710                        &conn,
2711                        &namespace,
2712                        &item_key,
2713                        provider_binary
2714                            .as_deref()
2715                            .expect("provider binary required"),
2716                        provider_model,
2717                        provider_timeout,
2718                        &args.mode,
2719                    ),
2720                    EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2721                        call_entity_connect(
2722                            &conn,
2723                            &namespace,
2724                            &item_key,
2725                            provider_binary
2726                                .as_deref()
2727                                .expect("provider binary required"),
2728                            provider_model,
2729                            provider_timeout,
2730                            &args.mode,
2731                        )
2732                    }
2733                    EnrichOperation::EntityTypeValidate => call_entity_type_validate(
2734                        &conn,
2735                        &namespace,
2736                        &item_key,
2737                        provider_binary
2738                            .as_deref()
2739                            .expect("provider binary required"),
2740                        provider_model,
2741                        provider_timeout,
2742                        &args.mode,
2743                    ),
2744                    EnrichOperation::DescriptionEnrich => call_description_enrich(
2745                        &conn,
2746                        &namespace,
2747                        &item_key,
2748                        provider_binary
2749                            .as_deref()
2750                            .expect("provider binary required"),
2751                        provider_model,
2752                        provider_timeout,
2753                        &args.mode,
2754                    ),
2755                    EnrichOperation::DomainClassify => call_domain_classify(
2756                        &conn,
2757                        &namespace,
2758                        &item_key,
2759                        provider_binary
2760                            .as_deref()
2761                            .expect("provider binary required"),
2762                        provider_model,
2763                        provider_timeout,
2764                        &args.mode,
2765                    ),
2766                    EnrichOperation::GraphAudit => call_graph_audit(
2767                        &conn,
2768                        &namespace,
2769                        &item_key,
2770                        provider_binary
2771                            .as_deref()
2772                            .expect("provider binary required"),
2773                        provider_model,
2774                        provider_timeout,
2775                        &args.mode,
2776                    ),
2777                    EnrichOperation::DeepResearchSynth => call_deep_research_synth(
2778                        &conn,
2779                        &namespace,
2780                        &item_key,
2781                        provider_binary
2782                            .as_deref()
2783                            .expect("provider binary required"),
2784                        provider_model,
2785                        provider_timeout,
2786                        &args.mode,
2787                    ),
2788                    EnrichOperation::BodyExtract => call_body_extract(
2789                        &conn,
2790                        &namespace,
2791                        &item_key,
2792                        provider_binary
2793                            .as_deref()
2794                            .expect("provider binary required"),
2795                        provider_model,
2796                        provider_timeout,
2797                        &args.mode,
2798                    ),
2799                };
2800
2801                match call_result {
2802                    Ok(EnrichItemResult::Done {
2803                        memory_id,
2804                        entity_id,
2805                        entities,
2806                        rels,
2807                        chars_before,
2808                        chars_after,
2809                        cost,
2810                        is_oauth,
2811                    }) => {
2812                        if is_oauth && !oauth_detected {
2813                            oauth_detected = true;
2814                            tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
2815                        }
2816                        backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
2817
2818                        // Persist depends on the operation
2819                        let persist_err: Option<String> = match args.operation {
2820                            EnrichOperation::MemoryBindings => {
2821                                // Bindings already persisted inside call_memory_bindings
2822                                None
2823                            }
2824                            EnrichOperation::EntityDescriptions => {
2825                                // Description already persisted inside call_entity_description
2826                                None
2827                            }
2828                            EnrichOperation::BodyEnrich => {
2829                                // Body already persisted inside call_body_enrich
2830                                None
2831                            }
2832                            _ => {
2833                                // All G27 operations persist inside their call_* function
2834                                None
2835                            }
2836                        };
2837
2838                        if let Err(e) = queue_conn.execute(
2839                    "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2840                    rusqlite::params![
2841                        memory_id,
2842                        entity_id,
2843                        entities as i64,
2844                        rels as i64,
2845                        cost,
2846                        item_started.elapsed().as_millis() as i64,
2847                        queue_id
2848                    ],
2849                ) {
2850                        tracing::warn!(target: "enrich", error = %e, "queue done update failed");
2851                    }
2852
2853                        if persist_err.is_none() {
2854                            completed += 1;
2855                            if !is_oauth {
2856                                cost_total += cost;
2857                            }
2858                            emit_json(&ItemEvent {
2859                                item: &item_key,
2860                                status: "done",
2861                                memory_id,
2862                                entity_id,
2863                                entities: Some(entities),
2864                                rels: Some(rels),
2865                                chars_before,
2866                                chars_after,
2867                                cost_usd: if is_oauth { None } else { Some(cost) },
2868                                elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2869                                error: None,
2870                                index: current_index,
2871                                total,
2872                            });
2873                        } else {
2874                            failed += 1;
2875                            emit_json(&ItemEvent {
2876                                item: &item_key,
2877                                status: "failed",
2878                                memory_id: None,
2879                                entity_id: None,
2880                                entities: None,
2881                                rels: None,
2882                                chars_before: None,
2883                                chars_after: None,
2884                                cost_usd: None,
2885                                elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2886                                error: persist_err,
2887                                index: current_index,
2888                                total,
2889                            });
2890                        }
2891                    }
2892                    Ok(EnrichItemResult::Skipped { reason }) => {
2893                        skipped += 1;
2894                        if let Err(e) = queue_conn.execute(
2895                    "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2896                    rusqlite::params![reason, queue_id],
2897                ) {
2898                        tracing::warn!(target: "enrich", error = %e, "queue skipped update failed");
2899                    }
2900                        emit_json(&ItemEvent {
2901                            item: &item_key,
2902                            status: "skipped",
2903                            memory_id: None,
2904                            entity_id: None,
2905                            entities: None,
2906                            rels: None,
2907                            chars_before: None,
2908                            chars_after: None,
2909                            cost_usd: None,
2910                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2911                            error: None,
2912                            index: current_index,
2913                            total,
2914                        });
2915                    }
2916                    Ok(EnrichItemResult::PreservationFailed {
2917                        score,
2918                        threshold,
2919                        chars_before,
2920                        chars_after,
2921                    }) => {
2922                        // G29 Passo 4: the LLM rewrite diverged too far from
2923                        // the original body. Count as a soft failure (not
2924                        // `failed`) so the queue surfaces it as a quality
2925                        // issue, not a transport error. The reason is
2926                        // structured so the operator can audit why a body
2927                        // was rejected.
2928                        skipped += 1;
2929                        let reason = format!(
2930                        "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2931                    );
2932                        if let Err(qe) = queue_conn.execute(
2933                        "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2934                        rusqlite::params![reason, queue_id],
2935                    ) {
2936                        tracing::warn!(target: "enrich", error = %qe, "queue preservation_failed update failed");
2937                    }
2938                        emit_json(&ItemEvent {
2939                            item: &item_key,
2940                            status: "preservation_failed",
2941                            memory_id: None,
2942                            entity_id: None,
2943                            entities: None,
2944                            rels: None,
2945                            chars_before: Some(chars_before),
2946                            chars_after: Some(chars_after),
2947                            cost_usd: None,
2948                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2949                            error: Some(reason),
2950                            index: current_index,
2951                            total,
2952                        });
2953                    }
2954                    Err(e) => {
2955                        let err_str = format!("{e}");
2956                        if matches!(e, AppError::RateLimited { .. }) {
2957                            if crate::retry::is_kill_switch_active() {
2958                                tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2959                            } else if std::time::Instant::now() >= rate_limit_deadline {
2960                                tracing::error!(target: "enrich", total_elapsed_secs = enrich_started.elapsed().as_secs(), "rate-limit retry deadline (1h) exhausted");
2961                            } else {
2962                                let half = backoff_secs / 2;
2963                                let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2964                                let actual_wait = half + jitter;
2965                                tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
2966                                if let Err(qe) = queue_conn.execute(
2967                                    "UPDATE queue SET status='pending' WHERE id=?1",
2968                                    rusqlite::params![queue_id],
2969                                ) {
2970                                    tracing::warn!(target: "enrich", error = %qe, "queue pending update failed");
2971                                }
2972                                std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2973                                backoff_secs = (backoff_secs * 2).min(900);
2974                                continue;
2975                            }
2976                        }
2977
2978                        failed += 1;
2979                        let _outcome = record_item_failure(
2980                            &queue_conn,
2981                            queue_id,
2982                            attempt_current,
2983                            args.max_attempts,
2984                            &e,
2985                        );
2986                        emit_json(&ItemEvent {
2987                            item: &item_key,
2988                            status: "failed",
2989                            memory_id: None,
2990                            entity_id: None,
2991                            entities: None,
2992                            rels: None,
2993                            chars_before: None,
2994                            chars_after: None,
2995                            cost_usd: None,
2996                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2997                            error: Some(err_str),
2998                            index: current_index,
2999                            total,
3000                        });
3001                    }
3002                }
3003
3004                let _ = item_type; // used via queue schema only
3005            }
3006        } // end else (serial path)
3007
3008        if !args.until_empty {
3009            break;
3010        }
3011        let eligible_remaining: i64 = queue_conn
3012            .query_row(
3013                "SELECT COUNT(*) FROM queue WHERE status='pending' \
3014                 AND (next_retry_at IS NULL OR next_retry_at <= datetime('now'))",
3015                [],
3016                |r| r.get(0),
3017            )
3018            .unwrap_or(0);
3019        let progressed = completed > completed_before;
3020        if std::time::Instant::now() >= until_deadline {
3021            tracing::info!(target: "enrich", "until-empty: max-runtime reached, stopping");
3022            break;
3023        }
3024        if !progressed && eligible_remaining == 0 {
3025            tracing::info!(target: "enrich", "until-empty: converged (no eligible items remain)");
3026            break;
3027        }
3028        if eligible_remaining == 0 {
3029            // Remaining pending items are waiting on backoff; nap and re-check.
3030            std::thread::sleep(std::time::Duration::from_secs(1));
3031        }
3032    } // end until-empty loop
3033
3034    let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
3035    let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
3036
3037    emit_json(&EnrichSummary {
3038        summary: true,
3039        operation: format!("{:?}", args.operation),
3040        items_total: total,
3041        completed,
3042        failed,
3043        skipped,
3044        cost_usd: cost_total,
3045        elapsed_ms: started.elapsed().as_millis() as u64,
3046        backend_invoked: take_enrich_backend(),
3047    });
3048
3049    if failed == 0 {
3050        // GAP-ENRICH-BACKLOG-CONVERGE: keep the queue file when dead-letter rows
3051        // exist so `enrich --status` can still report them on the next run.
3052        let dead: i64 = queue_conn
3053            .query_row("SELECT COUNT(*) FROM queue WHERE status='dead'", [], |r| {
3054                r.get(0)
3055            })
3056            .unwrap_or(0);
3057        if dead == 0 {
3058            let _ = std::fs::remove_file(DEFAULT_QUEUE_DB);
3059        }
3060    }
3061
3062    Ok(())
3063}
3064
3065// ---------------------------------------------------------------------------
3066// Internal result type for a single item call
3067// ---------------------------------------------------------------------------
3068
/// Outcome reported by a per-operation `call_*` helper for one queue item.
///
/// The driver loop matches on this value to update the queue row and to emit
/// the per-item NDJSON event.
enum EnrichItemResult {
    /// The item was processed and its result persisted by the helper.
    Done {
        /// Row id of the memory that was touched, when the item is a memory.
        memory_id: Option<i64>,
        /// Row id of the entity that was touched, when the item is an entity.
        entity_id: Option<i64>,
        /// Entity count reported by the helper for this item.
        entities: usize,
        /// Relationship count reported by the helper for this item.
        rels: usize,
        /// Body length in characters before the operation, when reported.
        chars_before: Option<usize>,
        /// Body length in characters after the operation, when reported.
        chars_after: Option<usize>,
        /// Cost reported by the backend call for this item.
        cost: f64,
        /// When `true`, the driver leaves the cost out of the item event and
        /// out of the running total.
        is_oauth: bool,
    },
    /// The item was intentionally left untouched; `reason` says why.
    Skipped { reason: String },
    /// G29 step 4 (v1.0.69): the rewritten body drifted further from the
    /// original than `--preserve-threshold` allows, so it was rejected before
    /// anything was persisted. Score and threshold are surfaced in the NDJSON
    /// stream so an operator can audit the rejection.
    PreservationFailed {
        /// Similarity score computed between the original and rewritten body.
        score: f64,
        /// Threshold the score was compared against.
        threshold: f64,
        /// Length of the original body, in characters.
        chars_before: usize,
        /// Length of the rejected rewrite, in characters.
        chars_after: usize,
    },
}
3094
3095// ---------------------------------------------------------------------------
3096// Per-operation call helpers (SCAN + JUDGE + PERSIST in one unit)
3097// ---------------------------------------------------------------------------
3098
3099fn call_memory_bindings(
3100    conn: &Connection,
3101    namespace: &str,
3102    memory_name: &str,
3103    binary: &Path,
3104    model: Option<&str>,
3105    timeout: u64,
3106    mode: &EnrichMode,
3107) -> Result<EnrichItemResult, AppError> {
3108    // Look up the memory
3109    let (memory_id, body): (i64, String) = conn.query_row(
3110        "SELECT id, COALESCE(body,'') FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
3111        rusqlite::params![namespace, memory_name],
3112        |r| Ok((r.get(0)?, r.get(1)?)),
3113    ).map_err(|e| match e {
3114        rusqlite::Error::QueryReturnedNoRows => AppError::NotFound(format!("memory '{memory_name}' not found")),
3115        other => AppError::Database(other),
3116    })?;
3117
3118    if body.trim().is_empty() {
3119        return Ok(EnrichItemResult::Skipped {
3120            reason: "body is empty".to_string(),
3121        });
3122    }
3123
3124    let (value, cost, is_oauth) = match mode {
3125        EnrichMode::ClaudeCode => call_claude(
3126            binary,
3127            BINDINGS_PROMPT,
3128            BINDINGS_SCHEMA,
3129            &body,
3130            model,
3131            timeout,
3132        )?,
3133        EnrichMode::Codex => call_codex(
3134            binary,
3135            BINDINGS_PROMPT,
3136            BINDINGS_SCHEMA,
3137            &body,
3138            model,
3139            timeout,
3140        )?,
3141        EnrichMode::Opencode => call_opencode(
3142            binary,
3143            BINDINGS_PROMPT,
3144            BINDINGS_SCHEMA,
3145            &body,
3146            model,
3147            timeout,
3148        )?,
3149        EnrichMode::OpenRouter => {
3150            call_openrouter(BINDINGS_PROMPT, BINDINGS_SCHEMA, &body, model, timeout)?
3151        }
3152    };
3153
3154    let empty_arr = serde_json::Value::Array(vec![]);
3155    let entities_val = value.get("entities").unwrap_or(&empty_arr);
3156    let rels_val = value.get("relationships").unwrap_or(&empty_arr);
3157
3158    let (ent_count, rel_count) =
3159        persist_memory_bindings(conn, namespace, memory_id, entities_val, rels_val)?;
3160
3161    Ok(EnrichItemResult::Done {
3162        memory_id: Some(memory_id),
3163        entity_id: None,
3164        entities: ent_count,
3165        rels: rel_count,
3166        chars_before: None,
3167        chars_after: None,
3168        cost,
3169        is_oauth,
3170    })
3171}
3172
3173fn call_entity_description(
3174    conn: &Connection,
3175    namespace: &str,
3176    entity_name: &str,
3177    binary: &Path,
3178    model: Option<&str>,
3179    timeout: u64,
3180    mode: &EnrichMode,
3181) -> Result<EnrichItemResult, AppError> {
3182    let (entity_id, entity_type): (i64, String) = conn
3183        .query_row(
3184            "SELECT id, type FROM entities WHERE namespace=?1 AND name=?2",
3185            rusqlite::params![namespace, entity_name],
3186            |r| Ok((r.get(0)?, r.get(1)?)),
3187        )
3188        .map_err(|e| match e {
3189            rusqlite::Error::QueryReturnedNoRows => {
3190                AppError::NotFound(format!("entity '{entity_name}' not found"))
3191            }
3192            other => AppError::Database(other),
3193        })?;
3194
3195    let prompt = format!(
3196        "{ENTITY_DESCRIPTION_PROMPT_PREFIX}{entity_name}\nEntity type: {entity_type}\n\nGenerate a description:"
3197    );
3198
3199    let (value, cost, is_oauth) = match mode {
3200        EnrichMode::ClaudeCode => call_claude(
3201            binary,
3202            &prompt,
3203            ENTITY_DESCRIPTION_SCHEMA,
3204            "",
3205            model,
3206            timeout,
3207        )?,
3208        EnrichMode::Codex => call_codex(
3209            binary,
3210            &prompt,
3211            ENTITY_DESCRIPTION_SCHEMA,
3212            "",
3213            model,
3214            timeout,
3215        )?,
3216        EnrichMode::Opencode => call_opencode(
3217            binary,
3218            &prompt,
3219            ENTITY_DESCRIPTION_SCHEMA,
3220            "",
3221            model,
3222            timeout,
3223        )?,
3224        EnrichMode::OpenRouter => {
3225            call_openrouter(&prompt, ENTITY_DESCRIPTION_SCHEMA, "", model, timeout)?
3226        }
3227    };
3228
3229    let description = value
3230        .get("description")
3231        .and_then(|v| v.as_str())
3232        .ok_or_else(|| AppError::Validation("LLM result missing 'description' field".into()))?;
3233
3234    persist_entity_description(conn, entity_id, description)?;
3235
3236    Ok(EnrichItemResult::Done {
3237        memory_id: None,
3238        entity_id: Some(entity_id),
3239        entities: 0,
3240        rels: 0,
3241        chars_before: None,
3242        chars_after: None,
3243        cost,
3244        is_oauth,
3245    })
3246}
3247
3248#[allow(clippy::too_many_arguments)]
3249fn call_body_enrich(
3250    conn: &Connection,
3251    namespace: &str,
3252    memory_name: &str,
3253    binary: &Path,
3254    model: Option<&str>,
3255    timeout: u64,
3256    mode: &EnrichMode,
3257    min_output_chars: usize,
3258    max_output_chars: usize,
3259    prompt_template: Option<&Path>,
3260    preserve_threshold: f64,
3261    paths: &crate::paths::AppPaths,
3262    llm_backend: crate::cli::LlmBackendChoice,
3263    embedding_backend: crate::cli::EmbeddingBackendChoice,
3264) -> Result<EnrichItemResult, AppError> {
3265    let (memory_id, body, description, memory_type): (i64, String, String, String) = conn
3266        .query_row(
3267            "SELECT id, COALESCE(body,''), COALESCE(description,''), COALESCE(type,'note') \
3268         FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
3269            rusqlite::params![namespace, memory_name],
3270            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
3271        )
3272        .map_err(|e| match e {
3273            rusqlite::Error::QueryReturnedNoRows => {
3274                AppError::NotFound(format!("memory '{memory_name}' not found"))
3275            }
3276            other => AppError::Database(other),
3277        })?;
3278
3279    let chars_before = body.chars().count();
3280
3281    // G26: gather graph context for contextualized enrichment
3282    let linked_entities: Vec<String> = {
3283        let mut stmt = conn.prepare_cached(
3284            "SELECT e.name FROM memory_entities me \
3285             JOIN entities e ON e.id = me.entity_id \
3286             WHERE me.memory_id = ?1 LIMIT 10",
3287        )?;
3288        let result: Vec<String> = stmt
3289            .query_map(rusqlite::params![memory_id], |r| r.get::<_, String>(0))?
3290            .filter_map(|r| r.ok())
3291            .collect();
3292        drop(stmt);
3293        result
3294    };
3295
3296    // Load custom prompt template if provided
3297    let prompt_prefix = if let Some(tmpl_path) = prompt_template {
3298        let file_size = std::fs::metadata(tmpl_path)
3299            .map_err(|e| {
3300                AppError::Io(std::io::Error::new(
3301                    e.kind(),
3302                    format!("failed to stat prompt template: {e}"),
3303                ))
3304            })?
3305            .len();
3306        if file_size > MAX_MEMORY_BODY_LEN as u64 {
3307            return Err(AppError::LimitExceeded(
3308                crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
3309            ));
3310        }
3311        std::fs::read_to_string(tmpl_path).map_err(|e| {
3312            AppError::Io(std::io::Error::new(
3313                e.kind(),
3314                format!("failed to read prompt template: {e}"),
3315            ))
3316        })?
3317    } else {
3318        BODY_ENRICH_PROMPT_PREFIX.to_string()
3319    };
3320
3321    // G26: build contextualized prompt with graph data
3322    let context_section = if !linked_entities.is_empty() || !description.is_empty() {
3323        let mut ctx = String::new();
3324        ctx.push_str(&format!(
3325            "\nContext:\n- Memory name: {memory_name}\n- Type: {memory_type}\n"
3326        ));
3327        if !description.is_empty() {
3328            ctx.push_str(&format!("- Description: {description}\n"));
3329        }
3330        ctx.push_str(&format!("- Domain: {namespace}\n"));
3331        if !linked_entities.is_empty() {
3332            ctx.push_str(&format!(
3333                "- Linked entities: {}\n",
3334                linked_entities.join(", ")
3335            ));
3336        }
3337        ctx
3338    } else {
3339        String::new()
3340    };
3341
3342    let prompt = format!(
3343        "{prompt_prefix}{context_section}\nTarget minimum length: {min_output_chars} characters. Maximum: {max_output_chars} characters."
3344    );
3345
3346    // The body schema uses a free-form enriched_body field
3347    let (value, cost, is_oauth) = match mode {
3348        EnrichMode::ClaudeCode => {
3349            call_claude(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
3350        }
3351        EnrichMode::Codex => {
3352            call_codex(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
3353        }
3354        EnrichMode::Opencode => {
3355            call_opencode(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
3356        }
3357        EnrichMode::OpenRouter => {
3358            call_openrouter(&prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
3359        }
3360    };
3361
3362    let enriched_body = value
3363        .get("enriched_body")
3364        .and_then(|v| v.as_str())
3365        .ok_or_else(|| AppError::Validation("LLM result missing 'enriched_body' field".into()))?;
3366
3367    let chars_after = enriched_body.chars().count();
3368
3369    // G29 Passo 4 (v1.0.69): preservation check. Before persisting, run
3370    // a trigram-Jaccard similarity between the original body and the
3371    // LLM-rewritten body. When the score falls below
3372    // `args.preserve_threshold` (default 0.7 per the G29 gap), reject the
3373    // rewrite as a likely hallucination. The result is recorded in the
3374    // NDJSON stream so operators can audit what the LLM tried to do.
3375    let threshold = preserve_threshold;
3376    let verdict =
3377        crate::preservation::PreservationVerdict::evaluate(&body, enriched_body, threshold);
3378    if !verdict.is_accepted() {
3379        return Ok(EnrichItemResult::PreservationFailed {
3380            score: match verdict {
3381                crate::preservation::PreservationVerdict::Preserved { score, .. } => score,
3382                crate::preservation::PreservationVerdict::Rejected { score, .. } => score,
3383                crate::preservation::PreservationVerdict::Unchanged { .. } => 1.0,
3384            },
3385            threshold,
3386            chars_before,
3387            chars_after,
3388        });
3389    }
3390
3391    // G29 Passo 5 (v1.0.69): idempotency via blake3 hash. Before persisting,
3392    // compare the hash of the original body against the hash of the enriched
3393    // body. Identical hashes mean the LLM produced a byte-for-byte identical
3394    // body (rare but possible) — treat as `Skipped` so re-running the batch
3395    // is safe and the queue does not get re-persisted entries.
3396    let old_hash = blake3::hash(body.as_bytes()).to_hex().to_string();
3397    let new_hash = blake3::hash(enriched_body.as_bytes()).to_hex().to_string();
3398    if old_hash == new_hash {
3399        return Ok(EnrichItemResult::Skipped {
3400            reason: format!(
3401                "enriched body hash matches original (blake3:{old_hash}); idempotency skip"
3402            ),
3403        });
3404    }
3405
3406    // Only persist if the enriched body is genuinely longer
3407    if chars_after <= chars_before {
3408        return Ok(EnrichItemResult::Skipped {
3409            reason: format!(
3410                "enriched body ({chars_after} chars) not longer than original ({chars_before} chars)"
3411            ),
3412        });
3413    }
3414
3415    persist_enriched_body(
3416        conn,
3417        namespace,
3418        memory_id,
3419        memory_name,
3420        enriched_body,
3421        paths,
3422        llm_backend,
3423        embedding_backend,
3424    )?;
3425
3426    Ok(EnrichItemResult::Done {
3427        memory_id: Some(memory_id),
3428        entity_id: None,
3429        entities: 0,
3430        rels: 0,
3431        chars_before: Some(chars_before),
3432        chars_after: Some(chars_after),
3433        cost,
3434        is_oauth,
3435    })
3436}
3437
3438fn call_reembed(
3439    conn: &Connection,
3440    namespace: &str,
3441    memory_name: &str,
3442    paths: &crate::paths::AppPaths,
3443    llm_backend: crate::cli::LlmBackendChoice,
3444    embedding_backend: crate::cli::EmbeddingBackendChoice,
3445) -> Result<EnrichItemResult, AppError> {
3446    let (memory_id, body, memory_type): (i64, String, String) = conn
3447        .query_row(
3448            "SELECT id, COALESCE(body,''), COALESCE(type,'note')
3449             FROM memories
3450             WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
3451            rusqlite::params![namespace, memory_name],
3452            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3453        )
3454        .map_err(|e| match e {
3455            rusqlite::Error::QueryReturnedNoRows => {
3456                AppError::NotFound(format!("memory '{memory_name}' not found"))
3457            }
3458            other => AppError::Database(other),
3459        })?;
3460
3461    if body.trim().is_empty() {
3462        return Ok(EnrichItemResult::Skipped {
3463            reason: "body is empty".to_string(),
3464        });
3465    }
3466
3467    reembed_memory_vector(
3468        conn,
3469        namespace,
3470        memory_id,
3471        memory_name,
3472        &memory_type,
3473        &body,
3474        paths,
3475        llm_backend,
3476        embedding_backend,
3477    )?;
3478
3479    Ok(EnrichItemResult::Done {
3480        memory_id: Some(memory_id),
3481        entity_id: None,
3482        entities: 0,
3483        rels: 0,
3484        chars_before: Some(body.chars().count()),
3485        chars_after: Some(body.chars().count()),
3486        cost: 0.0,
3487        is_oauth: true,
3488    })
3489}
3490
3491// ---------------------------------------------------------------------------
3492// Scan dispatcher — maps operation to scan query result (item keys)
3493// ---------------------------------------------------------------------------
3494
3495fn scan_operation(
3496    conn: &Connection,
3497    namespace: &str,
3498    args: &EnrichArgs,
3499) -> Result<Vec<String>, AppError> {
3500    // G37: resolve --names + --names-file once and apply to every scan path.
3501    let name_filter = resolve_name_filter(args)?;
3502    match args.operation {
3503        EnrichOperation::MemoryBindings => {
3504            let rows = scan_unbound_memories(conn, namespace, args.limit, &name_filter)?;
3505            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3506        }
3507        EnrichOperation::EntityDescriptions => {
3508            let rows =
3509                scan_entities_without_description(conn, namespace, args.limit, &name_filter)?;
3510            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3511        }
3512        EnrichOperation::BodyEnrich => {
3513            let rows = scan_short_body_memories(
3514                conn,
3515                namespace,
3516                args.min_output_chars,
3517                args.limit,
3518                &name_filter,
3519            )?;
3520            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3521        }
3522        EnrichOperation::ReEmbed => {
3523            let rows = scan_memories_without_embeddings(conn, namespace, args.limit, &name_filter)?;
3524            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3525        }
3526        EnrichOperation::WeightCalibrate => {
3527            let rows = scan_weight_candidates(conn, namespace, args.limit)?;
3528            Ok(rows
3529                .into_iter()
3530                .map(|(id, _, _, _, _)| id.to_string())
3531                .collect())
3532        }
3533        EnrichOperation::RelationReclassify => {
3534            let rows = scan_generic_relations(conn, namespace, args.limit)?;
3535            Ok(rows
3536                .into_iter()
3537                .map(|(id, _, _, _)| id.to_string())
3538                .collect())
3539        }
3540        EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
3541            let pairs = scan_isolated_entity_pairs(conn, namespace, args.limit)?;
3542            Ok(pairs.into_iter().map(|(_, name, _, _)| name).collect())
3543        }
3544        EnrichOperation::EntityTypeValidate => {
3545            let rows = scan_entities_for_type_validation(conn, namespace, args.limit)?;
3546            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3547        }
3548        EnrichOperation::DescriptionEnrich => {
3549            let rows = scan_generic_descriptions(conn, namespace, args.limit)?;
3550            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3551        }
3552        EnrichOperation::DomainClassify
3553        | EnrichOperation::GraphAudit
3554        | EnrichOperation::DeepResearchSynth
3555        | EnrichOperation::BodyExtract => {
3556            let limit_clause = args.limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3557            let sql = format!(
3558                "SELECT name FROM memories WHERE namespace=?1 AND deleted_at IS NULL ORDER BY id {limit_clause}"
3559            );
3560            let mut stmt = conn.prepare(&sql)?;
3561            let names = stmt
3562                .query_map(rusqlite::params![namespace], |r| r.get::<_, String>(0))?
3563                .collect::<Result<Vec<_>, _>>()?;
3564            Ok(names)
3565        }
3566    }
3567}
3568
3569// ---------------------------------------------------------------------------
3570// Codex stub provider
3571// ---------------------------------------------------------------------------
3572
3573/// Locates the Codex CLI binary.
3574fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
3575    if let Some(p) = explicit {
3576        if p.exists() {
3577            return Ok(p.to_path_buf());
3578        }
3579        return Err(AppError::Validation(format!(
3580            "Codex binary not found at explicit path: {}",
3581            p.display()
3582        )));
3583    }
3584
3585    if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
3586        let p = PathBuf::from(&env_path);
3587        if p.exists() {
3588            return Ok(p);
3589        }
3590    }
3591
3592    let name = if cfg!(windows) { "codex.exe" } else { "codex" };
3593    if let Some(path_var) = std::env::var_os("PATH") {
3594        for dir in std::env::split_paths(&path_var) {
3595            let candidate = dir.join(name);
3596            if candidate.exists() {
3597                return Ok(crate::extract::llm_embedding::resolve_real_binary(
3598                    &candidate,
3599                ));
3600            }
3601        }
3602    }
3603
3604    Err(AppError::Validation(
3605        "Codex CLI binary not found in PATH. Install it or specify --codex-binary".to_string(),
3606    ))
3607}
3608
3609/// G27: Calibrate weight of a single relationship via LLM.
3610fn call_weight_calibrate(
3611    conn: &Connection,
3612    _namespace: &str,
3613    item_key: &str,
3614    binary: &Path,
3615    model: Option<&str>,
3616    timeout: u64,
3617    mode: &EnrichMode,
3618) -> Result<EnrichItemResult, AppError> {
3619    let rel_id: i64 = item_key
3620        .parse()
3621        .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
3622    let (source_name, target_name, relation, current_weight): (String, String, String, f64) = conn
3623        .query_row(
3624            "SELECT e1.name, e2.name, r.relation, r.weight \
3625             FROM relationships r \
3626             JOIN entities e1 ON e1.id = r.source_id \
3627             JOIN entities e2 ON e2.id = r.target_id \
3628             WHERE r.id = ?1",
3629            rusqlite::params![rel_id],
3630            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
3631        )
3632        .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
3633
3634    let input_text = format!(
3635        "Source: {source_name}\nTarget: {target_name}\nRelation: {relation}\nCurrent weight: {current_weight}"
3636    );
3637    let (value, cost, is_oauth) = match mode {
3638        EnrichMode::ClaudeCode => call_claude(
3639            binary,
3640            WEIGHT_CALIBRATE_PROMPT,
3641            WEIGHT_CALIBRATE_SCHEMA,
3642            &input_text,
3643            model,
3644            timeout,
3645        )?,
3646        EnrichMode::Codex => call_codex(
3647            binary,
3648            WEIGHT_CALIBRATE_PROMPT,
3649            WEIGHT_CALIBRATE_SCHEMA,
3650            &input_text,
3651            model,
3652            timeout,
3653        )?,
3654        EnrichMode::Opencode => call_opencode(
3655            binary,
3656            WEIGHT_CALIBRATE_PROMPT,
3657            WEIGHT_CALIBRATE_SCHEMA,
3658            &input_text,
3659            model,
3660            timeout,
3661        )?,
3662        EnrichMode::OpenRouter => call_openrouter(
3663            WEIGHT_CALIBRATE_PROMPT,
3664            WEIGHT_CALIBRATE_SCHEMA,
3665            &input_text,
3666            model,
3667            timeout,
3668        )?,
3669    };
3670
3671    let calibrated = value
3672        .get("calibrated_weight")
3673        .and_then(|v| v.as_f64())
3674        .ok_or_else(|| AppError::Validation("LLM result missing 'calibrated_weight'".into()))?;
3675
3676    conn.execute(
3677        "UPDATE relationships SET weight = ?1 WHERE id = ?2",
3678        rusqlite::params![calibrated, rel_id],
3679    )?;
3680
3681    Ok(EnrichItemResult::Done {
3682        memory_id: None,
3683        entity_id: None,
3684        entities: 0,
3685        rels: 1,
3686        chars_before: None,
3687        chars_after: None,
3688        cost,
3689        is_oauth,
3690    })
3691}
3692
3693/// G27: Reclassify a generic relationship type via LLM.
3694fn call_relation_reclassify(
3695    conn: &Connection,
3696    _namespace: &str,
3697    item_key: &str,
3698    binary: &Path,
3699    model: Option<&str>,
3700    timeout: u64,
3701    mode: &EnrichMode,
3702) -> Result<EnrichItemResult, AppError> {
3703    let rel_id: i64 = item_key
3704        .parse()
3705        .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
3706    let (source_name, target_name, current_relation): (String, String, String) = conn
3707        .query_row(
3708            "SELECT e1.name, e2.name, r.relation \
3709             FROM relationships r \
3710             JOIN entities e1 ON e1.id = r.source_id \
3711             JOIN entities e2 ON e2.id = r.target_id \
3712             WHERE r.id = ?1",
3713            rusqlite::params![rel_id],
3714            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3715        )
3716        .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
3717
3718    let input_text = format!(
3719        "Source entity: {source_name}\nTarget entity: {target_name}\nCurrent relation: {current_relation}"
3720    );
3721    let (value, cost, is_oauth) = match mode {
3722        EnrichMode::ClaudeCode => call_claude(
3723            binary,
3724            RELATION_RECLASSIFY_PROMPT,
3725            RELATION_RECLASSIFY_SCHEMA,
3726            &input_text,
3727            model,
3728            timeout,
3729        )?,
3730        EnrichMode::Codex => call_codex(
3731            binary,
3732            RELATION_RECLASSIFY_PROMPT,
3733            RELATION_RECLASSIFY_SCHEMA,
3734            &input_text,
3735            model,
3736            timeout,
3737        )?,
3738        EnrichMode::Opencode => call_opencode(
3739            binary,
3740            RELATION_RECLASSIFY_PROMPT,
3741            RELATION_RECLASSIFY_SCHEMA,
3742            &input_text,
3743            model,
3744            timeout,
3745        )?,
3746        EnrichMode::OpenRouter => call_openrouter(
3747            RELATION_RECLASSIFY_PROMPT,
3748            RELATION_RECLASSIFY_SCHEMA,
3749            &input_text,
3750            model,
3751            timeout,
3752        )?,
3753    };
3754
3755    let new_relation = value
3756        .get("relation")
3757        .and_then(|v| v.as_str())
3758        .ok_or_else(|| AppError::Validation("LLM result missing 'relation'".into()))?;
3759    let new_strength = value
3760        .get("strength")
3761        .and_then(|v| v.as_f64())
3762        .unwrap_or(0.5);
3763
3764    conn.execute(
3765        "UPDATE relationships SET relation = ?1, weight = ?2 WHERE id = ?3",
3766        rusqlite::params![new_relation, new_strength, rel_id],
3767    )?;
3768
3769    Ok(EnrichItemResult::Done {
3770        memory_id: None,
3771        entity_id: None,
3772        entities: 0,
3773        rels: 1,
3774        chars_before: None,
3775        chars_after: None,
3776        cost,
3777        is_oauth,
3778    })
3779}
3780
3781/// G27 P2: Connect isolated entities via LLM-suggested relationship.
3782fn call_entity_connect(
3783    conn: &Connection,
3784    namespace: &str,
3785    item_key: &str,
3786    binary: &Path,
3787    model: Option<&str>,
3788    timeout: u64,
3789    mode: &EnrichMode,
3790) -> Result<EnrichItemResult, AppError> {
3791    let pairs = scan_isolated_entity_pairs(conn, namespace, Some(1))?;
3792    let (e1_id, e1_name, e2_id, e2_name) =
3793        match pairs.into_iter().find(|(_, n, _, _)| n == item_key) {
3794            Some(p) => p,
3795            None => {
3796                return Ok(EnrichItemResult::Skipped {
3797                    reason: "pair no longer isolated".into(),
3798                })
3799            }
3800        };
3801    let input_text = format!("Entity A: {e1_name}\nEntity B: {e2_name}");
3802    let (value, cost, is_oauth) = match mode {
3803        EnrichMode::ClaudeCode => call_claude(
3804            binary,
3805            ENTITY_CONNECT_PROMPT,
3806            ENTITY_CONNECT_SCHEMA,
3807            &input_text,
3808            model,
3809            timeout,
3810        )?,
3811        EnrichMode::Codex => call_codex(
3812            binary,
3813            ENTITY_CONNECT_PROMPT,
3814            ENTITY_CONNECT_SCHEMA,
3815            &input_text,
3816            model,
3817            timeout,
3818        )?,
3819        EnrichMode::Opencode => call_opencode(
3820            binary,
3821            ENTITY_CONNECT_PROMPT,
3822            ENTITY_CONNECT_SCHEMA,
3823            &input_text,
3824            model,
3825            timeout,
3826        )?,
3827        EnrichMode::OpenRouter => call_openrouter(
3828            ENTITY_CONNECT_PROMPT,
3829            ENTITY_CONNECT_SCHEMA,
3830            &input_text,
3831            model,
3832            timeout,
3833        )?,
3834    };
3835    let relation = value
3836        .get("relation")
3837        .and_then(|v| v.as_str())
3838        .unwrap_or("none");
3839    if relation == "none" {
3840        return Ok(EnrichItemResult::Skipped {
3841            reason: "LLM determined no relationship".into(),
3842        });
3843    }
3844    let strength = value
3845        .get("strength")
3846        .and_then(|v| v.as_f64())
3847        .unwrap_or(0.5);
3848    conn.execute(
3849        "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
3850        rusqlite::params![namespace, e1_id, e2_id, relation, strength],
3851    )?;
3852    Ok(EnrichItemResult::Done {
3853        memory_id: None,
3854        entity_id: None,
3855        entities: 0,
3856        rels: 1,
3857        chars_before: None,
3858        chars_after: None,
3859        cost,
3860        is_oauth,
3861    })
3862}
3863
3864/// G27 P2: Validate entity type assignment via LLM.
3865fn call_entity_type_validate(
3866    conn: &Connection,
3867    _namespace: &str,
3868    item_key: &str,
3869    binary: &Path,
3870    model: Option<&str>,
3871    timeout: u64,
3872    mode: &EnrichMode,
3873) -> Result<EnrichItemResult, AppError> {
3874    let (ent_id, ent_name, ent_type): (i64, String, String) = conn
3875        .query_row(
3876            "SELECT id, name, type FROM entities WHERE name = ?1",
3877            rusqlite::params![item_key],
3878            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3879        )
3880        .map_err(|_| AppError::NotFound(format!("entity '{item_key}' not found")))?;
3881    let input_text = format!("Entity: {ent_name}\nCurrent type: {ent_type}");
3882    let (value, cost, is_oauth) = match mode {
3883        EnrichMode::ClaudeCode => call_claude(
3884            binary,
3885            ENTITY_TYPE_VALIDATE_PROMPT,
3886            ENTITY_TYPE_VALIDATE_SCHEMA,
3887            &input_text,
3888            model,
3889            timeout,
3890        )?,
3891        EnrichMode::Codex => call_codex(
3892            binary,
3893            ENTITY_TYPE_VALIDATE_PROMPT,
3894            ENTITY_TYPE_VALIDATE_SCHEMA,
3895            &input_text,
3896            model,
3897            timeout,
3898        )?,
3899        EnrichMode::Opencode => call_opencode(
3900            binary,
3901            ENTITY_TYPE_VALIDATE_PROMPT,
3902            ENTITY_TYPE_VALIDATE_SCHEMA,
3903            &input_text,
3904            model,
3905            timeout,
3906        )?,
3907        EnrichMode::OpenRouter => call_openrouter(
3908            ENTITY_TYPE_VALIDATE_PROMPT,
3909            ENTITY_TYPE_VALIDATE_SCHEMA,
3910            &input_text,
3911            model,
3912            timeout,
3913        )?,
3914    };
3915    let validated_type = value
3916        .get("validated_type")
3917        .and_then(|v| v.as_str())
3918        .unwrap_or(&ent_type);
3919    let was_correct = value
3920        .get("was_correct")
3921        .and_then(|v| v.as_bool())
3922        .unwrap_or(true);
3923    if !was_correct {
3924        conn.execute(
3925            "UPDATE entities SET type = ?1 WHERE id = ?2",
3926            rusqlite::params![validated_type, ent_id],
3927        )?;
3928    }
3929    Ok(EnrichItemResult::Done {
3930        memory_id: None,
3931        entity_id: Some(ent_id),
3932        entities: 1,
3933        rels: 0,
3934        chars_before: None,
3935        chars_after: None,
3936        cost,
3937        is_oauth,
3938    })
3939}
3940
3941/// G27 P2: Enrich generic memory description via LLM.
3942fn call_description_enrich(
3943    conn: &Connection,
3944    _namespace: &str,
3945    item_key: &str,
3946    binary: &Path,
3947    model: Option<&str>,
3948    timeout: u64,
3949    mode: &EnrichMode,
3950) -> Result<EnrichItemResult, AppError> {
3951    let (mem_id, body, old_desc): (i64, String, String) = conn
3952        .query_row(
3953            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3954            rusqlite::params![item_key],
3955            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3956        )
3957        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3958    let snippet: String = body.chars().take(500).collect();
3959    let input_text = format!(
3960        "Memory name: {item_key}\nCurrent description: {old_desc}\nBody preview: {snippet}"
3961    );
3962    let (value, cost, is_oauth) = match mode {
3963        EnrichMode::ClaudeCode => call_claude(
3964            binary,
3965            DESCRIPTION_ENRICH_PROMPT,
3966            DESCRIPTION_ENRICH_SCHEMA,
3967            &input_text,
3968            model,
3969            timeout,
3970        )?,
3971        EnrichMode::Codex => call_codex(
3972            binary,
3973            DESCRIPTION_ENRICH_PROMPT,
3974            DESCRIPTION_ENRICH_SCHEMA,
3975            &input_text,
3976            model,
3977            timeout,
3978        )?,
3979        EnrichMode::Opencode => call_opencode(
3980            binary,
3981            DESCRIPTION_ENRICH_PROMPT,
3982            DESCRIPTION_ENRICH_SCHEMA,
3983            &input_text,
3984            model,
3985            timeout,
3986        )?,
3987        EnrichMode::OpenRouter => call_openrouter(
3988            DESCRIPTION_ENRICH_PROMPT,
3989            DESCRIPTION_ENRICH_SCHEMA,
3990            &input_text,
3991            model,
3992            timeout,
3993        )?,
3994    };
3995    let new_desc = value
3996        .get("description")
3997        .and_then(|v| v.as_str())
3998        .unwrap_or(&old_desc);
3999    let old_name: String = conn.query_row(
4000        "SELECT name FROM memories WHERE id = ?1",
4001        rusqlite::params![mem_id],
4002        |r| r.get(0),
4003    )?;
4004    conn.execute(
4005        "UPDATE memories SET description = ?1 WHERE id = ?2",
4006        rusqlite::params![new_desc, mem_id],
4007    )?;
4008    memories::sync_fts_after_update(
4009        conn, mem_id, &old_name, &old_desc, &body, &old_name, new_desc, &body,
4010    )?;
4011    Ok(EnrichItemResult::Done {
4012        memory_id: Some(mem_id),
4013        entity_id: None,
4014        entities: 0,
4015        rels: 0,
4016        chars_before: Some(old_desc.len()),
4017        chars_after: Some(new_desc.len()),
4018        cost,
4019        is_oauth,
4020    })
4021}
4022
4023/// G27 P2: Classify memory into domain category via LLM.
4024fn call_domain_classify(
4025    conn: &Connection,
4026    _namespace: &str,
4027    item_key: &str,
4028    binary: &Path,
4029    model: Option<&str>,
4030    timeout: u64,
4031    mode: &EnrichMode,
4032) -> Result<EnrichItemResult, AppError> {
4033    let (mem_id, body, desc): (i64, String, String) = conn
4034        .query_row(
4035            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
4036            rusqlite::params![item_key],
4037            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
4038        )
4039        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
4040    let snippet: String = body.chars().take(500).collect();
4041    let input_text = format!("Memory: {item_key}\nDescription: {desc}\nBody preview: {snippet}");
4042    let (value, cost, is_oauth) = match mode {
4043        EnrichMode::ClaudeCode => call_claude(
4044            binary,
4045            DOMAIN_CLASSIFY_PROMPT,
4046            DOMAIN_CLASSIFY_SCHEMA,
4047            &input_text,
4048            model,
4049            timeout,
4050        )?,
4051        EnrichMode::Codex => call_codex(
4052            binary,
4053            DOMAIN_CLASSIFY_PROMPT,
4054            DOMAIN_CLASSIFY_SCHEMA,
4055            &input_text,
4056            model,
4057            timeout,
4058        )?,
4059        EnrichMode::Opencode => call_opencode(
4060            binary,
4061            DOMAIN_CLASSIFY_PROMPT,
4062            DOMAIN_CLASSIFY_SCHEMA,
4063            &input_text,
4064            model,
4065            timeout,
4066        )?,
4067        EnrichMode::OpenRouter => call_openrouter(
4068            DOMAIN_CLASSIFY_PROMPT,
4069            DOMAIN_CLASSIFY_SCHEMA,
4070            &input_text,
4071            model,
4072            timeout,
4073        )?,
4074    };
4075    let domain = value
4076        .get("domain")
4077        .and_then(|v| v.as_str())
4078        .unwrap_or("uncategorized");
4079    let metadata = format!(r#"{{"domain":"{}"}}"#, domain.replace('"', "\\\""));
4080    conn.execute(
4081        "UPDATE memories SET metadata = ?1 WHERE id = ?2",
4082        rusqlite::params![metadata, mem_id],
4083    )?;
4084    Ok(EnrichItemResult::Done {
4085        memory_id: Some(mem_id),
4086        entity_id: None,
4087        entities: 0,
4088        rels: 0,
4089        chars_before: None,
4090        chars_after: None,
4091        cost,
4092        is_oauth,
4093    })
4094}
4095
4096/// G27 P2: Audit memory graph quality via LLM.
4097fn call_graph_audit(
4098    conn: &Connection,
4099    _namespace: &str,
4100    item_key: &str,
4101    binary: &Path,
4102    model: Option<&str>,
4103    timeout: u64,
4104    mode: &EnrichMode,
4105) -> Result<EnrichItemResult, AppError> {
4106    let (mem_id, body, desc): (i64, String, String) = conn
4107        .query_row(
4108            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
4109            rusqlite::params![item_key],
4110            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
4111        )
4112        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
4113    let snippet: String = body.chars().take(500).collect();
4114    let ent_count: i64 = conn
4115        .query_row(
4116            "SELECT COUNT(*) FROM memory_entities WHERE memory_id = ?1",
4117            rusqlite::params![mem_id],
4118            |r| r.get(0),
4119        )
4120        .unwrap_or(0);
4121    let input_text = format!("Memory: {item_key}\nDescription: {desc}\nEntity bindings: {ent_count}\nBody preview: {snippet}");
4122    let (value, cost, is_oauth) = match mode {
4123        EnrichMode::ClaudeCode => call_claude(
4124            binary,
4125            GRAPH_AUDIT_PROMPT,
4126            GRAPH_AUDIT_SCHEMA,
4127            &input_text,
4128            model,
4129            timeout,
4130        )?,
4131        EnrichMode::Codex => call_codex(
4132            binary,
4133            GRAPH_AUDIT_PROMPT,
4134            GRAPH_AUDIT_SCHEMA,
4135            &input_text,
4136            model,
4137            timeout,
4138        )?,
4139        EnrichMode::Opencode => call_opencode(
4140            binary,
4141            GRAPH_AUDIT_PROMPT,
4142            GRAPH_AUDIT_SCHEMA,
4143            &input_text,
4144            model,
4145            timeout,
4146        )?,
4147        EnrichMode::OpenRouter => call_openrouter(
4148            GRAPH_AUDIT_PROMPT,
4149            GRAPH_AUDIT_SCHEMA,
4150            &input_text,
4151            model,
4152            timeout,
4153        )?,
4154    };
4155    let issues = value
4156        .get("issues")
4157        .and_then(|v| v.as_array())
4158        .map(|a| a.len())
4159        .unwrap_or(0);
4160    Ok(EnrichItemResult::Done {
4161        memory_id: Some(mem_id),
4162        entity_id: None,
4163        entities: 0,
4164        rels: issues,
4165        chars_before: None,
4166        chars_after: None,
4167        cost,
4168        is_oauth,
4169    })
4170}
4171
4172/// G27 P2: Synthesize research findings into graph entities/relationships via LLM.
4173fn call_deep_research_synth(
4174    conn: &Connection,
4175    namespace: &str,
4176    item_key: &str,
4177    binary: &Path,
4178    model: Option<&str>,
4179    timeout: u64,
4180    mode: &EnrichMode,
4181) -> Result<EnrichItemResult, AppError> {
4182    let (mem_id, body): (i64, String) = conn
4183        .query_row(
4184            "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
4185            rusqlite::params![item_key],
4186            |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
4187        )
4188        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
4189    let snippet: String = body.chars().take(2000).collect();
4190    let input_text = format!("Memory: {item_key}\nBody:\n{snippet}");
4191    let (value, cost, is_oauth) = match mode {
4192        EnrichMode::ClaudeCode => call_claude(
4193            binary,
4194            DEEP_RESEARCH_SYNTH_PROMPT,
4195            DEEP_RESEARCH_SYNTH_SCHEMA,
4196            &input_text,
4197            model,
4198            timeout,
4199        )?,
4200        EnrichMode::Codex => call_codex(
4201            binary,
4202            DEEP_RESEARCH_SYNTH_PROMPT,
4203            DEEP_RESEARCH_SYNTH_SCHEMA,
4204            &input_text,
4205            model,
4206            timeout,
4207        )?,
4208        EnrichMode::Opencode => call_opencode(
4209            binary,
4210            DEEP_RESEARCH_SYNTH_PROMPT,
4211            DEEP_RESEARCH_SYNTH_SCHEMA,
4212            &input_text,
4213            model,
4214            timeout,
4215        )?,
4216        EnrichMode::OpenRouter => call_openrouter(
4217            DEEP_RESEARCH_SYNTH_PROMPT,
4218            DEEP_RESEARCH_SYNTH_SCHEMA,
4219            &input_text,
4220            model,
4221            timeout,
4222        )?,
4223    };
4224    let mut ent_count = 0usize;
4225    let mut rel_count = 0usize;
4226    if let Some(ents) = value.get("entities").and_then(|v| v.as_array()) {
4227        for e in ents {
4228            let name = e.get("name").and_then(|v| v.as_str()).unwrap_or_default();
4229            let etype_str = e
4230                .get("entity_type")
4231                .and_then(|v| v.as_str())
4232                .unwrap_or("concept");
4233            let etype: EntityType = etype_str.parse().unwrap_or(EntityType::Concept);
4234            if name.len() >= 2 {
4235                let ne = NewEntity {
4236                    name: name.to_string(),
4237                    entity_type: etype,
4238                    description: None,
4239                };
4240                let _ = entities::upsert_entity(conn, namespace, &ne);
4241                ent_count += 1;
4242            }
4243        }
4244    }
4245    if let Some(rels) = value.get("relationships").and_then(|v| v.as_array()) {
4246        for r in rels {
4247            let src = r.get("source").and_then(|v| v.as_str()).unwrap_or_default();
4248            let tgt = r.get("target").and_then(|v| v.as_str()).unwrap_or_default();
4249            if src.is_empty() || tgt.is_empty() {
4250                continue;
4251            }
4252            let rel = r
4253                .get("relation")
4254                .and_then(|v| v.as_str())
4255                .unwrap_or("related");
4256            let str_ = r.get("strength").and_then(|v| v.as_f64()).unwrap_or(0.5);
4257            if let (Some(sid), Some(tid)) = (
4258                entities::find_entity_id(conn, namespace, src)?,
4259                entities::find_entity_id(conn, namespace, tgt)?,
4260            ) {
4261                let _ = entities::create_or_fetch_relationship(
4262                    conn, namespace, sid, tid, rel, str_, None,
4263                );
4264                rel_count += 1;
4265            }
4266        }
4267    }
4268    Ok(EnrichItemResult::Done {
4269        memory_id: Some(mem_id),
4270        entity_id: None,
4271        entities: ent_count,
4272        rels: rel_count,
4273        chars_before: None,
4274        chars_after: None,
4275        cost,
4276        is_oauth,
4277    })
4278}
4279
4280/// G27 P2: Extract structured body from unstructured text via LLM.
4281fn call_body_extract(
4282    conn: &Connection,
4283    _namespace: &str,
4284    item_key: &str,
4285    binary: &Path,
4286    model: Option<&str>,
4287    timeout: u64,
4288    mode: &EnrichMode,
4289) -> Result<EnrichItemResult, AppError> {
4290    let (mem_id, body, old_desc): (i64, String, String) = conn
4291        .query_row(
4292            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
4293            rusqlite::params![item_key],
4294            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
4295        )
4296        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
4297    let old_name: String = conn.query_row(
4298        "SELECT name FROM memories WHERE id = ?1",
4299        rusqlite::params![mem_id],
4300        |r| r.get(0),
4301    )?;
4302    let input_text = format!("Memory: {item_key}\nBody:\n{body}");
4303    let (value, cost, is_oauth) = match mode {
4304        EnrichMode::ClaudeCode => call_claude(
4305            binary,
4306            BODY_EXTRACT_PROMPT,
4307            BODY_EXTRACT_SCHEMA,
4308            &input_text,
4309            model,
4310            timeout,
4311        )?,
4312        EnrichMode::Codex => call_codex(
4313            binary,
4314            BODY_EXTRACT_PROMPT,
4315            BODY_EXTRACT_SCHEMA,
4316            &input_text,
4317            model,
4318            timeout,
4319        )?,
4320        EnrichMode::Opencode => call_opencode(
4321            binary,
4322            BODY_EXTRACT_PROMPT,
4323            BODY_EXTRACT_SCHEMA,
4324            &input_text,
4325            model,
4326            timeout,
4327        )?,
4328        EnrichMode::OpenRouter => call_openrouter(
4329            BODY_EXTRACT_PROMPT,
4330            BODY_EXTRACT_SCHEMA,
4331            &input_text,
4332            model,
4333            timeout,
4334        )?,
4335    };
4336    let restructured = value
4337        .get("restructured_body")
4338        .and_then(|v| v.as_str())
4339        .unwrap_or(&body);
4340    let chars_before = body.len();
4341    let chars_after = restructured.len();
4342    let new_hash = blake3::hash(restructured.as_bytes()).to_hex().to_string();
4343    conn.execute(
4344        "UPDATE memories SET body = ?1, body_hash = ?2, updated_at = unixepoch() WHERE id = ?3",
4345        rusqlite::params![restructured, new_hash, mem_id],
4346    )?;
4347    memories::sync_fts_after_update(
4348        conn,
4349        mem_id,
4350        &old_name,
4351        &old_desc,
4352        &body,
4353        &old_name,
4354        &old_desc,
4355        restructured,
4356    )?;
4357    Ok(EnrichItemResult::Done {
4358        memory_id: Some(mem_id),
4359        entity_id: None,
4360        entities: 0,
4361        rels: 0,
4362        chars_before: Some(chars_before),
4363        chars_after: Some(chars_after),
4364        cost,
4365        is_oauth,
4366    })
4367}
4368
4369/// Scan for pairs of entities that share no direct relationship.
4370#[allow(clippy::type_complexity)]
4371fn scan_isolated_entity_pairs(
4372    conn: &Connection,
4373    namespace: &str,
4374    limit: Option<usize>,
4375) -> Result<Vec<(i64, String, i64, String)>, AppError> {
4376    let limit_val = limit.unwrap_or(50) as i64;
4377    let mut stmt = conn.prepare_cached(
4378        "SELECT e1.id, e1.name, e2.id, e2.name FROM entities e1, entities e2 \
4379         WHERE e1.namespace = ?1 AND e2.namespace = ?1 AND e1.id < e2.id \
4380         AND NOT EXISTS (SELECT 1 FROM relationships r WHERE \
4381           (r.source_id = e1.id AND r.target_id = e2.id) OR \
4382           (r.source_id = e2.id AND r.target_id = e1.id)) \
4383         LIMIT ?2",
4384    )?;
4385    let rows = stmt
4386        .query_map(rusqlite::params![namespace, limit_val], |r| {
4387            Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?))
4388        })?
4389        .collect::<Result<Vec<_>, _>>()?;
4390    Ok(rows)
4391}
4392
4393/// Scan for entities with non-validated types (all entities for type audit).
4394fn scan_entities_for_type_validation(
4395    conn: &Connection,
4396    namespace: &str,
4397    limit: Option<usize>,
4398) -> Result<Vec<(i64, String, String)>, AppError> {
4399    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
4400    let sql = format!(
4401        "SELECT id, name, type FROM entities WHERE namespace = ?1 ORDER BY id {limit_clause}"
4402    );
4403    let mut stmt = conn.prepare(&sql)?;
4404    let rows = stmt
4405        .query_map(rusqlite::params![namespace], |r| {
4406            Ok((r.get(0)?, r.get(1)?, r.get(2)?))
4407        })?
4408        .collect::<Result<Vec<_>, _>>()?;
4409    Ok(rows)
4410}
4411
4412/// Scan for memories with generic descriptions (ingested, imported, etc).
4413fn scan_generic_descriptions(
4414    conn: &Connection,
4415    namespace: &str,
4416    limit: Option<usize>,
4417) -> Result<Vec<(i64, String, String)>, AppError> {
4418    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
4419    let sql = format!(
4420        "SELECT id, name, description FROM memories WHERE namespace = ?1 AND deleted_at IS NULL \
4421         AND (description LIKE '%ingested%' OR description LIKE '%imported%' OR description LIKE '%added%' OR length(description) < 30) \
4422         ORDER BY id {limit_clause}"
4423    );
4424    let mut stmt = conn.prepare(&sql)?;
4425    let rows = stmt
4426        .query_map(rusqlite::params![namespace], |r| {
4427            Ok((r.get(0)?, r.get(1)?, r.get(2)?))
4428        })?
4429        .collect::<Result<Vec<_>, _>>()?;
4430    Ok(rows)
4431}
4432
4433/// Calls the Codex CLI for a single enrichment item.
4434///
4435/// Follows the same contract as `call_claude`: returns `(value, cost_usd, is_oauth=false)`.
4436fn call_codex(
4437    binary: &Path,
4438    prompt: &str,
4439    json_schema: &str,
4440    input_text: &str,
4441    model: Option<&str>,
4442    timeout_secs: u64,
4443) -> Result<(serde_json::Value, f64, bool), AppError> {
4444    use wait_timeout::ChildExt;
4445
4446    // G31+G32+G33 (v1.0.69): validate the model BEFORE spawn, write the
4447    // schema to a trusted cache path (not /tmp), and reuse the
4448    // consolidated JSONL parser. See `codex_spawn.rs` for the canonical
4449    // hardening rationale.
4450    super::codex_spawn::validate_codex_model(model)?;
4451    let schema_file = super::codex_spawn::trusted_schema_path()?;
4452
4453    let args = super::codex_spawn::CodexSpawnArgs {
4454        binary,
4455        prompt,
4456        json_schema,
4457        input_text,
4458        model,
4459        timeout_secs,
4460        schema_path: schema_file.clone(),
4461    };
4462    let mut cmd = super::codex_spawn::build_codex_command(&args)?;
4463
4464    let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
4465        AppError::Io(std::io::Error::new(
4466            e.kind(),
4467            format!("failed to spawn codex: {e}"),
4468        ))
4469    })?;
4470
4471    let full_prompt = format!("{prompt}\n\n{input_text}");
4472    let stdin_bytes = full_prompt.into_bytes();
4473    let mut child_stdin = child
4474        .stdin
4475        .take()
4476        .ok_or_else(|| AppError::Validation("failed to open codex stdin".into()))?;
4477    let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
4478        child_stdin.write_all(&stdin_bytes)?;
4479        drop(child_stdin);
4480        Ok(())
4481    });
4482
4483    let start = std::time::Instant::now();
4484    let timeout = std::time::Duration::from_secs(timeout_secs);
4485    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
4486    let _ = std::fs::remove_file(&schema_file);
4487
4488    match status {
4489        Some(exit_status) => {
4490            stdin_thread
4491                .join()
4492                .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
4493                .map_err(AppError::Io)?;
4494
4495            tracing::debug!(
4496                target: "process",
4497                exit_code = ?exit_status.code(),
4498                elapsed_ms = start.elapsed().as_millis() as u64,
4499                "external process completed"
4500            );
4501
4502            let mut stdout_buf = Vec::new();
4503            if let Some(mut out) = child.stdout.take() {
4504                std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
4505            }
4506            if !exit_status.success() {
4507                let mut stderr_buf = Vec::new();
4508                if let Some(mut err) = child.stderr.take() {
4509                    std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
4510                }
4511                let stderr_str = String::from_utf8_lossy(&stderr_buf);
4512                tracing::warn!(
4513                    target: "enrich",
4514                    exit_code = ?exit_status.code(),
4515                    stderr = %stderr_str.trim(),
4516                    "codex process failed"
4517                );
4518                return Err(AppError::Validation(format!(
4519                    "codex exited with code {:?}: {}",
4520                    exit_status.code(),
4521                    stderr_str.trim()
4522                )));
4523            }
4524            let stdout_str = String::from_utf8(stdout_buf)
4525                .map_err(|_| AppError::Validation("codex stdout is not valid UTF-8".into()))?;
4526            // G32: use the JSONL parser, NOT serde_json::from_str on the
4527            // entire stdout (codex emits one event per line).
4528            let result = super::codex_spawn::parse_codex_jsonl(&stdout_str)?;
4529            // Return the raw agent_message text parsed as JSON. Different
4530            // operations (memory-bindings, body-enrich) use different
4531            // output schemas, so we let the caller pick which fields to
4532            // extract. The previous implementation hardcoded
4533            // `{entities, urls}` which broke body-enrich.
4534            let value: serde_json::Value =
4535                serde_json::from_str(&result.last_agent_text).map_err(|e| {
4536                    AppError::Validation(format!(
4537                        "codex agent_message is not valid JSON: {e}; raw={}",
4538                        result.last_agent_text
4539                    ))
4540                })?;
4541            Ok((value, 0.0, false))
4542        }
4543        None => {
4544            let _ = child.kill();
4545            let _ = child.wait();
4546            let _ = stdin_thread.join();
4547            Err(AppError::Validation(format!(
4548                "codex timed out after {timeout_secs} seconds"
4549            )))
4550        }
4551    }
4552}
4553
4554fn call_opencode(
4555    binary: &Path,
4556    prompt: &str,
4557    json_schema: &str,
4558    input_text: &str,
4559    model: Option<&str>,
4560    timeout_secs: u64,
4561) -> Result<(serde_json::Value, f64, bool), AppError> {
4562    use wait_timeout::ChildExt;
4563
4564    let resolved_model = super::opencode_runner::resolve_opencode_model(model);
4565
4566    let augmented_prompt = if json_schema.is_empty() {
4567        prompt.to_string()
4568    } else {
4569        format!(
4570            "{prompt}\n\nIMPORTANT: You MUST respond with ONLY valid JSON (no markdown, no explanation, no code fences). \
4571             The JSON MUST match this schema:\n{json_schema}"
4572        )
4573    };
4574
4575    let mut cmd = super::opencode_runner::build_opencode_command_sync(
4576        binary,
4577        &resolved_model,
4578        &augmented_prompt,
4579        input_text,
4580    )?;
4581
4582    let mut child = super::opencode_runner::spawn_opencode(&mut cmd).map_err(|e| {
4583        AppError::Io(std::io::Error::new(
4584            e.kind(),
4585            format!("failed to spawn opencode: {e}"),
4586        ))
4587    })?;
4588
4589    let start = std::time::Instant::now();
4590    let timeout = std::time::Duration::from_secs(timeout_secs);
4591    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
4592
4593    match status {
4594        Some(exit_status) => {
4595            tracing::debug!(
4596                target: "process",
4597                exit_code = ?exit_status.code(),
4598                elapsed_ms = start.elapsed().as_millis() as u64,
4599                "opencode process completed"
4600            );
4601
4602            let mut stdout_buf = Vec::new();
4603            if let Some(mut out) = child.stdout.take() {
4604                std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
4605            }
4606            if !exit_status.success() {
4607                let mut stderr_buf = Vec::new();
4608                if let Some(mut err) = child.stderr.take() {
4609                    std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
4610                }
4611                let stderr_str = String::from_utf8_lossy(&stderr_buf);
4612                tracing::warn!(
4613                    target: "enrich",
4614                    exit_code = ?exit_status.code(),
4615                    stderr = %stderr_str.trim(),
4616                    "opencode process failed"
4617                );
4618                return Err(AppError::Validation(format!(
4619                    "opencode exited with code {:?}: {}",
4620                    exit_status.code(),
4621                    stderr_str.trim()
4622                )));
4623            }
4624            let stdout_str = String::from_utf8(stdout_buf)
4625                .map_err(|_| AppError::Validation("opencode stdout is not valid UTF-8".into()))?;
4626            let (text, cost, _tokens) = super::opencode_runner::parse_opencode_output(&stdout_str)?;
4627            let value: serde_json::Value =
4628                super::opencode_runner::parse_json_from_opencode_text(&text).map_err(|e| {
4629                    AppError::Validation(format!("opencode response is not valid JSON: {e}"))
4630                })?;
4631            Ok((value, cost, false))
4632        }
4633        None => {
4634            let _ = child.kill();
4635            let _ = child.wait();
4636            Err(AppError::Validation(format!(
4637                "opencode timed out after {timeout_secs} seconds"
4638            )))
4639        }
4640    }
4641}
4642
4643// ---------------------------------------------------------------------------
4644// Tests
4645// ---------------------------------------------------------------------------
4646
4647#[cfg(test)]
4648mod tests {
4649    use super::*;
4650    use rusqlite::Connection;
4651    #[cfg(unix)]
4652    use std::os::unix::fs::PermissionsExt;
4653
4654    /// Opens an in-memory SQLite database with a minimal schema for unit tests.
4655    fn open_test_db() -> Connection {
4656        let conn = Connection::open_in_memory().expect("in-memory db");
4657        conn.execute_batch(
4658            "CREATE TABLE memories (
4659                id          INTEGER PRIMARY KEY AUTOINCREMENT,
4660                namespace   TEXT NOT NULL DEFAULT 'global',
4661                name        TEXT NOT NULL,
4662                type        TEXT NOT NULL DEFAULT 'note',
4663                description TEXT NOT NULL DEFAULT '',
4664                body        TEXT NOT NULL DEFAULT '',
4665                body_hash   TEXT NOT NULL DEFAULT '',
4666                session_id  TEXT,
4667                source      TEXT NOT NULL DEFAULT 'agent',
4668                metadata    TEXT NOT NULL DEFAULT '{}',
4669                created_at  INTEGER NOT NULL DEFAULT (unixepoch()),
4670                updated_at  INTEGER NOT NULL DEFAULT (unixepoch()),
4671                deleted_at  INTEGER,
4672                UNIQUE(namespace, name)
4673            );
4674            CREATE TABLE entities (
4675                id          INTEGER PRIMARY KEY AUTOINCREMENT,
4676                namespace   TEXT NOT NULL DEFAULT 'global',
4677                name        TEXT NOT NULL,
4678                type        TEXT NOT NULL DEFAULT 'concept',
4679                description TEXT,
4680                degree      INTEGER NOT NULL DEFAULT 0,
4681                created_at  INTEGER NOT NULL DEFAULT (unixepoch()),
4682                updated_at  INTEGER NOT NULL DEFAULT (unixepoch()),
4683                UNIQUE(namespace, name)
4684            );
4685            CREATE TABLE memory_entities (
4686                memory_id  INTEGER NOT NULL,
4687                entity_id  INTEGER NOT NULL,
4688                PRIMARY KEY (memory_id, entity_id)
4689            );
4690            CREATE TABLE relationships (
4691                id         INTEGER PRIMARY KEY AUTOINCREMENT,
4692                namespace  TEXT NOT NULL DEFAULT 'global',
4693                source_id  INTEGER NOT NULL,
4694                target_id  INTEGER NOT NULL,
4695                relation   TEXT NOT NULL,
4696                weight     REAL NOT NULL DEFAULT 0.5,
4697                description TEXT,
4698                UNIQUE(source_id, target_id, relation)
4699            );
4700            CREATE TABLE memory_embeddings (
4701                memory_id   INTEGER PRIMARY KEY,
4702                namespace   TEXT NOT NULL,
4703                embedding   BLOB NOT NULL,
4704                source      TEXT NOT NULL,
4705                model       TEXT NOT NULL DEFAULT '',
4706                dim         INTEGER NOT NULL DEFAULT 384,
4707                created_at  INTEGER NOT NULL DEFAULT (unixepoch())
4708            );",
4709        )
4710        .expect("schema creation must succeed");
4711        conn
4712    }
4713
4714    #[test]
4715    fn scan_unbound_memories_finds_memories_without_bindings() {
4716        let conn = open_test_db();
4717        conn.execute(
4718            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'test-mem', 'some body content')",
4719            [],
4720        )
4721        .unwrap();
4722
4723        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
4724        assert_eq!(results.len(), 1);
4725        assert_eq!(results[0].1, "test-mem");
4726    }
4727
4728    #[test]
4729    fn scan_unbound_memories_excludes_bound_memories() {
4730        let conn = open_test_db();
4731        conn.execute(
4732            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'bound-mem', 'body')",
4733            [],
4734        )
4735        .unwrap();
4736        let mem_id: i64 = conn
4737            .query_row("SELECT id FROM memories WHERE name='bound-mem'", [], |r| {
4738                r.get(0)
4739            })
4740            .unwrap();
4741        conn.execute(
4742            "INSERT INTO entities (namespace, name) VALUES ('global', 'some-entity')",
4743            [],
4744        )
4745        .unwrap();
4746        let ent_id: i64 = conn
4747            .query_row(
4748                "SELECT id FROM entities WHERE name='some-entity'",
4749                [],
4750                |r| r.get(0),
4751            )
4752            .unwrap();
4753        conn.execute(
4754            "INSERT INTO memory_entities (memory_id, entity_id) VALUES (?1, ?2)",
4755            rusqlite::params![mem_id, ent_id],
4756        )
4757        .unwrap();
4758
4759        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
4760        assert!(results.is_empty(), "bound memory must not appear in scan");
4761    }
4762
4763    #[test]
4764    fn scan_entities_without_description_finds_null_description() {
4765        let conn = open_test_db();
4766        conn.execute(
4767            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'my-tool', 'tool', NULL)",
4768            [],
4769        )
4770        .unwrap();
4771
4772        let results = scan_entities_without_description(&conn, "global", None, &[]).unwrap();
4773        assert_eq!(results.len(), 1);
4774        assert_eq!(results[0].1, "my-tool");
4775    }
4776
4777    #[test]
4778    fn scan_entities_without_description_excludes_entities_with_description() {
4779        let conn = open_test_db();
4780        conn.execute(
4781            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'described-tool', 'tool', 'Has a description already')",
4782            [],
4783        )
4784        .unwrap();
4785
4786        let results = scan_entities_without_description(&conn, "global", None, &[]).unwrap();
4787        assert!(
4788            results.is_empty(),
4789            "entity with description must not appear"
4790        );
4791    }
4792
4793    #[test]
4794    fn scan_short_body_memories_finds_short_bodies() {
4795        let conn = open_test_db();
4796        conn.execute(
4797            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'short-mem', 'hi')",
4798            [],
4799        )
4800        .unwrap();
4801
4802        let results = scan_short_body_memories(&conn, "global", 100, None, &[]).unwrap();
4803        assert_eq!(results.len(), 1);
4804        assert_eq!(results[0].1, "short-mem");
4805    }
4806
4807    #[test]
4808    fn scan_short_body_memories_excludes_long_bodies() {
4809        let conn = open_test_db();
4810        let long_body = "a".repeat(1000);
4811        conn.execute(
4812            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'long-mem', ?1)",
4813            rusqlite::params![long_body],
4814        )
4815        .unwrap();
4816
4817        let results = scan_short_body_memories(&conn, "global", 100, None, &[]).unwrap();
4818        assert!(results.is_empty(), "long memory must not appear in scan");
4819    }
4820
4821    #[test]
4822    fn scan_respects_limit() {
4823        let conn = open_test_db();
4824        for i in 0..5 {
4825            conn.execute(
4826                &format!("INSERT INTO memories (namespace, name, body) VALUES ('global', 'mem-{i}', 'short')"),
4827                [],
4828            )
4829            .unwrap();
4830        }
4831
4832        let results = scan_short_body_memories(&conn, "global", 1000, Some(3), &[]).unwrap();
4833        assert_eq!(results.len(), 3, "limit must be respected");
4834    }
4835
4836    #[test]
4837    fn scan_memories_without_embeddings_finds_only_missing_rows() {
4838        let conn = open_test_db();
4839        conn.execute(
4840            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'missing-vec', 'body one')",
4841            [],
4842        )
4843        .unwrap();
4844        conn.execute(
4845            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'has-vec', 'body two')",
4846            [],
4847        )
4848        .unwrap();
4849        let memory_id: i64 = conn
4850            .query_row(
4851                "SELECT id FROM memories WHERE namespace='global' AND name='has-vec'",
4852                [],
4853                |r| r.get(0),
4854            )
4855            .unwrap();
4856        let embedding = vec![0.0_f32; crate::constants::embedding_dim()];
4857        memories::upsert_vec(
4858            &conn, memory_id, "global", "note", &embedding, "has-vec", "body two",
4859        )
4860        .unwrap();
4861
4862        let results = scan_memories_without_embeddings(&conn, "global", None, &[]).unwrap();
4863        assert_eq!(results.len(), 1);
4864        assert_eq!(results[0].1, "missing-vec");
4865    }
4866
4867    #[test]
4868    fn scan_memories_without_embeddings_respects_name_filter() {
4869        let conn = open_test_db();
4870        conn.execute(
4871            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'match-me', 'body one')",
4872            [],
4873        )
4874        .unwrap();
4875        conn.execute(
4876            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'skip-me', 'body two')",
4877            [],
4878        )
4879        .unwrap();
4880
4881        let results =
4882            scan_memories_without_embeddings(&conn, "global", None, &["match-me".to_string()])
4883                .unwrap();
4884        assert_eq!(results.len(), 1);
4885        assert_eq!(results[0].1, "match-me");
4886    }
4887
4888    #[test]
4889    fn queue_db_schema_creates_correctly() {
4890        let tmp_path = format!("/tmp/test-enrich-queue-{}.sqlite", std::process::id());
4891        let conn = open_queue_db(&tmp_path).expect("queue db must open");
4892        let count: i64 = conn
4893            .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
4894            .unwrap();
4895        assert_eq!(count, 0);
4896        let _ = std::fs::remove_file(&tmp_path);
4897    }
4898
4899    #[test]
4900    fn parse_claude_output_valid_bindings() {
4901        let output = r#"[
4902            {"type":"system","subtype":"init"},
4903            {"type":"result","is_error":false,"total_cost_usd":0.01,
4904             "structured_output":{"entities":[{"name":"rust-lang","entity_type":"tool"}],"relationships":[]}}
4905        ]"#;
4906        let result = crate::commands::claude_runner::parse_claude_output(output)
4907            .expect("must parse successfully");
4908        assert!(result.value.get("entities").is_some());
4909        assert!((result.cost_usd - 0.01).abs() < f64::EPSILON);
4910        assert!(!result.is_oauth);
4911    }
4912
4913    #[test]
4914    fn parse_claude_output_detects_oauth() {
4915        let output = r#"[
4916            {"type":"system","subtype":"init","apiKeySource":"none"},
4917            {"type":"result","is_error":false,"total_cost_usd":0.0,
4918             "structured_output":{"entities":[],"relationships":[]}}
4919        ]"#;
4920        let result = crate::commands::claude_runner::parse_claude_output(output).unwrap();
4921        assert!(result.is_oauth);
4922    }
4923
4924    #[test]
4925    fn parse_claude_output_rate_limit_returns_error() {
4926        let output = r#"[
4927            {"type":"system","subtype":"init"},
4928            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
4929        ]"#;
4930        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
4931        assert!(matches!(err, AppError::RateLimited { .. }));
4932    }
4933
4934    #[test]
4935    fn parse_claude_output_auth_error() {
4936        let output = r#"[
4937            {"type":"system","subtype":"init"},
4938            {"type":"result","is_error":true,"error":"authentication failed"}
4939        ]"#;
4940        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
4941        assert!(format!("{err}").contains("authentication failed"));
4942    }
4943
4944    #[cfg(unix)]
4945    #[test]
4946    fn call_codex_returns_raw_json_for_body_enrich_schema() {
4947        let tmp = tempfile::tempdir().expect("tempdir");
4948        let binary = tmp.path().join("codex-mock");
4949        std::fs::write(
4950            &binary,
4951            r#"#!/usr/bin/env bash
4952set -euo pipefail
4953cat <<'JSONL'
4954{"type":"thread.started","thread_id":"mock-thread-0"}
4955{"type":"item.completed","item":{"type":"agent_message","text":"{\"enriched_body\":\"expanded body\"}"}}
4956{"type":"turn.completed","usage":{"input_tokens":1,"output_tokens":1}}
4957JSONL
4958"#,
4959        )
4960        .expect("mock codex write");
4961        let mut perms = std::fs::metadata(&binary).expect("metadata").permissions();
4962        perms.set_mode(0o755);
4963        std::fs::set_permissions(&binary, perms).expect("chmod");
4964
4965        let (value, cost, is_oauth) =
4966            call_codex(&binary, "prompt", BODY_ENRICH_SCHEMA, "body", None, 5)
4967                .expect("call_codex must accept body-enrich payload");
4968
4969        assert_eq!(value["enriched_body"], "expanded body");
4970        assert_eq!(cost, 0.0);
4971        assert!(!is_oauth);
4972    }
4973
4974    #[test]
4975    fn dry_run_emits_preview_without_calling_llm() {
4976        // This test validates the dry-run NDJSON contract without spawning any process.
4977        // The scan_operation function requires a DB; we build one in-memory but cannot
4978        // call run() directly because it needs AppPaths (disk). Instead we test the
4979        // lower-level helpers that the dry-run path relies on.
4980        let conn = open_test_db();
4981        conn.execute(
4982            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'dry-mem', 'tiny')",
4983            [],
4984        )
4985        .unwrap();
4986
4987        let results = scan_short_body_memories(&conn, "global", 1000, None, &[]).unwrap();
4988        assert_eq!(results.len(), 1);
4989        assert_eq!(results[0].1, "dry-mem");
4990        // If scan finds the item and dry_run is set, no LLM would be called.
4991        // The NDJSON emission is tested via integration tests with a fake binary.
4992    }
4993
4994    #[test]
4995    fn persist_entity_description_updates_db() {
4996        let conn = open_test_db();
4997        conn.execute(
4998            "INSERT INTO entities (namespace, name, type) VALUES ('global', 'tokio-runtime', 'tool')",
4999            [],
5000        )
5001        .unwrap();
5002        let eid: i64 = conn
5003            .query_row(
5004                "SELECT id FROM entities WHERE name='tokio-runtime'",
5005                [],
5006                |r| r.get(0),
5007            )
5008            .unwrap();
5009
5010        persist_entity_description(&conn, eid, "Async runtime for Rust applications").unwrap();
5011
5012        let desc: String = conn
5013            .query_row(
5014                "SELECT description FROM entities WHERE id=?1",
5015                rusqlite::params![eid],
5016                |r| r.get(0),
5017            )
5018            .unwrap();
5019        assert_eq!(desc, "Async runtime for Rust applications");
5020    }
5021
5022    #[test]
5023    fn bindings_schema_is_valid_json() {
5024        let _: serde_json::Value =
5025            serde_json::from_str(BINDINGS_SCHEMA).expect("BINDINGS_SCHEMA must be valid JSON");
5026    }
5027
5028    #[test]
5029    fn entity_description_schema_is_valid_json() {
5030        let _: serde_json::Value = serde_json::from_str(ENTITY_DESCRIPTION_SCHEMA)
5031            .expect("ENTITY_DESCRIPTION_SCHEMA must be valid JSON");
5032    }
5033
5034    #[test]
5035    fn body_enrich_schema_is_valid_json() {
5036        let _: serde_json::Value = serde_json::from_str(BODY_ENRICH_SCHEMA)
5037            .expect("BODY_ENRICH_SCHEMA must be valid JSON");
5038    }
5039
5040    // -- GAP-ENRICH-BACKLOG-CONVERGE: dead-letter + backoff tests ------------
5041
5042    fn open_temp_queue() -> (Connection, String) {
5043        let path = format!(
5044            "/tmp/test-enrich-dl-{}-{}.sqlite",
5045            std::process::id(),
5046            fastrand::u64(..)
5047        );
5048        let conn = open_queue_db(&path).expect("queue db must open");
5049        (conn, path)
5050    }
5051
5052    fn insert_pending(conn: &Connection, key: &str) -> i64 {
5053        conn.execute(
5054            "INSERT INTO queue (item_key, item_type, status) VALUES (?1, 'memory', 'pending')",
5055            rusqlite::params![key],
5056        )
5057        .unwrap();
5058        conn.last_insert_rowid()
5059    }
5060
5061    #[test]
5062    fn classify_rate_limit_is_transient() {
5063        let e = AppError::RateLimited {
5064            detail: "429".into(),
5065        };
5066        assert_eq!(
5067            classify_enrich_outcome(&e),
5068            crate::retry::AttemptOutcome::Transient
5069        );
5070    }
5071
5072    #[test]
5073    fn classify_timeout_and_dbbusy_are_transient() {
5074        let t = AppError::Timeout {
5075            operation: "judge".into(),
5076            duration_secs: 30,
5077        };
5078        let b = AppError::DbBusy("locked".into());
5079        assert_eq!(
5080            classify_enrich_outcome(&t),
5081            crate::retry::AttemptOutcome::Transient
5082        );
5083        assert_eq!(
5084            classify_enrich_outcome(&b),
5085            crate::retry::AttemptOutcome::Transient
5086        );
5087    }
5088
5089    #[test]
5090    fn classify_validation_and_parse_are_hard_failure() {
5091        let v = AppError::Validation("failed to parse entities array: bad".into());
5092        assert_eq!(
5093            classify_enrich_outcome(&v),
5094            crate::retry::AttemptOutcome::HardFailure
5095        );
5096    }
5097
5098    #[test]
5099    fn open_queue_db_alter_is_idempotent() {
5100        let path = format!(
5101            "/tmp/test-enrich-idem-{}-{}.sqlite",
5102            std::process::id(),
5103            fastrand::u64(..)
5104        );
5105        // First open creates the table + dead-letter columns.
5106        let _ = open_queue_db(&path).expect("first open");
5107        // Second open must not error on the already-present columns.
5108        let conn = open_queue_db(&path).expect("second open is idempotent");
5109        let cols: Vec<String> = {
5110            let mut stmt = conn.prepare("PRAGMA table_info(queue)").unwrap();
5111            stmt.query_map([], |r| r.get::<_, String>(1))
5112                .unwrap()
5113                .collect::<Result<Vec<_>, _>>()
5114                .unwrap()
5115        };
5116        assert!(cols.iter().any(|c| c == "error_class"));
5117        assert!(cols.iter().any(|c| c == "next_retry_at"));
5118        let _ = std::fs::remove_file(&path);
5119    }
5120
5121    #[test]
5122    fn record_item_failure_hard_marks_dead() {
5123        let (conn, path) = open_temp_queue();
5124        let id = insert_pending(&conn, "mem-hard");
5125        let outcome = record_item_failure(
5126            &conn,
5127            id,
5128            1,
5129            5,
5130            &AppError::Validation("invalid body".into()),
5131        );
5132        assert_eq!(outcome, crate::retry::AttemptOutcome::HardFailure);
5133        let status: String = conn
5134            .query_row(
5135                "SELECT status FROM queue WHERE id=?1",
5136                rusqlite::params![id],
5137                |r| r.get(0),
5138            )
5139            .unwrap();
5140        assert_eq!(status, "dead");
5141        let _ = std::fs::remove_file(&path);
5142    }
5143
5144    #[test]
5145    fn record_item_failure_transient_reschedules_pending() {
5146        let (conn, path) = open_temp_queue();
5147        let id = insert_pending(&conn, "mem-transient");
5148        let outcome = record_item_failure(
5149            &conn,
5150            id,
5151            1,
5152            5,
5153            &AppError::RateLimited {
5154                detail: "429".into(),
5155            },
5156        );
5157        assert_eq!(outcome, crate::retry::AttemptOutcome::Transient);
5158        let (status, future): (String, i64) = conn
5159            .query_row(
5160                "SELECT status, (next_retry_at > datetime('now')) FROM queue WHERE id=?1",
5161                rusqlite::params![id],
5162                |r| Ok((r.get(0)?, r.get(1)?)),
5163            )
5164            .unwrap();
5165        assert_eq!(status, "pending");
5166        assert_eq!(future, 1, "next_retry_at must be in the future");
5167        let _ = std::fs::remove_file(&path);
5168    }
5169
5170    #[test]
5171    fn record_item_failure_transient_at_cap_marks_dead() {
5172        let (conn, path) = open_temp_queue();
5173        let id = insert_pending(&conn, "mem-cap");
5174        // attempt == max_attempts forces dead-letter even for a transient error.
5175        let outcome = record_item_failure(
5176            &conn,
5177            id,
5178            5,
5179            5,
5180            &AppError::RateLimited {
5181                detail: "429".into(),
5182            },
5183        );
5184        assert_eq!(outcome, crate::retry::AttemptOutcome::Transient);
5185        let status: String = conn
5186            .query_row(
5187                "SELECT status FROM queue WHERE id=?1",
5188                rusqlite::params![id],
5189                |r| r.get(0),
5190            )
5191            .unwrap();
5192        assert_eq!(status, "dead");
5193        let _ = std::fs::remove_file(&path);
5194    }
5195
5196    #[test]
5197    fn dequeue_skips_future_retry_and_dead() {
5198        let (conn, path) = open_temp_queue();
5199        // Eligible now.
5200        let eligible = insert_pending(&conn, "mem-eligible");
5201        // Pending but scheduled in the future.
5202        let waiting = insert_pending(&conn, "mem-waiting");
5203        conn.execute(
5204            "UPDATE queue SET next_retry_at=datetime('now', '+3600 seconds') WHERE id=?1",
5205            rusqlite::params![waiting],
5206        )
5207        .unwrap();
5208        // Dead-letter must never be selected.
5209        let dead = insert_pending(&conn, "mem-dead");
5210        conn.execute(
5211            "UPDATE queue SET status='dead' WHERE id=?1",
5212            rusqlite::params![dead],
5213        )
5214        .unwrap();
5215
5216        let claimed: Option<i64> = conn
5217            .query_row(
5218                "UPDATE queue SET status='processing', attempt=attempt+1 \
5219                 WHERE id = (SELECT id FROM queue WHERE status='pending' \
5220                               AND (next_retry_at IS NULL OR next_retry_at <= datetime('now')) \
5221                             ORDER BY id LIMIT 1) \
5222                 RETURNING id",
5223                [],
5224                |r| r.get(0),
5225            )
5226            .ok();
5227        assert_eq!(claimed, Some(eligible));
5228
5229        // A second claim finds nothing eligible (waiting is future, dead excluded).
5230        let second: Option<i64> = conn
5231            .query_row(
5232                "UPDATE queue SET status='processing', attempt=attempt+1 \
5233                 WHERE id = (SELECT id FROM queue WHERE status='pending' \
5234                               AND (next_retry_at IS NULL OR next_retry_at <= datetime('now')) \
5235                             ORDER BY id LIMIT 1) \
5236                 RETURNING id",
5237                [],
5238                |r| r.get(0),
5239            )
5240            .ok();
5241        assert_eq!(second, None);
5242        let _ = std::fs::remove_file(&path);
5243    }
5244}