Skip to main content

sqlite_graphrag/commands/
enrich.rs

1//! Handler for the `enrich` CLI subcommand (GAP-14 + GAP-18).
2//!
3//! Enriches the knowledge graph by running LLM-powered analysis over memories
4//! and entities that are missing key structural data. Operations are:
5//!
6//! - `memory-bindings`: memories without `memory_entities` rows get entity extraction
7//! - `entity-descriptions`: entities with NULL/empty descriptions get LLM descriptions
8//! - `body-enrich`: memories with short bodies get expanded by the LLM (GAP-18)
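//! - `re-embed`: memories without a vector row get re-embedded without rewriting body
//! - `weight-calibrate`, `relation-reclassify`, `entity-connect`, `entity-type-validate`,
//!   `description-enrich`, `cross-domain-bridges`, `domain-classify`, `graph-audit`,
//!   `deep-research-synth`, `body-extract`: quality operations that are currently scan-only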
10//!
11//! Architecture mirrors `ingest_claude.rs`: SCAN → JUDGE (LLM) → PERSIST, with a
12//! SQLite queue DB (`.enrich-queue.sqlite`) for resume/retry support.
//! Workload: subprocess I/O-bound (claude/codex API calls with network wait).
14//!
15//! # DRY opportunity
16//!
17//! `extract_with_claude`, `parse_claude_output`, `emit_json`, and the `open_queue_db`
18//! queue schema in `ingest_claude.rs` are private functions that duplicate patterns used
19//! here verbatim. A future refactoring could extract them into a shared
20//! `src/commands/llm_runner.rs` module (or `src/llm_runner.rs`) without changing any
21//! public APIs. That extraction requires editing `ingest_claude.rs`, which is outside
22//! this stream's boundary — flagged here for the Integration stream to evaluate.
23
24use crate::commands::ingest_claude::find_claude_binary;
25use crate::constants::MAX_MEMORY_BODY_LEN;
26use crate::entity_type::EntityType;
27use crate::errors::AppError;
28use crate::paths::AppPaths;
29use crate::storage::connection::{ensure_db_ready, open_rw};
30use crate::storage::entities::{self, NewEntity, NewRelationship};
31use crate::storage::memories;
32
33use rusqlite::Connection;
34use serde::{Deserialize, Serialize};
35use std::io::Write;
36use std::path::{Path, PathBuf};
37use std::time::Instant;
38
39// ---------------------------------------------------------------------------
40// Constants
41// ---------------------------------------------------------------------------
42
43const DEFAULT_QUEUE_DB: &str = ".enrich-queue.sqlite";
44const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
45const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;
46const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2000;
47
48// ---------------------------------------------------------------------------
49// JSON schema used for memory-bindings and body-enrich extraction
50// ---------------------------------------------------------------------------
51
52const BINDINGS_SCHEMA: &str = r#"{
53  "type": "object",
54  "properties": {
55    "entities": {
56      "type": "array",
57      "items": {
58        "type": "object",
59        "properties": {
60          "name": { "type": "string" },
61          "entity_type": {
62            "type": "string",
63            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
64          }
65        },
66        "required": ["name", "entity_type"],
67        "additionalProperties": false
68      }
69    },
70    "relationships": {
71      "type": "array",
72      "items": {
73        "type": "object",
74        "properties": {
75          "source": { "type": "string" },
76          "target": { "type": "string" },
77          "relation": {
78            "type": "string",
79            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
80          },
81          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
82        },
83        "required": ["source","target","relation","strength"],
84        "additionalProperties": false
85      }
86    }
87  },
88  "required": ["entities","relationships"],
89  "additionalProperties": false
90}"#;
91
92const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
93  "type": "object",
94  "properties": {
95    "description": { "type": "string" }
96  },
97  "required": ["description"],
98  "additionalProperties": false
99}"#;
100
101const BODY_ENRICH_SCHEMA: &str = r#"{
102  "type": "object",
103  "properties": {
104    "enriched_body": { "type": "string" }
105  },
106  "required": ["enriched_body"],
107  "additionalProperties": false
108}"#;
109
110// G27 P1: weight-calibrate
111const WEIGHT_CALIBRATE_PROMPT: &str = "You are a knowledge graph quality auditor. Evaluate whether this relationship weight is correctly calibrated.\n\n\
112Scale:\n\
113- 0.9 = vital hard dependency (A cannot function without B)\n\
114- 0.7 = important design relationship (A strongly supports/enables B)\n\
115- 0.5 = useful contextual link (A and B share relevant context)\n\
116- 0.3 = weak reference (A mentions B without strong coupling)\n\n\
117Respond with the calibrated weight and brief reasoning.";
118
119const WEIGHT_CALIBRATE_SCHEMA: &str = r#"{
120  "type": "object",
121  "properties": {
122    "calibrated_weight": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
123    "reasoning": { "type": "string" }
124  },
125  "required": ["calibrated_weight", "reasoning"],
126  "additionalProperties": false
127}"#;
128
129// G27 P1: relation-reclassify
130const RELATION_RECLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. The relationship between these entities uses a generic type. Determine the REAL semantic relationship.\n\n\
131Valid canonical relations (pick exactly one):\n\
132- depends-on: A cannot function without B\n\
133- uses: A utilizes B but could substitute it\n\
134- supports: A reinforces or enables B\n\
135- causes: A triggers or produces B\n\
136- fixes: A resolves a problem in B\n\
137- contradicts: A conflicts with or invalidates B\n\
138- applies-to: A is relevant to or scoped within B\n\
139- follows: A comes after B in sequence\n\
140- replaces: A substitutes B\n\
141- tracked-in: A is monitored in B\n\
142- related: A and B share context (use sparingly)\n\n\
143Respond with the correct relation, strength, and reasoning.";
144
145const RELATION_RECLASSIFY_SCHEMA: &str = r#"{
146  "type": "object",
147  "properties": {
148    "relation": { "type": "string" },
149    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
150    "reasoning": { "type": "string" }
151  },
152  "required": ["relation", "strength", "reasoning"],
153  "additionalProperties": false
154}"#;
155
156// G27 P2: entity-connect — suggest relationships between isolated entities
157const ENTITY_CONNECT_PROMPT: &str = "You are a knowledge graph quality auditor. Two entities exist in the same graph but have no relationship between them. Determine if a meaningful relationship exists.\n\n\
158Valid canonical relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, replaces, tracked-in, related.\n\n\
159If NO meaningful relationship exists, set relation to \"none\".\n\
160Respond with the relation (or \"none\"), strength, and reasoning.";
161
162const ENTITY_CONNECT_SCHEMA: &str = r#"{
163  "type": "object",
164  "properties": {
165    "relation": { "type": "string" },
166    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
167    "reasoning": { "type": "string" }
168  },
169  "required": ["relation", "strength", "reasoning"],
170  "additionalProperties": false
171}"#;
172
173// G27 P2: entity-type-validate — verify entity type assignments
174const ENTITY_TYPE_VALIDATE_PROMPT: &str = "You are a knowledge graph quality auditor. Verify whether this entity's type is correct.\n\n\
175Valid entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\n\
176If the current type is correct, keep it. If wrong, suggest the correct type.\n\
177Respond with the validated type and reasoning.";
178
179const ENTITY_TYPE_VALIDATE_SCHEMA: &str = r#"{
180  "type": "object",
181  "properties": {
182    "validated_type": { "type": "string" },
183    "was_correct": { "type": "boolean" },
184    "reasoning": { "type": "string" }
185  },
186  "required": ["validated_type", "was_correct", "reasoning"],
187  "additionalProperties": false
188}"#;
189
190// G27 P2: description-enrich — improve generic memory descriptions
191const DESCRIPTION_ENRICH_PROMPT: &str = "You are a knowledge graph quality auditor. This memory has a generic or auto-generated description. Write a concise, semantic description (10-20 words) that captures WHAT this memory is about and WHY it matters.\n\n\
192BAD: 'ingested from docs/auth.md'\n\
193GOOD: 'JWT token rotation strategy with 15-min expiry and refresh flow'\n\n\
194Respond with the improved description and reasoning.";
195
196const DESCRIPTION_ENRICH_SCHEMA: &str = r#"{
197  "type": "object",
198  "properties": {
199    "description": { "type": "string" },
200    "reasoning": { "type": "string" }
201  },
202  "required": ["description", "reasoning"],
203  "additionalProperties": false
204}"#;
205
206// G27 P2: domain-classify — classify memory into domain category
207const DOMAIN_CLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. Classify this memory into its primary domain category.\n\n\
208Respond with the domain name (kebab-case, 2-4 words) and reasoning.";
209
210const DOMAIN_CLASSIFY_SCHEMA: &str = r#"{
211  "type": "object",
212  "properties": {
213    "domain": { "type": "string" },
214    "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
215    "reasoning": { "type": "string" }
216  },
217  "required": ["domain", "confidence", "reasoning"],
218  "additionalProperties": false
219}"#;
220
221// G27 P2: graph-audit — audit graph for quality issues
222const GRAPH_AUDIT_PROMPT: &str = "You are a knowledge graph quality auditor. Analyze this memory and its entity bindings for quality issues.\n\n\
223Check for: missing entities, wrong entity types, redundant relationships, orphaned entities, generic descriptions, low-signal relationships.\n\n\
224Respond with a list of issues found (or empty if none) and an overall quality score.";
225
226const GRAPH_AUDIT_SCHEMA: &str = r#"{
227  "type": "object",
228  "properties": {
229    "quality_score": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
230    "issues": { "type": "array", "items": { "type": "object", "properties": { "kind": { "type": "string" }, "detail": { "type": "string" } }, "required": ["kind", "detail"] } },
231    "reasoning": { "type": "string" }
232  },
233  "required": ["quality_score", "issues", "reasoning"],
234  "additionalProperties": false
235}"#;
236
237// G27 P2: deep-research-synth — synthesize research findings into graph
238const DEEP_RESEARCH_SYNTH_PROMPT: &str = "You are a knowledge graph synthesizer. Given this memory body, extract key findings and synthesize them into structured entities and relationships.\n\n\
239Entity names: lowercase kebab-case, domain-specific.\n\
240Relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, related, replaces, tracked-in.\n\n\
241Respond with extracted entities, relationships, and a synthesis summary.";
242
243const DEEP_RESEARCH_SYNTH_SCHEMA: &str = r#"{
244  "type": "object",
245  "properties": {
246    "entities": { "type": "array", "items": { "type": "object", "properties": { "name": { "type": "string" }, "entity_type": { "type": "string" } }, "required": ["name", "entity_type"] } },
247    "relationships": { "type": "array", "items": { "type": "object", "properties": { "source": { "type": "string" }, "target": { "type": "string" }, "relation": { "type": "string" }, "strength": { "type": "number" } }, "required": ["source", "target", "relation", "strength"] } },
248    "summary": { "type": "string" }
249  },
250  "required": ["entities", "relationships", "summary"],
251  "additionalProperties": false
252}"#;
253
254// G27 P2: body-extract — extract structured content from unstructured text
255const BODY_EXTRACT_PROMPT: &str = "You are a structured data extractor. Given this memory body (which may be unstructured text, raw notes, or a transcript), extract and restructure the content into a clean, well-organized markdown body.\n\n\
256Preserve all factual content. Remove noise, fix formatting, add section headers where appropriate.\n\
257Respond with the restructured body and a brief summary of changes.";
258
259const BODY_EXTRACT_SCHEMA: &str = r#"{
260  "type": "object",
261  "properties": {
262    "restructured_body": { "type": "string" },
263    "changes_summary": { "type": "string" }
264  },
265  "required": ["restructured_body", "changes_summary"],
266  "additionalProperties": false
267}"#;
268
269// ---------------------------------------------------------------------------
270// Prompts
271// ---------------------------------------------------------------------------
272
273const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. Given a memory body, extract:\n\
2741. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
2752. Typed relationships between entities with strength scores\n\n\
276Rules:\n\
277- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
278- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
279- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
280- NEVER use 'mentions' as relationship type\n\
281- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
282- Prefer fewer high-quality entities over many low-quality ones";
283
284const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. Given an entity name and type, write a concise one-sentence description (10-20 words) that explains what this entity IS and WHY it matters in the context of software/system design.\n\nEntity name: ";
285
286const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. Given a short or sparse memory body, expand it into a richer, more complete and useful description. Preserve all existing facts. Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\nConstraints:\n- Output only the enriched body text (no metadata, no headers)\n- Preserve the original meaning exactly\n- Target length is provided in the system context\n\nMemory body to enrich:\n\n";
287
288// ---------------------------------------------------------------------------
289// CLI args
290// ---------------------------------------------------------------------------
291
292/// Operation to perform in the `enrich` command.
293#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
294#[serde(rename_all = "kebab-case")]
295pub enum EnrichOperation {
296    /// Add missing entity/relationship bindings to memories (fully implemented).
297    MemoryBindings,
298    /// Fill NULL/empty entity descriptions with LLM-generated summaries (fully implemented).
299    EntityDescriptions,
300    /// Expand short memory bodies into richer content (fully implemented, GAP-18).
301    BodyEnrich,
302    /// Rebuild missing memory embeddings without rewriting the memory body.
303    ReEmbed,
304    /// Calibrate relationship weights using LLM analysis (scan only).
305    WeightCalibrate,
306    /// Reclassify relationship types using LLM judgment (scan only).
307    RelationReclassify,
308    /// Connect isolated entities by suggesting new relationships (scan only).
309    EntityConnect,
310    /// Validate entity type assignments using LLM judgment (scan only).
311    EntityTypeValidate,
312    /// Enrich memory descriptions that are generic/auto-generated (scan only).
313    DescriptionEnrich,
314    /// Identify cross-domain bridges between disconnected subgraphs (scan only).
315    CrossDomainBridges,
316    /// Classify memories into domain categories (scan only).
317    DomainClassify,
318    /// Audit the graph for quality issues (scan only).
319    GraphAudit,
320    /// Synthesize deep-research findings into graph memories (scan only).
321    DeepResearchSynth,
322    /// Extract structured body from unstructured text (scan only).
323    BodyExtract,
324}
325
326/// LLM provider for enrichment.
327#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
328pub enum EnrichMode {
329    /// Use locally installed Claude Code CLI (OAuth-first).
330    ClaudeCode,
331    /// Use locally installed OpenAI Codex CLI.
332    Codex,
333}
334
335impl std::fmt::Display for EnrichMode {
336    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
337        match self {
338            EnrichMode::ClaudeCode => write!(f, "claude-code"),
339            EnrichMode::Codex => write!(f, "codex"),
340        }
341    }
342}
343
344/// Arguments for the `enrich` subcommand.
345#[derive(clap::Args)]
346#[command(
347    about = "Enrich graph memories and entities using an LLM provider",
348    after_long_help = "EXAMPLES:\n  \
349    # Add missing entity bindings to all unbound memories\n  \
350    sqlite-graphrag enrich --operation memory-bindings --mode claude-code\n\n  \
351    # Fill entity descriptions (dry-run preview, no tokens spent)\n  \
352    sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n  \
353    # Expand short memory bodies (GAP-18)\n  \
354    sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n  \
355    # Rebuild only missing memory embeddings without rewriting bodies\n  \
356    sqlite-graphrag enrich --operation re-embed --limit 100\n\n  \
357    # Resume an interrupted body-enrich run\n  \
358    sqlite-graphrag enrich --operation body-enrich --resume --json\n\n  \
359    # Retry only failed items from a previous run\n  \
360    sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n\
361    EXIT CODES:\n  \
362    0  success\n  \
363    1  validation error (bad args, binary not found)\n  \
364    14 I/O error"
365)]
366pub struct EnrichArgs {
367    /// Enrichment operation to run.
368    #[arg(long, short = 'o', value_enum, value_name = "OPERATION")]
369    pub operation: EnrichOperation,
370
371    /// LLM provider to use. Default: claude-code (OAuth-first).
372    #[arg(long, value_enum, default_value = "claude-code")]
373    pub mode: EnrichMode,
374
375    /// Maximum number of items to process in this run. Omit for all.
376    #[arg(long, value_name = "N")]
377    pub limit: Option<usize>,
378
379    /// Preview items without calling the LLM (zero tokens consumed).
380    #[arg(long)]
381    pub dry_run: bool,
382
383    /// Namespace to operate on. Default: global.
384    #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
385    pub namespace: Option<String>,
386
387    // -- Provider flags (Claude) --
388    /// Path to the Claude Code binary. Default: auto-detect from PATH.
389    #[arg(long, value_name = "PATH")]
390    pub claude_binary: Option<PathBuf>,
391
392    /// Claude model to use (e.g. claude-sonnet-4-6).
393    #[arg(long, value_name = "MODEL")]
394    pub claude_model: Option<String>,
395
396    /// Timeout per item in seconds when using Claude Code. Default: 300.
397    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
398    pub claude_timeout: u64,
399
400    // -- Provider flags (Codex) --
401    /// Path to the Codex CLI binary. Default: auto-detect from PATH.
402    #[arg(long, value_name = "PATH")]
403    pub codex_binary: Option<PathBuf>,
404
405    /// Codex model to use (e.g. o4-mini).
406    #[arg(long, value_name = "MODEL")]
407    pub codex_model: Option<String>,
408
409    /// Timeout per item in seconds when using Codex. Default: 300.
410    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
411    pub codex_timeout: u64,
412
413    // -- Cost controls --
414    /// Abort when cumulative cost exceeds this USD budget (API key only; ignored for OAuth).
415    #[arg(long, value_name = "USD")]
416    pub max_cost_usd: Option<f64>,
417
418    // -- Queue controls --
419    /// Resume a previously interrupted run (skip already-done items).
420    #[arg(long)]
421    pub resume: bool,
422
423    /// Retry only items that failed in a previous run.
424    #[arg(long)]
425    pub retry_failed: bool,
426
427    // -- body-enrich specific flags (GAP-18) --
428    /// Minimum output character count for body-enrich. Default: 500.
429    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
430    pub min_output_chars: usize,
431
432    /// Maximum output character count for body-enrich. Default: 2000.
433    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
434    pub max_output_chars: usize,
435
436    /// Check that enriched body preserves all facts from the original (LLM judge). Default: true.
437    #[arg(long, default_value_t = true)]
438    pub preserve_check: bool,
439
440    /// Path to a custom prompt template file for body-enrich.
441    #[arg(long, value_name = "PATH")]
442    pub prompt_template: Option<PathBuf>,
443
444    /// Number of parallel LLM workers (default 1 = serial).
445    /// Each worker claims items atomically from the queue DB via UPDATE...RETURNING.
446    /// Range: 1–32. For 2321 entities, --llm-parallelism 4 reduces wall time ~4×.
447    #[arg(long, default_value_t = 1, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=32))]
448    pub llm_parallelism: u32,
449
450    // -- Output / infra --
451    /// Emit NDJSON output. Always true; flag accepted for compatibility.
452    #[arg(long)]
453    pub json: bool,
454
455    /// Database path override.
456    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
457    pub db: Option<String>,
458
459    /// G30: poll for the job singleton every second for up to N seconds
460    /// when another invocation holds the lock. Default: 0 (fail fast).
461    #[arg(long, value_name = "SECONDS")]
462    pub wait_job_singleton: Option<u64>,
463
464    /// G30: force acquisition of the singleton lock by removing a stale
465    /// lock file from a previously crashed invocation. Use only when you
466    /// are certain no other `enrich`/`ingest` is running.
467    #[arg(long, default_value_t = false)]
468    pub force_job_singleton: bool,
469
470    /// G37: select a specific subset of memory names to enrich instead of
471    /// the full candidate set. Comma-separated, e.g. `--names a,b,c`.
472    /// Empty when omitted (processes all candidates).
473    #[arg(long, value_name = "NAMES", value_delimiter = ',')]
474    pub names: Vec<String>,
475
476    /// G37: read the subset of memory names from a file (one per line).
477    /// Lines starting with `#` and empty lines are ignored. Combined with
478    /// `--names` (union) when both are set.
479    #[arg(long, value_name = "PATH")]
480    pub names_file: Option<PathBuf>,
481
482    /// G35: probe the LLM provider with a 1-turn ping before processing
483    /// the batch. Aborts with a clear error if the rate-limit window is
484    /// closed (avoids burning N turns only to fail on item 1).
485    #[arg(long, default_value_t = false)]
486    pub preflight_check: bool,
487
488    /// G35: if a preflight probe or in-flight call hits the Claude rate
489    /// limit, fall back to `--fallback-mode` (typically `codex`) instead
490    /// of failing the batch. Ignored when `--mode` is already `codex`.
491    #[arg(long, value_enum)]
492    pub fallback_mode: Option<EnrichMode>,
493
494    /// G35: number of seconds before the OAuth rate-limit reset at which
495    /// the preflight probe should refuse to start. Default 300 (5 min).
496    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
497    pub rate_limit_buffer: u64,
498
499    /// G28-D: refuse to start when the 1-minute load average exceeds
500    /// `2 × ncpus` (or `SQLITE_GRAPHRAG_MAX_LOAD_PER_NCPU` if set).
501    /// Set to false to skip the check on contended CI runners.
502    #[arg(long, default_value_t = true)]
503    pub max_load_check: bool,
504
505    /// G28-D: when the system is saturated, abort the job after this
506    /// many consecutive HardFailure outcomes. Default 5.
507    #[arg(long, value_name = "N", default_value_t = 5)]
508    pub circuit_breaker_threshold: u32,
509
510    /// G29 Passo 4: minimum trigram-Jaccard similarity between the
511    /// original body and the LLM-rewritten body for the rewrite to be
512    /// accepted. Scores below the threshold are rejected and emitted as
513    /// `EnrichItemResult::PreservationFailed`. Default 0.7 (per the G29
514    /// gap specification). Ignored when `--operation` is not
515    /// `body-enrich`.
516    #[arg(long, value_name = "FLOAT", default_value_t = 0.7)]
517    pub preserve_threshold: f64,
518
519    /// G33 Passo 3: when set, validate `--codex-model` against the
520    /// ChatGPT Pro OAuth accepted-model list and abort with a
521    /// suggestion when the value is unknown. Default true (fail fast
522    /// to avoid burning OAuth turns). Set to false to opt out.
523    #[arg(long, default_value_t = true)]
524    pub codex_model_validate: bool,
525
526    /// G33 Passo 3: when set together with an invalid `--codex-model`,
527    /// automatically substitute the supplied default (e.g. `gpt-5.5`)
528    /// instead of aborting. The substitution is recorded in the NDJSON
529    /// stream as `provider_substituted: true` for traceability.
530    #[arg(long, value_name = "MODEL")]
531    pub codex_model_fallback: Option<String>,
532}
533
534// ---------------------------------------------------------------------------
535// Internal types — raw LLM output structs
536// ---------------------------------------------------------------------------
537
538// ---------------------------------------------------------------------------
539// NDJSON event types emitted to stdout
540// ---------------------------------------------------------------------------
541
542#[derive(Debug, Serialize)]
543struct PhaseEvent<'a> {
544    phase: &'a str,
545    #[serde(skip_serializing_if = "Option::is_none")]
546    binary_path: Option<&'a str>,
547    #[serde(skip_serializing_if = "Option::is_none")]
548    version: Option<&'a str>,
549    #[serde(skip_serializing_if = "Option::is_none")]
550    items_total: Option<usize>,
551    #[serde(skip_serializing_if = "Option::is_none")]
552    items_pending: Option<usize>,
553    /// Active parallel LLM worker count (1 = serial). Present only on the "scan" phase event.
554    #[serde(skip_serializing_if = "Option::is_none")]
555    llm_parallelism: Option<u32>,
556}
557
558#[derive(Debug, Serialize)]
559struct ItemEvent<'a> {
560    /// Item identifier (memory name or entity name).
561    item: &'a str,
562    status: &'a str,
563    #[serde(skip_serializing_if = "Option::is_none")]
564    memory_id: Option<i64>,
565    #[serde(skip_serializing_if = "Option::is_none")]
566    entity_id: Option<i64>,
567    #[serde(skip_serializing_if = "Option::is_none")]
568    entities: Option<usize>,
569    #[serde(skip_serializing_if = "Option::is_none")]
570    rels: Option<usize>,
571    #[serde(skip_serializing_if = "Option::is_none")]
572    chars_before: Option<usize>,
573    #[serde(skip_serializing_if = "Option::is_none")]
574    chars_after: Option<usize>,
575    #[serde(skip_serializing_if = "Option::is_none")]
576    cost_usd: Option<f64>,
577    #[serde(skip_serializing_if = "Option::is_none")]
578    elapsed_ms: Option<u64>,
579    #[serde(skip_serializing_if = "Option::is_none")]
580    error: Option<String>,
581    index: usize,
582    total: usize,
583}
584
585#[derive(Debug, Serialize)]
586struct EnrichSummary {
587    summary: bool,
588    operation: String,
589    items_total: usize,
590    completed: usize,
591    failed: usize,
592    skipped: usize,
593    cost_usd: f64,
594    elapsed_ms: u64,
595    /// v1.0.84 (ADR-0042): discriminador do backend LLM que efetivamente
596    /// executou o re-embedding durante o enrich. `"claude" | "codex" | "none"`.
597    /// Absent on the wire when `None` (kept for happy-path envelope cleanliness,
598    /// ou quando a operação não envolveu re-embed).
599    #[serde(skip_serializing_if = "Option::is_none")]
600    backend_invoked: Option<&'static str>,
601}
602
603use crate::output::emit_json_line as emit_json;
604
605// ---------------------------------------------------------------------------
606// Queue DB
607// ---------------------------------------------------------------------------
608
609/// Opens or creates the enrichment queue database.
610///
611/// The queue schema mirrors `ingest_claude` for resume/retry parity.
612/// Uses a different filename (`.enrich-queue.sqlite`) to avoid collision.
613///
614/// # DRY note
615///
616/// This is a near-verbatim copy of `open_queue_db` in `ingest_claude.rs`.
617/// Both should be unified in a shared `llm_runner.rs` module by the
618/// Integration stream.
619fn open_queue_db(path: &str) -> Result<Connection, AppError> {
620    let conn = Connection::open(path)?;
621    conn.pragma_update(None, "journal_mode", "wal")?;
622    conn.execute_batch(
623        "CREATE TABLE IF NOT EXISTS queue (
624            id          INTEGER PRIMARY KEY AUTOINCREMENT,
625            item_key    TEXT NOT NULL UNIQUE,
626            item_type   TEXT NOT NULL DEFAULT 'memory',
627            status      TEXT NOT NULL DEFAULT 'pending',
628            memory_id   INTEGER,
629            entity_id   INTEGER,
630            entities    INTEGER DEFAULT 0,
631            rels        INTEGER DEFAULT 0,
632            error       TEXT,
633            cost_usd    REAL DEFAULT 0.0,
634            attempt     INTEGER DEFAULT 0,
635            elapsed_ms  INTEGER,
636            created_at  TEXT DEFAULT (datetime('now')),
637            done_at     TEXT
638        );
639        CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
640    )?;
641    Ok(conn)
642}
643
644// ---------------------------------------------------------------------------
645// LLM invocation — Claude Code
646// ---------------------------------------------------------------------------
647
648/// Calls `claude -p` via the shared `claude_runner` module (G02).
649///
650/// Returns `(output_value, cost_usd, is_oauth)`.
651fn call_claude(
652    binary: &Path,
653    prompt: &str,
654    json_schema: &str,
655    input_text: &str,
656    model: Option<&str>,
657    timeout_secs: u64,
658) -> Result<(serde_json::Value, f64, bool), AppError> {
659    let result = crate::commands::claude_runner::run_claude(
660        binary,
661        prompt,
662        json_schema,
663        input_text,
664        model,
665        timeout_secs,
666        7,
667    )?;
668    Ok((result.value, result.cost_usd, result.is_oauth))
669}
670
671// ---------------------------------------------------------------------------
672// Preflight probe (G35) — single-turn ping to verify the LLM provider
673// ---------------------------------------------------------------------------
674
675/// Result of a single preflight ping (G35).
676enum PreflightOutcome {
677    /// The provider accepted the ping without rate-limit or other errors.
678    Healthy,
679    /// The provider rejected the ping due to OAuth rate limit. The
680    /// `suggestion` field is a human hint that callers can embed in the
681    /// user-facing error.
682    RateLimited {
683        reason: String,
684        suggestion: &'static str,
685    },
686    /// Any other provider error (binary missing, auth failure, etc.).
687    Error(AppError),
688}
689
690/// Probes the configured LLM provider with a 1-turn ping.
691///
692/// - Claude: `claude -p "ping" --max-turns 1 --strict-mcp-config --mcp-config '{}'`
693/// - Codex:  `codex exec -c mcp_servers='{}' "ping" --json`
694///
695/// The probe intentionally avoids spawning any MCP server children (G28-A)
696/// to keep its own process footprint at the minimum.
697fn run_preflight_probe(args: &EnrichArgs) -> PreflightOutcome {
698    let timeout = std::time::Duration::from_secs(args.rate_limit_buffer.max(60));
699
700    match args.mode {
701        EnrichMode::ClaudeCode => {
702            let bin = match find_claude_binary(args.claude_binary.as_deref()) {
703                Ok(b) => b,
704                Err(e) => return PreflightOutcome::Error(e),
705            };
706            let mut cmd = std::process::Command::new(&bin);
707            cmd.env_clear();
708            for var in &["PATH", "HOME", "USER"] {
709                if let Ok(val) = std::env::var(var) {
710                    cmd.env(var, val);
711                }
712            }
713            cmd.arg("-p")
714                .arg("ping")
715                .arg("--max-turns")
716                .arg("1")
717                .arg("--strict-mcp-config")
718                .arg("--mcp-config")
719                .arg("{}")
720                .arg("--dangerously-skip-permissions")
721                .arg("--settings")
722                .arg("{\"hooks\":{}}")
723                .arg("--output-format")
724                .arg("json")
725                .stdin(std::process::Stdio::null())
726                .stdout(std::process::Stdio::piped())
727                .stderr(std::process::Stdio::piped());
728
729            let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
730                Ok(c) => c,
731                Err(e) => {
732                    return PreflightOutcome::Error(AppError::Io(e));
733                }
734            };
735            let output = match wait_with_timeout(child, timeout) {
736                Ok(out) => out,
737                Err(e) => return PreflightOutcome::Error(e),
738            };
739            if !output.status.success() {
740                let stderr = String::from_utf8_lossy(&output.stderr);
741                if stderr.contains("hit your session limit")
742                    || stderr.contains("rate_limit")
743                    || stderr.contains("429")
744                {
745                    return PreflightOutcome::RateLimited {
746                        reason: stderr.trim().to_string(),
747                        suggestion:
748                            "wait for the OAuth window to reset or use --fallback-mode codex",
749                    };
750                }
751                return PreflightOutcome::Error(AppError::Validation(format!(
752                    "preflight probe failed: {stderr}",
753                    stderr = stderr.trim()
754                )));
755            }
756            PreflightOutcome::Healthy
757        }
758        EnrichMode::Codex => {
759            let bin = match find_codex_binary(args.codex_binary.as_deref()) {
760                Ok(b) => b,
761                Err(e) => return PreflightOutcome::Error(e),
762            };
763            super::codex_spawn::validate_codex_model(args.codex_model.as_deref())
764                .map_err(PreflightOutcome::Error)
765                .ok();
766            let schema = "{}";
767            let schema_path = match super::codex_spawn::trusted_schema_path() {
768                Ok(p) => p,
769                Err(e) => return PreflightOutcome::Error(e),
770            };
771            let spawn_args = super::codex_spawn::CodexSpawnArgs {
772                binary: &bin,
773                prompt: "ping",
774                json_schema: schema,
775                input_text: "",
776                model: args.codex_model.as_deref(),
777                timeout_secs: args.rate_limit_buffer.max(60),
778                schema_path: schema_path.clone(),
779            };
780            let mut cmd = super::codex_spawn::build_codex_command(&spawn_args);
781            let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
782                Ok(c) => c,
783                Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
784            };
785            let output = match wait_with_timeout(child, timeout) {
786                Ok(out) => out,
787                Err(e) => return PreflightOutcome::Error(e),
788            };
789            let _ = std::fs::remove_file(&schema_path);
790            if !output.status.success() {
791                let stderr = String::from_utf8_lossy(&output.stderr);
792                if stderr.contains("rate_limit")
793                    || stderr.contains("429")
794                    || stderr.contains("Too Many Requests")
795                {
796                    return PreflightOutcome::RateLimited {
797                        reason: stderr.trim().to_string(),
798                        suggestion: "wait for the rate-limit window to reset",
799                    };
800                }
801                return PreflightOutcome::Error(AppError::Validation(format!(
802                    "preflight probe failed: {stderr}",
803                    stderr = stderr.trim()
804                )));
805            }
806            PreflightOutcome::Healthy
807        }
808    }
809}
810
811/// Cross-platform wait with timeout (no extra crate dependency).
812fn wait_with_timeout(
813    mut child: std::process::Child,
814    timeout: std::time::Duration,
815) -> Result<std::process::Output, AppError> {
816    use wait_timeout::ChildExt;
817    let start = std::time::Instant::now();
818    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
819    if status.is_none() {
820        let _ = child.kill();
821        let _ = child.wait();
822        return Err(AppError::Validation(format!(
823            "preflight probe timed out after {}s",
824            start.elapsed().as_secs()
825        )));
826    }
827    let mut stdout = Vec::new();
828    if let Some(mut out) = child.stdout.take() {
829        std::io::Read::read_to_end(&mut out, &mut stdout).map_err(AppError::Io)?;
830    }
831    let mut stderr = Vec::new();
832    if let Some(mut err) = child.stderr.take() {
833        std::io::Read::read_to_end(&mut err, &mut stderr).map_err(AppError::Io)?;
834    }
835    let exit = status.unwrap();
836    Ok(std::process::Output {
837        status: exit,
838        stdout,
839        stderr,
840    })
841}
842
843// ---------------------------------------------------------------------------
844// SCAN helpers — SQL queries that find items needing enrichment
845// ---------------------------------------------------------------------------
846
847/// Returns memories without any `memory_entities` binding.
848///
849/// These are the targets for `memory-bindings` enrichment. When `name_filter`
850/// is non-empty, restricts the scan to the given names (G37); unknown names
851/// are silently skipped (the caller can detect them by comparing
852/// requested vs. returned).
853fn scan_unbound_memories(
854    conn: &Connection,
855    namespace: &str,
856    limit: Option<usize>,
857    name_filter: &[String],
858) -> Result<Vec<(i64, String, String)>, AppError> {
859    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
860
861    if name_filter.is_empty() {
862        let sql = format!(
863            "SELECT m.id, m.name, m.body
864             FROM memories m
865             WHERE m.namespace = ?1
866               AND m.deleted_at IS NULL
867               AND NOT EXISTS (
868                   SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
869               )
870             ORDER BY m.id
871             {limit_clause}"
872        );
873        let mut stmt = conn.prepare(&sql)?;
874        let rows = stmt
875            .query_map(rusqlite::params![namespace], |r| {
876                Ok((
877                    r.get::<_, i64>(0)?,
878                    r.get::<_, String>(1)?,
879                    r.get::<_, String>(2)?,
880                ))
881            })?
882            .collect::<Result<Vec<_>, _>>()?;
883        Ok(rows)
884    } else {
885        // Build a parameterised IN clause: ?2, ?3, ..., ?{1+n}
886        let placeholders: Vec<String> = (2..=name_filter.len() + 1)
887            .map(|i| format!("?{i}"))
888            .collect();
889        let in_clause = placeholders.join(", ");
890        let sql = format!(
891            "SELECT m.id, m.name, m.body
892             FROM memories m
893             WHERE m.namespace = ?1
894               AND m.deleted_at IS NULL
895               AND m.name IN ({in_clause})
896               AND NOT EXISTS (
897                   SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
898               )
899             ORDER BY m.id
900             {limit_clause}"
901        );
902        let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
903        params_vec.push(&namespace);
904        for n in name_filter {
905            params_vec.push(n);
906        }
907        let mut stmt = conn.prepare(&sql)?;
908        let rows = stmt
909            .query_map(
910                rusqlite::params_from_iter(params_vec.iter().copied()),
911                |r| {
912                    Ok((
913                        r.get::<_, i64>(0)?,
914                        r.get::<_, String>(1)?,
915                        r.get::<_, String>(2)?,
916                    ))
917                },
918            )?
919            .collect::<Result<Vec<_>, _>>()?;
920        Ok(rows)
921    }
922}
923
924/// Reads a list of memory names from a UTF-8 text file (G37).
925///
926/// Empty lines and lines beginning with `#` are skipped. Returns a
927/// de-duplicated, order-preserving list of trimmed names.
928fn read_names_file(path: &Path) -> Result<Vec<String>, AppError> {
929    let content = std::fs::read_to_string(path).map_err(|e| {
930        AppError::Validation(format!("failed to read names file {}: {e}", path.display()))
931    })?;
932    let mut seen = std::collections::HashSet::new();
933    let mut out = Vec::new();
934    for line in content.lines() {
935        let trimmed = line.trim();
936        if trimmed.is_empty() || trimmed.starts_with('#') {
937            continue;
938        }
939        if seen.insert(trimmed.to_string()) {
940            out.push(trimmed.to_string());
941        }
942    }
943    Ok(out)
944}
945
946/// Resolves the union of `--names` and `--names-file` (G37).
947fn resolve_name_filter(args: &EnrichArgs) -> Result<Vec<String>, AppError> {
948    let mut combined: Vec<String> = args.names.clone();
949    if let Some(p) = &args.names_file {
950        let from_file = read_names_file(p)?;
951        for n in from_file {
952            if !combined.contains(&n) {
953                combined.push(n);
954            }
955        }
956    }
957    Ok(combined)
958}
959
960/// Returns entities with NULL or empty description.
961///
962/// These are the targets for `entity-descriptions` enrichment.
963fn scan_entities_without_description(
964    conn: &Connection,
965    namespace: &str,
966    limit: Option<usize>,
967) -> Result<Vec<(i64, String, String)>, AppError> {
968    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
969    let sql = format!(
970        "SELECT id, name, type
971         FROM entities
972         WHERE namespace = ?1
973           AND (description IS NULL OR description = '')
974         ORDER BY id
975         {limit_clause}"
976    );
977    let mut stmt = conn.prepare(&sql)?;
978    let rows = stmt
979        .query_map(rusqlite::params![namespace], |r| {
980            Ok((
981                r.get::<_, i64>(0)?,
982                r.get::<_, String>(1)?,
983                r.get::<_, String>(2)?,
984            ))
985        })?
986        .collect::<Result<Vec<_>, _>>()?;
987    Ok(rows)
988}
989
990/// Returns memories whose body length is below the configured minimum.
991///
992/// These are the targets for `body-enrich` (GAP-18).
993fn scan_short_body_memories(
994    conn: &Connection,
995    namespace: &str,
996    min_chars: usize,
997    limit: Option<usize>,
998) -> Result<Vec<(i64, String, String)>, AppError> {
999    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1000    let sql = format!(
1001        "SELECT m.id, m.name, m.body
1002         FROM memories m
1003         WHERE m.namespace = ?1
1004           AND m.deleted_at IS NULL
1005           AND LENGTH(COALESCE(m.body,'')) < ?2
1006         ORDER BY m.id
1007         {limit_clause}"
1008    );
1009    let mut stmt = conn.prepare(&sql)?;
1010    let rows = stmt
1011        .query_map(rusqlite::params![namespace, min_chars as i64], |r| {
1012            Ok((
1013                r.get::<_, i64>(0)?,
1014                r.get::<_, String>(1)?,
1015                r.get::<_, String>(2)?,
1016            ))
1017        })?
1018        .collect::<Result<Vec<_>, _>>()?;
1019    Ok(rows)
1020}
1021
1022/// Returns live memories that still have no row in `memory_embeddings`.
1023///
1024/// These are the targets for `re-embed`.
1025fn scan_memories_without_embeddings(
1026    conn: &Connection,
1027    namespace: &str,
1028    limit: Option<usize>,
1029    name_filter: &[String],
1030) -> Result<Vec<(i64, String, String)>, AppError> {
1031    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1032
1033    if name_filter.is_empty() {
1034        let sql = format!(
1035            "SELECT m.id, m.name, COALESCE(m.body,'')
1036             FROM memories m
1037             LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1038             WHERE m.namespace = ?1
1039               AND m.deleted_at IS NULL
1040               AND me.memory_id IS NULL
1041             ORDER BY m.id
1042             {limit_clause}"
1043        );
1044        let mut stmt = conn.prepare(&sql)?;
1045        let rows = stmt
1046            .query_map(rusqlite::params![namespace], |r| {
1047                Ok((
1048                    r.get::<_, i64>(0)?,
1049                    r.get::<_, String>(1)?,
1050                    r.get::<_, String>(2)?,
1051                ))
1052            })?
1053            .collect::<Result<Vec<_>, _>>()?;
1054        Ok(rows)
1055    } else {
1056        let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1057            .map(|i| format!("?{i}"))
1058            .collect();
1059        let in_clause = placeholders.join(", ");
1060        let sql = format!(
1061            "SELECT m.id, m.name, COALESCE(m.body,'')
1062             FROM memories m
1063             LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1064             WHERE m.namespace = ?1
1065               AND m.deleted_at IS NULL
1066               AND m.name IN ({in_clause})
1067               AND me.memory_id IS NULL
1068             ORDER BY m.id
1069             {limit_clause}"
1070        );
1071        let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1072        params_vec.push(&namespace);
1073        for n in name_filter {
1074            params_vec.push(n);
1075        }
1076        let mut stmt = conn.prepare(&sql)?;
1077        let rows = stmt
1078            .query_map(
1079                rusqlite::params_from_iter(params_vec.iter().copied()),
1080                |r| {
1081                    Ok((
1082                        r.get::<_, i64>(0)?,
1083                        r.get::<_, String>(1)?,
1084                        r.get::<_, String>(2)?,
1085                    ))
1086                },
1087            )?
1088            .collect::<Result<Vec<_>, _>>()?;
1089        Ok(rows)
1090    }
1091}
1092
1093/// G27: Returns relationships with weight >= 0.7 that may need recalibration.
1094#[allow(clippy::type_complexity)]
1095fn scan_weight_candidates(
1096    conn: &Connection,
1097    namespace: &str,
1098    limit: Option<usize>,
1099) -> Result<Vec<(i64, String, String, String, f64)>, AppError> {
1100    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1101    let sql = format!(
1102        "SELECT r.id, e1.name, e2.name, r.relation, r.weight \
1103         FROM relationships r \
1104         JOIN entities e1 ON e1.id = r.source_id \
1105         JOIN entities e2 ON e2.id = r.target_id \
1106         WHERE r.weight >= 0.7 AND e1.namespace = ?1 \
1107         ORDER BY r.weight DESC {limit_clause}"
1108    );
1109    let mut stmt = conn.prepare(&sql)?;
1110    let rows = stmt
1111        .query_map(rusqlite::params![namespace], |r| {
1112            Ok((
1113                r.get::<_, i64>(0)?,
1114                r.get::<_, String>(1)?,
1115                r.get::<_, String>(2)?,
1116                r.get::<_, String>(3)?,
1117                r.get::<_, f64>(4)?,
1118            ))
1119        })?
1120        .collect::<Result<Vec<_>, _>>()?;
1121    Ok(rows)
1122}
1123
1124/// G27: Returns relationships with generic relation types (applies_to).
1125fn scan_generic_relations(
1126    conn: &Connection,
1127    namespace: &str,
1128    limit: Option<usize>,
1129) -> Result<Vec<(i64, String, String, String)>, AppError> {
1130    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1131    let sql = format!(
1132        "SELECT r.id, e1.name, e2.name, r.relation \
1133         FROM relationships r \
1134         JOIN entities e1 ON e1.id = r.source_id \
1135         JOIN entities e2 ON e2.id = r.target_id \
1136         WHERE r.relation = 'applies_to' AND e1.namespace = ?1 \
1137         ORDER BY r.id {limit_clause}"
1138    );
1139    let mut stmt = conn.prepare(&sql)?;
1140    let rows = stmt
1141        .query_map(rusqlite::params![namespace], |r| {
1142            Ok((
1143                r.get::<_, i64>(0)?,
1144                r.get::<_, String>(1)?,
1145                r.get::<_, String>(2)?,
1146                r.get::<_, String>(3)?,
1147            ))
1148        })?
1149        .collect::<Result<Vec<_>, _>>()?;
1150    Ok(rows)
1151}
1152
1153// ---------------------------------------------------------------------------
1154// PERSIST helpers for fully-implemented operations
1155// ---------------------------------------------------------------------------
1156
1157/// Persists entity bindings extracted by the LLM for a memory.
1158///
1159/// Creates entities via `upsert_entity`, links them to the memory via
1160/// `link_memory_entity`, and upserts relationships found between entities.
1161fn persist_memory_bindings(
1162    conn: &Connection,
1163    namespace: &str,
1164    memory_id: i64,
1165    entities_json: &serde_json::Value,
1166    rels_json: &serde_json::Value,
1167) -> Result<(usize, usize), AppError> {
1168    #[derive(Deserialize)]
1169    struct EntityItem {
1170        name: String,
1171        entity_type: String,
1172    }
1173    #[derive(Deserialize)]
1174    struct RelItem {
1175        source: String,
1176        target: String,
1177        relation: String,
1178        strength: f64,
1179    }
1180
1181    let extracted_entities: Vec<EntityItem> = serde_json::from_value(entities_json.clone())
1182        .map_err(|e| AppError::Validation(format!("failed to parse entities array: {e}")))?;
1183
1184    let extracted_rels: Vec<RelItem> = serde_json::from_value(rels_json.clone())
1185        .map_err(|e| AppError::Validation(format!("failed to parse relationships array: {e}")))?;
1186
1187    let mut ent_count = 0usize;
1188    let mut rel_count = 0usize;
1189
1190    for item in &extracted_entities {
1191        let entity_type = match item.entity_type.parse::<EntityType>() {
1192            Ok(et) => et,
1193            Err(_) => {
1194                tracing::warn!(
1195                    target: "enrich",
1196                    entity = %item.name,
1197                    entity_type = %item.entity_type,
1198                    "entity type not recognized, skipping"
1199                );
1200                continue;
1201            }
1202        };
1203        match entities::upsert_entity(
1204            conn,
1205            namespace,
1206            &NewEntity {
1207                name: item.name.clone(),
1208                entity_type,
1209                description: None,
1210            },
1211        ) {
1212            Ok(eid) => {
1213                let _ = entities::link_memory_entity(conn, memory_id, eid);
1214                ent_count += 1;
1215            }
1216            Err(e) => {
1217                tracing::warn!(
1218                    target: "enrich",
1219                    entity = %item.name,
1220                    error = %e,
1221                    "entity upsert skipped"
1222                );
1223            }
1224        }
1225    }
1226
1227    for rel in &extracted_rels {
1228        let normalized = crate::parsers::normalize_relation(&rel.relation);
1229        crate::parsers::warn_if_non_canonical(&normalized);
1230
1231        // Normalize entity names before lookup: upsert_entity normalizes on write,
1232        // so the lookup must use the same normalized form to find the row.
1233        let src_name = crate::parsers::normalize_entity_name(&rel.source);
1234        let tgt_name = crate::parsers::normalize_entity_name(&rel.target);
1235        let src_id = entities::find_entity_id(conn, namespace, &src_name);
1236        let tgt_id = entities::find_entity_id(conn, namespace, &tgt_name);
1237        if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1238            let new_rel = NewRelationship {
1239                source: rel.source.clone(),
1240                target: rel.target.clone(),
1241                relation: normalized,
1242                strength: rel.strength,
1243                description: None,
1244            };
1245            if entities::upsert_relationship(conn, namespace, sid, tid, &new_rel).is_ok() {
1246                rel_count += 1;
1247            }
1248        }
1249    }
1250
1251    Ok((ent_count, rel_count))
1252}
1253
1254/// Updates an entity's description directly in the `entities` table.
1255fn persist_entity_description(
1256    conn: &Connection,
1257    entity_id: i64,
1258    description: &str,
1259) -> Result<(), AppError> {
1260    conn.execute(
1261        "UPDATE entities SET description = ?1, updated_at = unixepoch() WHERE id = ?2",
1262        rusqlite::params![description, entity_id],
1263    )?;
1264    Ok(())
1265}
1266
1267/// v1.0.84 (ADR-0042): on successful re-embed, records the active backend
1268/// into the shared accumulator (`ENRICH_LAST_BACKEND`) so the final
1269/// `EnrichSummary` can expose `backend_invoked` without changing every
1270/// caller's signature. Best-effort observability — concurrent enrich runs
1271/// may race, but `Mutex` keeps the mutation safe.
1272#[allow(clippy::too_many_arguments)]
1273fn reembed_memory_vector(
1274    conn: &Connection,
1275    namespace: &str,
1276    memory_id: i64,
1277    memory_name: &str,
1278    memory_type: &str,
1279    body: &str,
1280    paths: &crate::paths::AppPaths,
1281    llm_backend: crate::cli::LlmBackendChoice,
1282) -> Result<(), AppError> {
1283    let snippet: String = body.chars().take(200).collect();
1284    // v1.0.82 (GAP-003): forward --llm-backend to embed_with_fallback.
1285    // v1.0.84 (ADR-0042): tuple (Vec<f32>, LlmBackendKind) — extrai o
1286    // backend que efetivamente rodou e popula o accumulator para o
1287    // EnrichSummary agregado.
1288    let (embedding, backend_kind) =
1289        crate::embedder::embed_passage_with_choice(&paths.models, body, Some(llm_backend))?;
1290    record_enrich_backend(backend_kind.as_str());
1291    memories::upsert_vec(
1292        conn,
1293        memory_id,
1294        namespace,
1295        memory_type,
1296        &embedding,
1297        memory_name,
1298        &snippet,
1299    )?;
1300    Ok(())
1301}
1302
/// v1.0.84 (ADR-0042): process-wide slot holding the name of the last
/// backend passed to `record_enrich_backend` (by `reembed_memory_vector`)
/// during this process. `take_enrich_backend` drains it; per the original
/// notes, the enrich `run` reads it once when emitting the summary.
static ENRICH_LAST_BACKEND: std::sync::Mutex<Option<&'static str>> = std::sync::Mutex::new(None);

/// Stores `backend` as the most recent backend, replacing any earlier value.
///
/// Best-effort: if the mutex is poisoned, the value is silently dropped.
fn record_enrich_backend(backend: &'static str) {
    let Ok(mut slot) = ENRICH_LAST_BACKEND.lock() else {
        return;
    };
    *slot = Some(backend);
}

/// Removes and returns the recorded backend, leaving the slot empty.
///
/// Returns `None` when nothing was recorded or the mutex is poisoned.
fn take_enrich_backend() -> Option<&'static str> {
    let Ok(mut slot) = ENRICH_LAST_BACKEND.lock() else {
        return None;
    };
    slot.take()
}
1319
1320/// Persists an enriched memory body (body-enrich, GAP-18).
1321///
1322/// Uses `memories::update` to set the new body and `sync_fts_after_update`
1323/// to keep FTS5 in sync. Also re-embeds the memory for recall accuracy.
1324fn persist_enriched_body(
1325    conn: &Connection,
1326    namespace: &str,
1327    memory_id: i64,
1328    memory_name: &str,
1329    new_body: &str,
1330    paths: &crate::paths::AppPaths,
1331    llm_backend: crate::cli::LlmBackendChoice,
1332) -> Result<(), AppError> {
1333    // Read current values for FTS sync
1334    let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
1335        "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
1336        rusqlite::params![memory_id],
1337        |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1338    )?;
1339
1340    let memory_type: String = conn.query_row(
1341        "SELECT type FROM memories WHERE id=?1",
1342        rusqlite::params![memory_id],
1343        |r| r.get(0),
1344    )?;
1345
1346    let description: String = conn.query_row(
1347        "SELECT COALESCE(description,'') FROM memories WHERE id=?1",
1348        rusqlite::params![memory_id],
1349        |r| r.get(0),
1350    )?;
1351
1352    let body_hash = blake3::hash(new_body.as_bytes()).to_hex().to_string();
1353
1354    let new_memory = memories::NewMemory {
1355        namespace: namespace.to_string(),
1356        name: memory_name.to_string(),
1357        memory_type: memory_type.clone(),
1358        description: description.clone(),
1359        body: new_body.to_string(),
1360        body_hash,
1361        session_id: None,
1362        source: "agent".to_string(),
1363        metadata: serde_json::json!({
1364            "operation": "body-enrich",
1365            "orig_chars": old_body.chars().count(),
1366            "new_chars": new_body.chars().count(),
1367        }),
1368    };
1369
1370    // G29 audit: insert a new immutable version BEFORE the update so the
1371    // enriched body is reachable through `history --name <X>` and
1372    // `restore --version N` can roll back to the pre-enrich state.
1373    let next_version = crate::storage::versions::next_version(conn, memory_id)?;
1374    let version_metadata = serde_json::json!({
1375        "operation": "body-enrich",
1376        "orig_chars": old_body.chars().count(),
1377        "new_chars": new_body.chars().count(),
1378    })
1379    .to_string();
1380    crate::storage::versions::insert_version(
1381        conn,
1382        memory_id,
1383        next_version,
1384        memory_name,
1385        &memory_type,
1386        &description,
1387        new_body,
1388        &version_metadata,
1389        Some("enrich"),
1390        "edit",
1391    )?;
1392
1393    memories::update(conn, memory_id, &new_memory, None)?;
1394    memories::sync_fts_after_update(
1395        conn,
1396        memory_id,
1397        &old_name,
1398        &old_desc,
1399        &old_body,
1400        &new_memory.name,
1401        &new_memory.description,
1402        &new_memory.body,
1403    )?;
1404
1405    // Re-embed for recall accuracy
1406    if let Err(e) = reembed_memory_vector(
1407        conn,
1408        namespace,
1409        memory_id,
1410        memory_name,
1411        &memory_type,
1412        new_body,
1413        paths,
1414        llm_backend,
1415    ) {
1416        tracing::warn!(target: "enrich", memory = %memory_name, error = %e, "vec upsert failed after body-enrich");
1417    }
1418
1419    Ok(())
1420}
1421
1422// ---------------------------------------------------------------------------
1423// Main entry point
1424// ---------------------------------------------------------------------------
1425
1426// ---------------------------------------------------------------------------
1427// G20: mode-conditional flag validation
1428// ---------------------------------------------------------------------------
1429
/// Reports whether `value` equals the flag's declared `default`.
///
/// For flags declared with `default_value_t`, this is how an explicit
/// operator override is told apart from the value clap filled in.
fn is_at_default<T>(value: T, default: T) -> bool
where
    T: PartialEq,
{
    PartialEq::eq(&value, &default)
}
1436
1437/// G20: validate that flags for one LLM provider were not passed when
1438/// the operator selected a different provider. Flags silently discarded
1439/// by the wrong mode are surfaced as AppError::Validation BEFORE any
1440/// DB work, so the operator gets an actionable error instead of a
1441/// surprise at runtime.
1442///
1443/// Detection rules:
1444/// - For Option<PathBuf> / Option<String>: is_some() means explicit
1445/// - For scalar fields with default_value_t: value != default means explicit
1446/// - For boolean fields: true means explicit (default is false)
1447///
1448/// Mode-specific matrices:
1449/// - mode=claude-code rejects: codex_binary, codex_model, codex_timeout != 300
1450/// - mode=codex rejects: claude_binary, claude_model, claude_timeout != 300, max_cost_usd
1451fn validate_mode_conditional_flags_enrich(args: &EnrichArgs) -> Result<(), AppError> {
1452    const DEFAULT_TIMEOUT: u64 = 300;
1453
1454    let mut conflicts: Vec<String> = Vec::new();
1455
1456    match args.mode {
1457        EnrichMode::ClaudeCode => {
1458            if args.codex_binary.is_some() {
1459                conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1460            }
1461            if args.codex_model.is_some() {
1462                conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1463            }
1464            if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1465                conflicts.push(format!(
1466                    "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1467                    args.codex_timeout
1468                ));
1469            }
1470        }
1471        EnrichMode::Codex => {
1472            if args.claude_binary.is_some() {
1473                conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1474            }
1475            if args.claude_model.is_some() {
1476                conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1477            }
1478            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1479                conflicts.push(format!(
1480                    "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1481                    args.claude_timeout
1482                ));
1483            }
1484            if args.max_cost_usd.is_some() {
1485                conflicts.push(
1486                    "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription, not the call)"
1487                        .to_string(),
1488                );
1489            }
1490        }
1491    }
1492
1493    if !conflicts.is_empty() {
1494        return Err(AppError::Validation(format!(
1495            "G20: mode-conditional flag conflicts detected for --mode={}:\n  - {}",
1496            args.mode,
1497            conflicts.join("\n  - ")
1498        )));
1499    }
1500
1501    Ok(())
1502}
1503
1504// ---------------------------------------------------------------------------
1505
1506/// Main entry point for the `enrich` command.
1507pub fn run(args: &EnrichArgs, llm_backend: crate::cli::LlmBackendChoice) -> Result<(), AppError> {
1508    // G20: mode-conditional flag validation BEFORE any DB access.
1509    // Surfaces flags that the wrong mode would silently discard.
1510    validate_mode_conditional_flags_enrich(args)?;
1511    let started = Instant::now();
1512
1513    let paths = AppPaths::resolve(args.db.as_deref())?;
1514    ensure_db_ready(&paths)?;
1515    let conn = open_rw(&paths.db)?;
1516    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1517
1518    // G28-B (v1.0.68) + G30 (v1.0.69): enforce singleton per
1519    // (job_type, namespace, db_hash) so two parallel `enrich` invocations
1520    // on the same DB cannot co-exist, but concurrent enrich on different
1521    // databases works as expected. The force flag (--force) breaks a
1522    // stale lock from a previously crashed invocation.
1523    let wait_secs = args.wait_job_singleton;
1524    let force_flag = args.force_job_singleton;
1525    let _singleton = crate::lock::acquire_job_singleton(
1526        crate::lock::JobType::Enrich,
1527        &namespace,
1528        &paths.db,
1529        wait_secs,
1530        force_flag,
1531    )?;
1532
1533    // Validate provider binary upfront only for LLM-backed operations.
1534    let provider_binary = if matches!(args.operation, EnrichOperation::ReEmbed) {
1535        None
1536    } else {
1537        Some(match args.mode {
1538            EnrichMode::ClaudeCode => {
1539                let bin = find_claude_binary(args.claude_binary.as_deref())?;
1540                let version = super::claude_runner::validate_claude_version(&bin)?;
1541                tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
1542                emit_json(&PhaseEvent {
1543                    phase: "validate",
1544                    binary_path: bin.to_str(),
1545                    version: Some(&version),
1546                    items_total: None,
1547                    items_pending: None,
1548                    llm_parallelism: None,
1549                });
1550                bin
1551            }
1552            EnrichMode::Codex => {
1553                let bin = find_codex_binary(args.codex_binary.as_deref())?;
1554                emit_json(&PhaseEvent {
1555                    phase: "validate",
1556                    binary_path: bin.to_str(),
1557                    version: None,
1558                    items_total: None,
1559                    items_pending: None,
1560                    llm_parallelism: None,
1561                });
1562                bin
1563            }
1564        })
1565    };
1566
1567    // G28-D: refuse to start when the system is saturated. This check
1568    // is BEFORE preflight so we never spend an OAuth turn on a host
1569    // that is already at the limit.
1570    if args.max_load_check && !args.dry_run && crate::system_load::is_system_saturated() {
1571        let load = crate::system_load::load_average_one();
1572        let n = crate::system_load::ncpus();
1573        return Err(AppError::Validation(format!(
1574            "system load average {load:.2} exceeds 2x ncpus ({n}); \
1575             pass --no-max-load-check to override (not recommended)"
1576        )));
1577    }
1578
1579    // G35: preflight probe — issue a single ping turn to verify the
1580    // provider is healthy before scanning N candidates. If the probe
1581    // fails with a rate-limit error, optionally fall back to a
1582    // different mode (typically codex) instead of failing the entire
1583    // batch. The probe itself consumes 1 OAuth turn, so it stays
1584    // opt-in (default off) to keep --dry-run and CI flows zero-cost.
1585    if args.preflight_check && !args.dry_run && !matches!(args.operation, EnrichOperation::ReEmbed)
1586    {
1587        let preflight_result = run_preflight_probe(args);
1588        match preflight_result {
1589            PreflightOutcome::Healthy => {
1590                tracing::info!(target: "enrich", mode = ?args.mode, "preflight probe healthy");
1591            }
1592            PreflightOutcome::RateLimited { reason, suggestion } => {
1593                if let Some(fallback) = args.fallback_mode.clone() {
1594                    if fallback != args.mode {
1595                        // G35 (v1.0.69): the mid-batch mode switch is
1596                        // intentionally NOT applied because it would
1597                        // desynchronise the per-item rate-limit wait
1598                        // state (rate-limited items in the worker are
1599                        // timed against the original provider). Instead
1600                        // we abort cleanly so the operator can re-invoke
1601                        // with `--mode {fallback:?}`. This guarantees no
1602                        // OAuth window is wasted and no partial state
1603                        // is left in the queue.
1604                        return Err(AppError::Validation(format!(
1605                            "preflight detected rate limit on {mode:?}: {reason}; \
1606                             re-invoke with `--mode {fallback:?}` to use the fallback provider",
1607                            mode = args.mode
1608                        )));
1609                    }
1610                    return Err(AppError::Validation(format!(
1611                        "preflight detected rate limit on {mode:?}: {reason}; \
1612                         --fallback-mode matches --mode, no recovery possible",
1613                        mode = args.mode
1614                    )));
1615                }
1616                return Err(AppError::Validation(format!(
1617                    "preflight detected rate limit on {mode:?}: {reason}; \
1618                     {suggestion}; pass --fallback-mode codex to recover",
1619                    mode = args.mode
1620                )));
1621            }
1622            PreflightOutcome::Error(e) => {
1623                return Err(e);
1624            }
1625        }
1626    }
1627
1628    // SCAN phase
1629    let scan_result = scan_operation(&conn, &namespace, args)?;
1630    let total = scan_result.len();
1631
1632    emit_json(&PhaseEvent {
1633        phase: "scan",
1634        binary_path: None,
1635        version: None,
1636        items_total: Some(total),
1637        items_pending: Some(total),
1638        llm_parallelism: Some(args.llm_parallelism),
1639    });
1640
1641    // Dry-run: emit preview events and summary without calling LLM
1642    if args.dry_run {
1643        for (idx, key) in scan_result.iter().enumerate() {
1644            emit_json(&ItemEvent {
1645                item: key,
1646                status: "preview",
1647                memory_id: None,
1648                entity_id: None,
1649                entities: None,
1650                rels: None,
1651                chars_before: None,
1652                chars_after: None,
1653                cost_usd: None,
1654                elapsed_ms: None,
1655                error: None,
1656                index: idx,
1657                total,
1658            });
1659        }
1660        emit_json(&EnrichSummary {
1661            summary: true,
1662            operation: format!("{:?}", args.operation),
1663            items_total: total,
1664            completed: 0,
1665            failed: 0,
1666            skipped: 0,
1667            cost_usd: 0.0,
1668            elapsed_ms: started.elapsed().as_millis() as u64,
1669            backend_invoked: take_enrich_backend(),
1670        });
1671        return Ok(());
1672    }
1673
1674    // All operations in this enum have an execution path.
1675
1676    // Queue setup for resume/retry
1677    let queue_conn = open_queue_db(DEFAULT_QUEUE_DB)?;
1678
1679    if args.resume {
1680        let reset = queue_conn
1681            .execute(
1682                "UPDATE queue SET status='pending' WHERE status='processing'",
1683                [],
1684            )
1685            .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
1686        if reset > 0 {
1687            tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
1688        }
1689    }
1690
1691    if args.retry_failed {
1692        let count = queue_conn
1693            .execute(
1694                "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
1695                [],
1696            )
1697            .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
1698        tracing::info!(target: "enrich", count, "retrying failed items");
1699    }
1700
1701    if !args.resume && !args.retry_failed {
1702        queue_conn
1703            .execute("DELETE FROM queue", [])
1704            .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
1705    }
1706
1707    // Populate queue
1708    for (idx, key) in scan_result.iter().enumerate() {
1709        let item_type = match args.operation {
1710            EnrichOperation::EntityDescriptions => "entity",
1711            _ => "memory",
1712        };
1713        if let Err(e) = queue_conn.execute(
1714            "INSERT OR IGNORE INTO queue (item_key, item_type, status) VALUES (?1, ?2, 'pending')",
1715            rusqlite::params![key, item_type],
1716        ) {
1717            tracing::warn!(target: "enrich", error = %e, "queue insert failed");
1718        }
1719        let _ = idx; // suppress unused warning
1720    }
1721
1722    // G19: parallel LLM processing via std::thread::scope when parallelism > 1.
1723    // Clamp enforces the range even if the caller bypasses clap validation.
1724    let parallelism = args.llm_parallelism.clamp(1, 32) as usize;
1725    if parallelism > 1 {
1726        tracing::info!(
1727            target: "enrich",
1728            llm_parallelism = parallelism,
1729            "parallel LLM processing with bounded thread pool"
1730        );
1731    }
1732    // G28-D (v1.0.68) + G34 (v1.0.69): warn above the recommended parallelism
1733    // ceiling. The threshold and message depend on the LLM mode because
1734    // Claude Code spawns MCP children (G28-A) while Codex does not.
1735    if parallelism > 4 {
1736        match args.mode {
1737            EnrichMode::ClaudeCode => {
1738                tracing::warn!(
1739                    target: "enrich",
1740                    llm_parallelism = parallelism,
1741                    recommended_max = 4,
1742                    mode = "claude-code",
1743                    "llm_parallelism above 4 multiplies Claude Code subprocess fan-out; \
1744                     consider combining with SQLITE_GRAPHRAG_CLAUDE_EMPTY_CONFIG_DIR \
1745                     to cut MCP children (G28-A)"
1746                );
1747            }
1748            EnrichMode::Codex if parallelism > 16 => {
1749                tracing::warn!(
1750                    target: "enrich",
1751                    llm_parallelism = parallelism,
1752                    recommended_max = 16,
1753                    mode = "codex",
1754                    "llm_parallelism above 16 risks OAuth rate-limit on Codex; \
1755                     consider --llm-parallelism 8 for safer concurrency"
1756                );
1757            }
1758            EnrichMode::Codex => {
1759                // No warning: codex does not spawn MCP children and was
1760                // validated at parallelism 8 in production (1161 items,
1761                // 0 failures) per the 2026-06-04 session audit.
1762            }
1763        }
1764    }
1765
1766    let mut completed = 0usize;
1767    let mut failed = 0usize;
1768    let mut skipped = 0usize;
1769    let mut cost_total = 0.0f64;
1770    let mut oauth_detected = false;
1771    let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1772    let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1773    let enrich_started = std::time::Instant::now();
1774
1775    let provider_timeout = match args.mode {
1776        EnrichMode::ClaudeCode => args.claude_timeout,
1777        EnrichMode::Codex => args.codex_timeout,
1778    };
1779
1780    let provider_model: Option<&str> = match args.mode {
1781        EnrichMode::ClaudeCode => args.claude_model.as_deref(),
1782        EnrichMode::Codex => args.codex_model.as_deref(),
1783    };
1784
1785    // G19: when parallelism > 1, spawn bounded worker threads.
1786    // Each worker opens its own DB connections (WAL supports concurrent readers + serialized writers).
1787    // The queue DB claim is atomic via UPDATE...RETURNING — no external lock needed.
1788    if parallelism > 1 {
1789        let stdout_mu = parking_lot::Mutex::new(());
1790        let budget = args.max_cost_usd;
1791        let operation = args.operation.clone();
1792        let mode = args.mode.clone();
1793        let min_oc = args.min_output_chars;
1794        let max_oc = args.max_output_chars;
1795        let prompt_tpl = args.prompt_template.as_deref().map(|p| p.to_path_buf());
1796
1797        struct WorkerResult {
1798            completed: usize,
1799            failed: usize,
1800            skipped: usize,
1801            cost: f64,
1802            oauth: bool,
1803        }
1804
1805        let results: Vec<WorkerResult> = std::thread::scope(|s| {
1806            let handles: Vec<_> = (0..parallelism)
1807                .map(|worker_id| {
1808                    let stdout_mu = &stdout_mu;
1809                    let paths = &paths;
1810                    let namespace = &namespace;
1811                    let provider_binary = provider_binary.as_deref();
1812                    let operation = &operation;
1813                    let mode = &mode;
1814                    let prompt_tpl = prompt_tpl.as_deref();
1815                    s.spawn(move || {
1816                        let w_conn = match open_rw(&paths.db) {
1817                            Ok(c) => c,
1818                            Err(e) => {
1819                                tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open DB");
1820                                return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1821                            }
1822                        };
1823                        let w_queue = match open_queue_db(DEFAULT_QUEUE_DB) {
1824                            Ok(c) => c,
1825                            Err(e) => {
1826                                tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open queue DB");
1827                                return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1828                            }
1829                        };
1830                        let mut w_completed = 0usize;
1831                        let mut w_failed = 0usize;
1832                        let mut w_skipped = 0usize;
1833                        let mut w_cost = 0.0f64;
1834                        let mut w_oauth = false;
1835                        let mut w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1836                        let w_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1837                        // G28-D: per-worker circuit breaker that aborts the
1838                        // loop after `circuit_breaker_threshold` consecutive
1839                        // HardFailure outcomes (transient/rate-limited errors
1840                        // do NOT count, so a recovering provider is not
1841                        // penalised).
1842                        let mut w_breaker = crate::retry::CircuitBreaker::new(
1843                            args.circuit_breaker_threshold.max(1),
1844                            std::time::Duration::from_secs(60),
1845                        );
1846
1847                        loop {
1848                            if crate::shutdown_requested() {
1849                                tracing::info!(target: "enrich", "shutdown requested, worker stopping");
1850                                break;
1851                            }
1852                            if let Some(b) = budget {
1853                                if !w_oauth && w_cost >= b {
1854                                    break;
1855                                }
1856                            }
1857                            let pending: Option<(i64, String, String)> = w_queue
1858                                .query_row(
1859                                    "UPDATE queue SET status='processing', attempt=attempt+1 \
1860                                     WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
1861                                     RETURNING id, item_key, item_type",
1862                                    [],
1863                                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1864                                )
1865                                .ok();
1866                            let (queue_id, item_key, _item_type) = match pending {
1867                                Some(p) => p,
1868                                None => break,
1869                            };
1870                            let item_started = Instant::now();
1871                            let current_index = w_completed + w_failed + w_skipped;
1872
1873                            let call_result = match operation {
1874                                EnrichOperation::MemoryBindings => call_memory_bindings(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1875                                EnrichOperation::EntityDescriptions => call_entity_description(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1876                                EnrichOperation::BodyEnrich => call_body_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode, min_oc, max_oc, prompt_tpl, args.preserve_threshold, paths, llm_backend),
1877                                EnrichOperation::ReEmbed => call_reembed(&w_conn, namespace, &item_key, paths, llm_backend),
1878                                EnrichOperation::WeightCalibrate => call_weight_calibrate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1879                                EnrichOperation::RelationReclassify => call_relation_reclassify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1880                                EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => call_entity_connect(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1881                                EnrichOperation::EntityTypeValidate => call_entity_type_validate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1882                                EnrichOperation::DescriptionEnrich => call_description_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1883                                EnrichOperation::DomainClassify => call_domain_classify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1884                                EnrichOperation::GraphAudit => call_graph_audit(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1885                                EnrichOperation::DeepResearchSynth => call_deep_research_synth(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1886                                EnrichOperation::BodyExtract => call_body_extract(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1887                            };
1888
1889                            match call_result {
1890                                Ok(EnrichItemResult::Done { cost, is_oauth, memory_id, entity_id, entities, rels, chars_before, chars_after }) => {
1891                                    if is_oauth { w_oauth = true; }
1892                                    w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1893                                    let _ = w_queue.execute(
1894                                        "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1895                                        rusqlite::params![memory_id, entity_id, entities as i64, rels as i64, cost, item_started.elapsed().as_millis() as i64, queue_id],
1896                                    );
1897                                    w_completed += 1;
1898                                    if !is_oauth { w_cost += cost; }
1899                                    // G28-D: count success; resets breaker.
1900                                    let _ = w_breaker
1901                                        .record(crate::retry::AttemptOutcome::Success);
1902                                    let _guard = stdout_mu.lock();
1903                                    emit_json(&ItemEvent { item: &item_key, status: "done", memory_id, entity_id, entities: Some(entities), rels: Some(rels), chars_before, chars_after, cost_usd: if is_oauth { None } else { Some(cost) }, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
1904                                }
1905                                Ok(EnrichItemResult::Skipped { reason }) => {
1906                                    w_skipped += 1;
1907                                    let _ = w_queue.execute("UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![reason, queue_id]);
1908                                    let _guard = stdout_mu.lock();
1909                                    emit_json(&ItemEvent { item: &item_key, status: "skipped", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
1910                                }
1911                                Ok(EnrichItemResult::PreservationFailed { score, threshold, chars_before, chars_after }) => {
1912                                    // G29 Passo 4: worker mirror of the
1913                                    // serial path. Counted as a soft
1914                                    // skip so the queue surface shows
1915                                    // a quality issue rather than a
1916                                    // transport failure.
1917                                    w_skipped += 1;
1918                                    let reason = format!(
1919                                        "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
1920                                    );
1921                                    let _ = w_queue.execute(
1922                                        "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
1923                                        rusqlite::params![reason, queue_id],
1924                                    );
1925                                    let _guard = stdout_mu.lock();
1926                                    emit_json(&ItemEvent {
1927                                        item: &item_key,
1928                                        status: "preservation_failed",
1929                                        memory_id: None,
1930                                        entity_id: None,
1931                                        entities: None,
1932                                        rels: None,
1933                                        chars_before: Some(chars_before),
1934                                        chars_after: Some(chars_after),
1935                                        cost_usd: None,
1936                                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1937                                        error: Some(reason),
1938                                        index: current_index,
1939                                        total,
1940                                    });
1941                                }
1942                                Err(e) => {
1943                                    let err_str = format!("{e}");
1944                                    if matches!(e, AppError::RateLimited { .. }) {
1945                                        if crate::retry::is_kill_switch_active() {
1946                                            tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
1947                                        } else if std::time::Instant::now() >= w_deadline {
1948                                            tracing::error!(target: "enrich", "rate-limit retry deadline (1h) exhausted in worker");
1949                                        } else {
1950                                            let half = w_backoff / 2;
1951                                            let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
1952                                            let actual_wait = half + jitter;
1953                                            tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited in worker, backing off");
1954                                            let _ = w_queue.execute("UPDATE queue SET status='pending' WHERE id=?1", rusqlite::params![queue_id]);
1955                                            std::thread::sleep(std::time::Duration::from_secs(actual_wait));
1956                                            w_backoff = (w_backoff * 2).min(900);
1957                                            continue;
1958                                        }
1959                                    }
1960                                    w_failed += 1;
1961                                    let _ = w_queue.execute("UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![err_str, queue_id]);
1962                                    let _guard = stdout_mu.lock();
1963                                    emit_json(&ItemEvent { item: &item_key, status: "failed", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: Some(err_str), index: current_index, total });
1964                                    // G28-D: count hard failure against breaker.
1965                                    let breaker_opened = w_breaker
1966                                        .record(crate::retry::AttemptOutcome::HardFailure);
1967                                    if breaker_opened {
1968                                        tracing::error!(target: "enrich",
1969                                            consecutive_failures = w_breaker.consecutive_failures(),
1970                                            "circuit breaker opened — aborting worker"
1971                                        );
1972                                        break;
1973                                    }
1974                                }
1975                            }
1976                        }
1977                        WorkerResult { completed: w_completed, failed: w_failed, skipped: w_skipped, cost: w_cost, oauth: w_oauth }
1978                    })
1979                })
1980                .collect();
1981            handles
1982                .into_iter()
1983                .map(|h| {
1984                    h.join().unwrap_or(WorkerResult {
1985                        completed: 0,
1986                        failed: 0,
1987                        skipped: 0,
1988                        cost: 0.0,
1989                        oauth: false,
1990                    })
1991                })
1992                .collect()
1993        });
1994
1995        for r in &results {
1996            completed += r.completed;
1997            failed += r.failed;
1998            skipped += r.skipped;
1999            cost_total += r.cost;
2000            if r.oauth && !oauth_detected {
2001                oauth_detected = true;
2002            }
2003        }
2004    } else {
2005        // Serial path (parallelism == 1) — original loop
2006        loop {
2007            if crate::shutdown_requested() {
2008                tracing::info!(target: "enrich", "shutdown requested, stopping enrichment");
2009                break;
2010            }
2011
2012            // Budget check
2013            if let Some(budget) = args.max_cost_usd {
2014                if !oauth_detected && cost_total >= budget {
2015                    tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
2016                    break;
2017                }
2018            }
2019
2020            // Dequeue next pending item
2021            let pending: Option<(i64, String, String)> = queue_conn
2022                .query_row(
2023                    "UPDATE queue SET status='processing', attempt=attempt+1 \
2024                 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
2025                 RETURNING id, item_key, item_type",
2026                    [],
2027                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2028                )
2029                .ok();
2030
2031            let (queue_id, item_key, item_type) = match pending {
2032                Some(p) => p,
2033                None => break,
2034            };
2035
2036            let item_started = Instant::now();
2037            let current_index = completed + failed + skipped;
2038
2039            let call_result = match args.operation {
2040                EnrichOperation::MemoryBindings => call_memory_bindings(
2041                    &conn,
2042                    &namespace,
2043                    &item_key,
2044                    provider_binary
2045                        .as_deref()
2046                        .expect("provider binary required"),
2047                    provider_model,
2048                    provider_timeout,
2049                    &args.mode,
2050                ),
2051                EnrichOperation::EntityDescriptions => call_entity_description(
2052                    &conn,
2053                    &namespace,
2054                    &item_key,
2055                    provider_binary
2056                        .as_deref()
2057                        .expect("provider binary required"),
2058                    provider_model,
2059                    provider_timeout,
2060                    &args.mode,
2061                ),
2062                EnrichOperation::BodyEnrich => call_body_enrich(
2063                    &conn,
2064                    &namespace,
2065                    &item_key,
2066                    provider_binary
2067                        .as_deref()
2068                        .expect("provider binary required"),
2069                    provider_model,
2070                    provider_timeout,
2071                    &args.mode,
2072                    args.min_output_chars,
2073                    args.max_output_chars,
2074                    args.prompt_template.as_deref(),
2075                    args.preserve_threshold,
2076                    &paths,
2077                    llm_backend,
2078                ),
2079                EnrichOperation::ReEmbed => {
2080                    call_reembed(&conn, &namespace, &item_key, &paths, llm_backend)
2081                }
2082                EnrichOperation::WeightCalibrate => call_weight_calibrate(
2083                    &conn,
2084                    &namespace,
2085                    &item_key,
2086                    provider_binary
2087                        .as_deref()
2088                        .expect("provider binary required"),
2089                    provider_model,
2090                    provider_timeout,
2091                    &args.mode,
2092                ),
2093                EnrichOperation::RelationReclassify => call_relation_reclassify(
2094                    &conn,
2095                    &namespace,
2096                    &item_key,
2097                    provider_binary
2098                        .as_deref()
2099                        .expect("provider binary required"),
2100                    provider_model,
2101                    provider_timeout,
2102                    &args.mode,
2103                ),
2104                EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2105                    call_entity_connect(
2106                        &conn,
2107                        &namespace,
2108                        &item_key,
2109                        provider_binary
2110                            .as_deref()
2111                            .expect("provider binary required"),
2112                        provider_model,
2113                        provider_timeout,
2114                        &args.mode,
2115                    )
2116                }
2117                EnrichOperation::EntityTypeValidate => call_entity_type_validate(
2118                    &conn,
2119                    &namespace,
2120                    &item_key,
2121                    provider_binary
2122                        .as_deref()
2123                        .expect("provider binary required"),
2124                    provider_model,
2125                    provider_timeout,
2126                    &args.mode,
2127                ),
2128                EnrichOperation::DescriptionEnrich => call_description_enrich(
2129                    &conn,
2130                    &namespace,
2131                    &item_key,
2132                    provider_binary
2133                        .as_deref()
2134                        .expect("provider binary required"),
2135                    provider_model,
2136                    provider_timeout,
2137                    &args.mode,
2138                ),
2139                EnrichOperation::DomainClassify => call_domain_classify(
2140                    &conn,
2141                    &namespace,
2142                    &item_key,
2143                    provider_binary
2144                        .as_deref()
2145                        .expect("provider binary required"),
2146                    provider_model,
2147                    provider_timeout,
2148                    &args.mode,
2149                ),
2150                EnrichOperation::GraphAudit => call_graph_audit(
2151                    &conn,
2152                    &namespace,
2153                    &item_key,
2154                    provider_binary
2155                        .as_deref()
2156                        .expect("provider binary required"),
2157                    provider_model,
2158                    provider_timeout,
2159                    &args.mode,
2160                ),
2161                EnrichOperation::DeepResearchSynth => call_deep_research_synth(
2162                    &conn,
2163                    &namespace,
2164                    &item_key,
2165                    provider_binary
2166                        .as_deref()
2167                        .expect("provider binary required"),
2168                    provider_model,
2169                    provider_timeout,
2170                    &args.mode,
2171                ),
2172                EnrichOperation::BodyExtract => call_body_extract(
2173                    &conn,
2174                    &namespace,
2175                    &item_key,
2176                    provider_binary
2177                        .as_deref()
2178                        .expect("provider binary required"),
2179                    provider_model,
2180                    provider_timeout,
2181                    &args.mode,
2182                ),
2183            };
2184
2185            match call_result {
2186                Ok(EnrichItemResult::Done {
2187                    memory_id,
2188                    entity_id,
2189                    entities,
2190                    rels,
2191                    chars_before,
2192                    chars_after,
2193                    cost,
2194                    is_oauth,
2195                }) => {
2196                    if is_oauth && !oauth_detected {
2197                        oauth_detected = true;
2198                        tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
2199                    }
2200                    backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
2201
2202                    // Persist depends on the operation
2203                    let persist_err: Option<String> = match args.operation {
2204                        EnrichOperation::MemoryBindings => {
2205                            // Bindings already persisted inside call_memory_bindings
2206                            None
2207                        }
2208                        EnrichOperation::EntityDescriptions => {
2209                            // Description already persisted inside call_entity_description
2210                            None
2211                        }
2212                        EnrichOperation::BodyEnrich => {
2213                            // Body already persisted inside call_body_enrich
2214                            None
2215                        }
2216                        _ => {
2217                            // All G27 operations persist inside their call_* function
2218                            None
2219                        }
2220                    };
2221
2222                    if let Err(e) = queue_conn.execute(
2223                    "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2224                    rusqlite::params![
2225                        memory_id,
2226                        entity_id,
2227                        entities as i64,
2228                        rels as i64,
2229                        cost,
2230                        item_started.elapsed().as_millis() as i64,
2231                        queue_id
2232                    ],
2233                ) {
2234                        tracing::warn!(target: "enrich", error = %e, "queue done update failed");
2235                    }
2236
2237                    if persist_err.is_none() {
2238                        completed += 1;
2239                        if !is_oauth {
2240                            cost_total += cost;
2241                        }
2242                        emit_json(&ItemEvent {
2243                            item: &item_key,
2244                            status: "done",
2245                            memory_id,
2246                            entity_id,
2247                            entities: Some(entities),
2248                            rels: Some(rels),
2249                            chars_before,
2250                            chars_after,
2251                            cost_usd: if is_oauth { None } else { Some(cost) },
2252                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2253                            error: None,
2254                            index: current_index,
2255                            total,
2256                        });
2257                    } else {
2258                        failed += 1;
2259                        emit_json(&ItemEvent {
2260                            item: &item_key,
2261                            status: "failed",
2262                            memory_id: None,
2263                            entity_id: None,
2264                            entities: None,
2265                            rels: None,
2266                            chars_before: None,
2267                            chars_after: None,
2268                            cost_usd: None,
2269                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2270                            error: persist_err,
2271                            index: current_index,
2272                            total,
2273                        });
2274                    }
2275                }
2276                Ok(EnrichItemResult::Skipped { reason }) => {
2277                    skipped += 1;
2278                    if let Err(e) = queue_conn.execute(
2279                    "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2280                    rusqlite::params![reason, queue_id],
2281                ) {
2282                        tracing::warn!(target: "enrich", error = %e, "queue skipped update failed");
2283                    }
2284                    emit_json(&ItemEvent {
2285                        item: &item_key,
2286                        status: "skipped",
2287                        memory_id: None,
2288                        entity_id: None,
2289                        entities: None,
2290                        rels: None,
2291                        chars_before: None,
2292                        chars_after: None,
2293                        cost_usd: None,
2294                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2295                        error: None,
2296                        index: current_index,
2297                        total,
2298                    });
2299                }
2300                Ok(EnrichItemResult::PreservationFailed {
2301                    score,
2302                    threshold,
2303                    chars_before,
2304                    chars_after,
2305                }) => {
2306                    // G29 Passo 4: the LLM rewrite diverged too far from
2307                    // the original body. Count as a soft failure (not
2308                    // `failed`) so the queue surfaces it as a quality
2309                    // issue, not a transport error. The reason is
2310                    // structured so the operator can audit why a body
2311                    // was rejected.
2312                    skipped += 1;
2313                    let reason = format!(
2314                        "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2315                    );
2316                    if let Err(qe) = queue_conn.execute(
2317                        "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2318                        rusqlite::params![reason, queue_id],
2319                    ) {
2320                        tracing::warn!(target: "enrich", error = %qe, "queue preservation_failed update failed");
2321                    }
2322                    emit_json(&ItemEvent {
2323                        item: &item_key,
2324                        status: "preservation_failed",
2325                        memory_id: None,
2326                        entity_id: None,
2327                        entities: None,
2328                        rels: None,
2329                        chars_before: Some(chars_before),
2330                        chars_after: Some(chars_after),
2331                        cost_usd: None,
2332                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2333                        error: Some(reason),
2334                        index: current_index,
2335                        total,
2336                    });
2337                }
2338                Err(e) => {
2339                    let err_str = format!("{e}");
2340                    if matches!(e, AppError::RateLimited { .. }) {
2341                        if crate::retry::is_kill_switch_active() {
2342                            tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2343                        } else if std::time::Instant::now() >= rate_limit_deadline {
2344                            tracing::error!(target: "enrich", total_elapsed_secs = enrich_started.elapsed().as_secs(), "rate-limit retry deadline (1h) exhausted");
2345                        } else {
2346                            let half = backoff_secs / 2;
2347                            let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2348                            let actual_wait = half + jitter;
2349                            tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
2350                            if let Err(qe) = queue_conn.execute(
2351                                "UPDATE queue SET status='pending' WHERE id=?1",
2352                                rusqlite::params![queue_id],
2353                            ) {
2354                                tracing::warn!(target: "enrich", error = %qe, "queue pending update failed");
2355                            }
2356                            std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2357                            backoff_secs = (backoff_secs * 2).min(900);
2358                            continue;
2359                        }
2360                    }
2361
2362                    failed += 1;
2363                    if let Err(qe) = queue_conn.execute(
2364                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
2365                    rusqlite::params![err_str, queue_id],
2366                ) {
2367                        tracing::warn!(target: "enrich", error = %qe, "queue failed update failed");
2368                    }
2369                    emit_json(&ItemEvent {
2370                        item: &item_key,
2371                        status: "failed",
2372                        memory_id: None,
2373                        entity_id: None,
2374                        entities: None,
2375                        rels: None,
2376                        chars_before: None,
2377                        chars_after: None,
2378                        cost_usd: None,
2379                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2380                        error: Some(err_str),
2381                        index: current_index,
2382                        total,
2383                    });
2384                }
2385            }
2386
2387            let _ = item_type; // used via queue schema only
2388        }
2389    } // end else (serial path)
2390
2391    let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2392    let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2393
2394    emit_json(&EnrichSummary {
2395        summary: true,
2396        operation: format!("{:?}", args.operation),
2397        items_total: total,
2398        completed,
2399        failed,
2400        skipped,
2401        cost_usd: cost_total,
2402        elapsed_ms: started.elapsed().as_millis() as u64,
2403        backend_invoked: take_enrich_backend(),
2404    });
2405
2406    if failed == 0 {
2407        let _ = std::fs::remove_file(DEFAULT_QUEUE_DB);
2408    }
2409
2410    Ok(())
2411}
2412
2413// ---------------------------------------------------------------------------
2414// Internal result type for a single item call
2415// ---------------------------------------------------------------------------
2416
/// Outcome of one queue item, as returned by every per-operation `call_*`
/// helper and consumed by the serial enrich loop.
///
/// The loop maps `Done` to queue status `done`, and both `Skipped` and
/// `PreservationFailed` to queue status `skipped` (with a reason stored in
/// the queue's `error` column), emitting one NDJSON item event per result.
enum EnrichItemResult {
    /// The item was processed; any persistence already happened inside the
    /// `call_*` helper.
    Done {
        /// Memory touched by the operation, for memory-scoped operations.
        memory_id: Option<i64>,
        /// Entity touched by the operation, for entity-scoped operations.
        entity_id: Option<i64>,
        /// Entity count reported in the item event and the queue row.
        entities: usize,
        /// Relationship count reported in the item event and the queue row.
        rels: usize,
        /// Body length in chars before the operation, when meaningful.
        chars_before: Option<usize>,
        /// Body length in chars after the operation, when meaningful.
        chars_after: Option<usize>,
        /// Cost reported for the call; surfaced as `cost_usd`.
        cost: f64,
        /// When set, the loop omits `cost_usd` from the item event and does
        /// not add `cost` to the running total.
        is_oauth: bool,
    },
    /// The item was deliberately left untouched; `reason` explains why.
    Skipped { reason: String },
    /// G29 Passo 4 (v1.0.69): the LLM rewrite was rejected before being
    /// persisted because its trigram-Jaccard similarity to the original body
    /// fell below `--preserve-threshold`. Score and threshold are reported in
    /// the NDJSON stream so an operator can audit the rejection.
    PreservationFailed {
        /// Similarity between the original and rewritten body.
        score: f64,
        /// Minimum similarity that would have been accepted.
        threshold: f64,
        /// Original body length in chars.
        chars_before: usize,
        /// Rejected rewrite length in chars.
        chars_after: usize,
    },
}
2442
2443// ---------------------------------------------------------------------------
2444// Per-operation call helpers (SCAN + JUDGE + PERSIST in one unit)
2445// ---------------------------------------------------------------------------
2446
2447fn call_memory_bindings(
2448    conn: &Connection,
2449    namespace: &str,
2450    memory_name: &str,
2451    binary: &Path,
2452    model: Option<&str>,
2453    timeout: u64,
2454    mode: &EnrichMode,
2455) -> Result<EnrichItemResult, AppError> {
2456    // Look up the memory
2457    let (memory_id, body): (i64, String) = conn.query_row(
2458        "SELECT id, COALESCE(body,'') FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2459        rusqlite::params![namespace, memory_name],
2460        |r| Ok((r.get(0)?, r.get(1)?)),
2461    ).map_err(|e| match e {
2462        rusqlite::Error::QueryReturnedNoRows => AppError::NotFound(format!("memory '{memory_name}' not found")),
2463        other => AppError::Database(other),
2464    })?;
2465
2466    if body.trim().is_empty() {
2467        return Ok(EnrichItemResult::Skipped {
2468            reason: "body is empty".to_string(),
2469        });
2470    }
2471
2472    let (value, cost, is_oauth) = match mode {
2473        EnrichMode::ClaudeCode => call_claude(
2474            binary,
2475            BINDINGS_PROMPT,
2476            BINDINGS_SCHEMA,
2477            &body,
2478            model,
2479            timeout,
2480        )?,
2481        EnrichMode::Codex => call_codex(
2482            binary,
2483            BINDINGS_PROMPT,
2484            BINDINGS_SCHEMA,
2485            &body,
2486            model,
2487            timeout,
2488        )?,
2489    };
2490
2491    let empty_arr = serde_json::Value::Array(vec![]);
2492    let entities_val = value.get("entities").unwrap_or(&empty_arr);
2493    let rels_val = value.get("relationships").unwrap_or(&empty_arr);
2494
2495    let (ent_count, rel_count) =
2496        persist_memory_bindings(conn, namespace, memory_id, entities_val, rels_val)?;
2497
2498    Ok(EnrichItemResult::Done {
2499        memory_id: Some(memory_id),
2500        entity_id: None,
2501        entities: ent_count,
2502        rels: rel_count,
2503        chars_before: None,
2504        chars_after: None,
2505        cost,
2506        is_oauth,
2507    })
2508}
2509
2510fn call_entity_description(
2511    conn: &Connection,
2512    namespace: &str,
2513    entity_name: &str,
2514    binary: &Path,
2515    model: Option<&str>,
2516    timeout: u64,
2517    mode: &EnrichMode,
2518) -> Result<EnrichItemResult, AppError> {
2519    let (entity_id, entity_type): (i64, String) = conn
2520        .query_row(
2521            "SELECT id, type FROM entities WHERE namespace=?1 AND name=?2",
2522            rusqlite::params![namespace, entity_name],
2523            |r| Ok((r.get(0)?, r.get(1)?)),
2524        )
2525        .map_err(|e| match e {
2526            rusqlite::Error::QueryReturnedNoRows => {
2527                AppError::NotFound(format!("entity '{entity_name}' not found"))
2528            }
2529            other => AppError::Database(other),
2530        })?;
2531
2532    let prompt = format!(
2533        "{ENTITY_DESCRIPTION_PROMPT_PREFIX}{entity_name}\nEntity type: {entity_type}\n\nGenerate a description:"
2534    );
2535
2536    let (value, cost, is_oauth) = match mode {
2537        EnrichMode::ClaudeCode => call_claude(
2538            binary,
2539            &prompt,
2540            ENTITY_DESCRIPTION_SCHEMA,
2541            "",
2542            model,
2543            timeout,
2544        )?,
2545        EnrichMode::Codex => call_codex(
2546            binary,
2547            &prompt,
2548            ENTITY_DESCRIPTION_SCHEMA,
2549            "",
2550            model,
2551            timeout,
2552        )?,
2553    };
2554
2555    let description = value
2556        .get("description")
2557        .and_then(|v| v.as_str())
2558        .ok_or_else(|| AppError::Validation("LLM result missing 'description' field".into()))?;
2559
2560    persist_entity_description(conn, entity_id, description)?;
2561
2562    Ok(EnrichItemResult::Done {
2563        memory_id: None,
2564        entity_id: Some(entity_id),
2565        entities: 0,
2566        rels: 0,
2567        chars_before: None,
2568        chars_after: None,
2569        cost,
2570        is_oauth,
2571    })
2572}
2573
2574#[allow(clippy::too_many_arguments)]
2575fn call_body_enrich(
2576    conn: &Connection,
2577    namespace: &str,
2578    memory_name: &str,
2579    binary: &Path,
2580    model: Option<&str>,
2581    timeout: u64,
2582    mode: &EnrichMode,
2583    min_output_chars: usize,
2584    max_output_chars: usize,
2585    prompt_template: Option<&Path>,
2586    preserve_threshold: f64,
2587    paths: &crate::paths::AppPaths,
2588    llm_backend: crate::cli::LlmBackendChoice,
2589) -> Result<EnrichItemResult, AppError> {
2590    let (memory_id, body, description, memory_type): (i64, String, String, String) = conn
2591        .query_row(
2592            "SELECT id, COALESCE(body,''), COALESCE(description,''), COALESCE(type,'note') \
2593         FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2594            rusqlite::params![namespace, memory_name],
2595            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2596        )
2597        .map_err(|e| match e {
2598            rusqlite::Error::QueryReturnedNoRows => {
2599                AppError::NotFound(format!("memory '{memory_name}' not found"))
2600            }
2601            other => AppError::Database(other),
2602        })?;
2603
2604    let chars_before = body.chars().count();
2605
2606    // G26: gather graph context for contextualized enrichment
2607    let linked_entities: Vec<String> = {
2608        let mut stmt = conn.prepare_cached(
2609            "SELECT e.name FROM memory_entities me \
2610             JOIN entities e ON e.id = me.entity_id \
2611             WHERE me.memory_id = ?1 LIMIT 10",
2612        )?;
2613        let result: Vec<String> = stmt
2614            .query_map(rusqlite::params![memory_id], |r| r.get::<_, String>(0))?
2615            .filter_map(|r| r.ok())
2616            .collect();
2617        drop(stmt);
2618        result
2619    };
2620
2621    // Load custom prompt template if provided
2622    let prompt_prefix = if let Some(tmpl_path) = prompt_template {
2623        let file_size = std::fs::metadata(tmpl_path)
2624            .map_err(|e| {
2625                AppError::Io(std::io::Error::new(
2626                    e.kind(),
2627                    format!("failed to stat prompt template: {e}"),
2628                ))
2629            })?
2630            .len();
2631        if file_size > MAX_MEMORY_BODY_LEN as u64 {
2632            return Err(AppError::LimitExceeded(
2633                crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
2634            ));
2635        }
2636        std::fs::read_to_string(tmpl_path).map_err(|e| {
2637            AppError::Io(std::io::Error::new(
2638                e.kind(),
2639                format!("failed to read prompt template: {e}"),
2640            ))
2641        })?
2642    } else {
2643        BODY_ENRICH_PROMPT_PREFIX.to_string()
2644    };
2645
2646    // G26: build contextualized prompt with graph data
2647    let context_section = if !linked_entities.is_empty() || !description.is_empty() {
2648        let mut ctx = String::new();
2649        ctx.push_str(&format!(
2650            "\nContext:\n- Memory name: {memory_name}\n- Type: {memory_type}\n"
2651        ));
2652        if !description.is_empty() {
2653            ctx.push_str(&format!("- Description: {description}\n"));
2654        }
2655        ctx.push_str(&format!("- Domain: {namespace}\n"));
2656        if !linked_entities.is_empty() {
2657            ctx.push_str(&format!(
2658                "- Linked entities: {}\n",
2659                linked_entities.join(", ")
2660            ));
2661        }
2662        ctx
2663    } else {
2664        String::new()
2665    };
2666
2667    let prompt = format!(
2668        "{prompt_prefix}{context_section}\nTarget minimum length: {min_output_chars} characters. Maximum: {max_output_chars} characters."
2669    );
2670
2671    // The body schema uses a free-form enriched_body field
2672    let (value, cost, is_oauth) = match mode {
2673        EnrichMode::ClaudeCode => {
2674            call_claude(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2675        }
2676        EnrichMode::Codex => {
2677            call_codex(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2678        }
2679    };
2680
2681    let enriched_body = value
2682        .get("enriched_body")
2683        .and_then(|v| v.as_str())
2684        .ok_or_else(|| AppError::Validation("LLM result missing 'enriched_body' field".into()))?;
2685
2686    let chars_after = enriched_body.chars().count();
2687
2688    // G29 Passo 4 (v1.0.69): preservation check. Before persisting, run
2689    // a trigram-Jaccard similarity between the original body and the
2690    // LLM-rewritten body. When the score falls below
2691    // `args.preserve_threshold` (default 0.7 per the G29 gap), reject the
2692    // rewrite as a likely hallucination. The result is recorded in the
2693    // NDJSON stream so operators can audit what the LLM tried to do.
2694    let threshold = preserve_threshold;
2695    let verdict =
2696        crate::preservation::PreservationVerdict::evaluate(&body, enriched_body, threshold);
2697    if !verdict.is_accepted() {
2698        return Ok(EnrichItemResult::PreservationFailed {
2699            score: match verdict {
2700                crate::preservation::PreservationVerdict::Preserved { score, .. } => score,
2701                crate::preservation::PreservationVerdict::Rejected { score, .. } => score,
2702                crate::preservation::PreservationVerdict::Unchanged { .. } => 1.0,
2703            },
2704            threshold,
2705            chars_before,
2706            chars_after,
2707        });
2708    }
2709
2710    // G29 Passo 5 (v1.0.69): idempotency via blake3 hash. Before persisting,
2711    // compare the hash of the original body against the hash of the enriched
2712    // body. Identical hashes mean the LLM produced a byte-for-byte identical
2713    // body (rare but possible) — treat as `Skipped` so re-running the batch
2714    // is safe and the queue does not get re-persisted entries.
2715    let old_hash = blake3::hash(body.as_bytes()).to_hex().to_string();
2716    let new_hash = blake3::hash(enriched_body.as_bytes()).to_hex().to_string();
2717    if old_hash == new_hash {
2718        return Ok(EnrichItemResult::Skipped {
2719            reason: format!(
2720                "enriched body hash matches original (blake3:{old_hash}); idempotency skip"
2721            ),
2722        });
2723    }
2724
2725    // Only persist if the enriched body is genuinely longer
2726    if chars_after <= chars_before {
2727        return Ok(EnrichItemResult::Skipped {
2728            reason: format!(
2729                "enriched body ({chars_after} chars) not longer than original ({chars_before} chars)"
2730            ),
2731        });
2732    }
2733
2734    persist_enriched_body(
2735        conn,
2736        namespace,
2737        memory_id,
2738        memory_name,
2739        enriched_body,
2740        paths,
2741        llm_backend,
2742    )?;
2743
2744    Ok(EnrichItemResult::Done {
2745        memory_id: Some(memory_id),
2746        entity_id: None,
2747        entities: 0,
2748        rels: 0,
2749        chars_before: Some(chars_before),
2750        chars_after: Some(chars_after),
2751        cost,
2752        is_oauth,
2753    })
2754}
2755
2756fn call_reembed(
2757    conn: &Connection,
2758    namespace: &str,
2759    memory_name: &str,
2760    paths: &crate::paths::AppPaths,
2761    llm_backend: crate::cli::LlmBackendChoice,
2762) -> Result<EnrichItemResult, AppError> {
2763    let (memory_id, body, memory_type): (i64, String, String) = conn
2764        .query_row(
2765            "SELECT id, COALESCE(body,''), COALESCE(type,'note')
2766             FROM memories
2767             WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2768            rusqlite::params![namespace, memory_name],
2769            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2770        )
2771        .map_err(|e| match e {
2772            rusqlite::Error::QueryReturnedNoRows => {
2773                AppError::NotFound(format!("memory '{memory_name}' not found"))
2774            }
2775            other => AppError::Database(other),
2776        })?;
2777
2778    if body.trim().is_empty() {
2779        return Ok(EnrichItemResult::Skipped {
2780            reason: "body is empty".to_string(),
2781        });
2782    }
2783
2784    reembed_memory_vector(
2785        conn,
2786        namespace,
2787        memory_id,
2788        memory_name,
2789        &memory_type,
2790        &body,
2791        paths,
2792        llm_backend,
2793    )?;
2794
2795    Ok(EnrichItemResult::Done {
2796        memory_id: Some(memory_id),
2797        entity_id: None,
2798        entities: 0,
2799        rels: 0,
2800        chars_before: Some(body.chars().count()),
2801        chars_after: Some(body.chars().count()),
2802        cost: 0.0,
2803        is_oauth: true,
2804    })
2805}
2806
2807// ---------------------------------------------------------------------------
2808// Scan dispatcher — maps operation to scan query result (item keys)
2809// ---------------------------------------------------------------------------
2810
2811fn scan_operation(
2812    conn: &Connection,
2813    namespace: &str,
2814    args: &EnrichArgs,
2815) -> Result<Vec<String>, AppError> {
2816    // G37: resolve --names + --names-file once and apply to every scan path.
2817    let name_filter = resolve_name_filter(args)?;
2818    match args.operation {
2819        EnrichOperation::MemoryBindings => {
2820            let rows = scan_unbound_memories(conn, namespace, args.limit, &name_filter)?;
2821            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2822        }
2823        EnrichOperation::EntityDescriptions => {
2824            let rows = scan_entities_without_description(conn, namespace, args.limit)?;
2825            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2826        }
2827        EnrichOperation::BodyEnrich => {
2828            let rows =
2829                scan_short_body_memories(conn, namespace, args.min_output_chars, args.limit)?;
2830            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2831        }
2832        EnrichOperation::ReEmbed => {
2833            let rows = scan_memories_without_embeddings(conn, namespace, args.limit, &name_filter)?;
2834            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2835        }
2836        EnrichOperation::WeightCalibrate => {
2837            let rows = scan_weight_candidates(conn, namespace, args.limit)?;
2838            Ok(rows
2839                .into_iter()
2840                .map(|(id, _, _, _, _)| id.to_string())
2841                .collect())
2842        }
2843        EnrichOperation::RelationReclassify => {
2844            let rows = scan_generic_relations(conn, namespace, args.limit)?;
2845            Ok(rows
2846                .into_iter()
2847                .map(|(id, _, _, _)| id.to_string())
2848                .collect())
2849        }
2850        EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2851            let pairs = scan_isolated_entity_pairs(conn, namespace, args.limit)?;
2852            Ok(pairs.into_iter().map(|(_, name, _, _)| name).collect())
2853        }
2854        EnrichOperation::EntityTypeValidate => {
2855            let rows = scan_entities_for_type_validation(conn, namespace, args.limit)?;
2856            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2857        }
2858        EnrichOperation::DescriptionEnrich => {
2859            let rows = scan_generic_descriptions(conn, namespace, args.limit)?;
2860            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2861        }
2862        EnrichOperation::DomainClassify
2863        | EnrichOperation::GraphAudit
2864        | EnrichOperation::DeepResearchSynth
2865        | EnrichOperation::BodyExtract => {
2866            let limit_clause = args.limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
2867            let sql = format!(
2868                "SELECT name FROM memories WHERE namespace=?1 AND deleted_at IS NULL ORDER BY id {limit_clause}"
2869            );
2870            let mut stmt = conn.prepare(&sql)?;
2871            let names = stmt
2872                .query_map(rusqlite::params![namespace], |r| r.get::<_, String>(0))?
2873                .collect::<Result<Vec<_>, _>>()?;
2874            Ok(names)
2875        }
2876    }
2877}
2878
2879// ---------------------------------------------------------------------------
// Codex provider helpers and G27 graph-enrichment operations
2881// ---------------------------------------------------------------------------
2882
2883/// Locates the Codex CLI binary.
2884fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
2885    if let Some(p) = explicit {
2886        if p.exists() {
2887            return Ok(p.to_path_buf());
2888        }
2889        return Err(AppError::Validation(format!(
2890            "Codex binary not found at explicit path: {}",
2891            p.display()
2892        )));
2893    }
2894
2895    if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
2896        let p = PathBuf::from(&env_path);
2897        if p.exists() {
2898            return Ok(p);
2899        }
2900    }
2901
2902    let name = if cfg!(windows) { "codex.exe" } else { "codex" };
2903    if let Some(path_var) = std::env::var_os("PATH") {
2904        for dir in std::env::split_paths(&path_var) {
2905            let candidate = dir.join(name);
2906            if candidate.exists() {
2907                return Ok(crate::extract::llm_embedding::resolve_real_binary(
2908                    &candidate,
2909                ));
2910            }
2911        }
2912    }
2913
2914    Err(AppError::Validation(
2915        "Codex CLI binary not found in PATH. Install it or specify --codex-binary".to_string(),
2916    ))
2917}
2918
2919/// G27: Calibrate weight of a single relationship via LLM.
2920fn call_weight_calibrate(
2921    conn: &Connection,
2922    _namespace: &str,
2923    item_key: &str,
2924    binary: &Path,
2925    model: Option<&str>,
2926    timeout: u64,
2927    mode: &EnrichMode,
2928) -> Result<EnrichItemResult, AppError> {
2929    let rel_id: i64 = item_key
2930        .parse()
2931        .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
2932    let (source_name, target_name, relation, current_weight): (String, String, String, f64) = conn
2933        .query_row(
2934            "SELECT e1.name, e2.name, r.relation, r.weight \
2935             FROM relationships r \
2936             JOIN entities e1 ON e1.id = r.source_id \
2937             JOIN entities e2 ON e2.id = r.target_id \
2938             WHERE r.id = ?1",
2939            rusqlite::params![rel_id],
2940            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2941        )
2942        .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
2943
2944    let input_text = format!(
2945        "Source: {source_name}\nTarget: {target_name}\nRelation: {relation}\nCurrent weight: {current_weight}"
2946    );
2947    let (value, cost, is_oauth) = match mode {
2948        EnrichMode::ClaudeCode => call_claude(
2949            binary,
2950            WEIGHT_CALIBRATE_PROMPT,
2951            WEIGHT_CALIBRATE_SCHEMA,
2952            &input_text,
2953            model,
2954            timeout,
2955        )?,
2956        EnrichMode::Codex => call_codex(
2957            binary,
2958            WEIGHT_CALIBRATE_PROMPT,
2959            WEIGHT_CALIBRATE_SCHEMA,
2960            &input_text,
2961            model,
2962            timeout,
2963        )?,
2964    };
2965
2966    let calibrated = value
2967        .get("calibrated_weight")
2968        .and_then(|v| v.as_f64())
2969        .ok_or_else(|| AppError::Validation("LLM result missing 'calibrated_weight'".into()))?;
2970
2971    conn.execute(
2972        "UPDATE relationships SET weight = ?1 WHERE id = ?2",
2973        rusqlite::params![calibrated, rel_id],
2974    )?;
2975
2976    Ok(EnrichItemResult::Done {
2977        memory_id: None,
2978        entity_id: None,
2979        entities: 0,
2980        rels: 1,
2981        chars_before: None,
2982        chars_after: None,
2983        cost,
2984        is_oauth,
2985    })
2986}
2987
2988/// G27: Reclassify a generic relationship type via LLM.
2989fn call_relation_reclassify(
2990    conn: &Connection,
2991    _namespace: &str,
2992    item_key: &str,
2993    binary: &Path,
2994    model: Option<&str>,
2995    timeout: u64,
2996    mode: &EnrichMode,
2997) -> Result<EnrichItemResult, AppError> {
2998    let rel_id: i64 = item_key
2999        .parse()
3000        .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
3001    let (source_name, target_name, current_relation): (String, String, String) = conn
3002        .query_row(
3003            "SELECT e1.name, e2.name, r.relation \
3004             FROM relationships r \
3005             JOIN entities e1 ON e1.id = r.source_id \
3006             JOIN entities e2 ON e2.id = r.target_id \
3007             WHERE r.id = ?1",
3008            rusqlite::params![rel_id],
3009            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3010        )
3011        .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
3012
3013    let input_text = format!(
3014        "Source entity: {source_name}\nTarget entity: {target_name}\nCurrent relation: {current_relation}"
3015    );
3016    let (value, cost, is_oauth) = match mode {
3017        EnrichMode::ClaudeCode => call_claude(
3018            binary,
3019            RELATION_RECLASSIFY_PROMPT,
3020            RELATION_RECLASSIFY_SCHEMA,
3021            &input_text,
3022            model,
3023            timeout,
3024        )?,
3025        EnrichMode::Codex => call_codex(
3026            binary,
3027            RELATION_RECLASSIFY_PROMPT,
3028            RELATION_RECLASSIFY_SCHEMA,
3029            &input_text,
3030            model,
3031            timeout,
3032        )?,
3033    };
3034
3035    let new_relation = value
3036        .get("relation")
3037        .and_then(|v| v.as_str())
3038        .ok_or_else(|| AppError::Validation("LLM result missing 'relation'".into()))?;
3039    let new_strength = value
3040        .get("strength")
3041        .and_then(|v| v.as_f64())
3042        .unwrap_or(0.5);
3043
3044    conn.execute(
3045        "UPDATE relationships SET relation = ?1, weight = ?2 WHERE id = ?3",
3046        rusqlite::params![new_relation, new_strength, rel_id],
3047    )?;
3048
3049    Ok(EnrichItemResult::Done {
3050        memory_id: None,
3051        entity_id: None,
3052        entities: 0,
3053        rels: 1,
3054        chars_before: None,
3055        chars_after: None,
3056        cost,
3057        is_oauth,
3058    })
3059}
3060
3061/// G27 P2: Connect isolated entities via LLM-suggested relationship.
3062fn call_entity_connect(
3063    conn: &Connection,
3064    namespace: &str,
3065    item_key: &str,
3066    binary: &Path,
3067    model: Option<&str>,
3068    timeout: u64,
3069    mode: &EnrichMode,
3070) -> Result<EnrichItemResult, AppError> {
3071    let pairs = scan_isolated_entity_pairs(conn, namespace, Some(1))?;
3072    let (e1_id, e1_name, e2_id, e2_name) =
3073        match pairs.into_iter().find(|(_, n, _, _)| n == item_key) {
3074            Some(p) => p,
3075            None => {
3076                return Ok(EnrichItemResult::Skipped {
3077                    reason: "pair no longer isolated".into(),
3078                })
3079            }
3080        };
3081    let input_text = format!("Entity A: {e1_name}\nEntity B: {e2_name}");
3082    let (value, cost, is_oauth) = match mode {
3083        EnrichMode::ClaudeCode => call_claude(
3084            binary,
3085            ENTITY_CONNECT_PROMPT,
3086            ENTITY_CONNECT_SCHEMA,
3087            &input_text,
3088            model,
3089            timeout,
3090        )?,
3091        EnrichMode::Codex => call_codex(
3092            binary,
3093            ENTITY_CONNECT_PROMPT,
3094            ENTITY_CONNECT_SCHEMA,
3095            &input_text,
3096            model,
3097            timeout,
3098        )?,
3099    };
3100    let relation = value
3101        .get("relation")
3102        .and_then(|v| v.as_str())
3103        .unwrap_or("none");
3104    if relation == "none" {
3105        return Ok(EnrichItemResult::Skipped {
3106            reason: "LLM determined no relationship".into(),
3107        });
3108    }
3109    let strength = value
3110        .get("strength")
3111        .and_then(|v| v.as_f64())
3112        .unwrap_or(0.5);
3113    conn.execute(
3114        "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
3115        rusqlite::params![namespace, e1_id, e2_id, relation, strength],
3116    )?;
3117    Ok(EnrichItemResult::Done {
3118        memory_id: None,
3119        entity_id: None,
3120        entities: 0,
3121        rels: 1,
3122        chars_before: None,
3123        chars_after: None,
3124        cost,
3125        is_oauth,
3126    })
3127}
3128
3129/// G27 P2: Validate entity type assignment via LLM.
3130fn call_entity_type_validate(
3131    conn: &Connection,
3132    _namespace: &str,
3133    item_key: &str,
3134    binary: &Path,
3135    model: Option<&str>,
3136    timeout: u64,
3137    mode: &EnrichMode,
3138) -> Result<EnrichItemResult, AppError> {
3139    let (ent_id, ent_name, ent_type): (i64, String, String) = conn
3140        .query_row(
3141            "SELECT id, name, type FROM entities WHERE name = ?1",
3142            rusqlite::params![item_key],
3143            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3144        )
3145        .map_err(|_| AppError::NotFound(format!("entity '{item_key}' not found")))?;
3146    let input_text = format!("Entity: {ent_name}\nCurrent type: {ent_type}");
3147    let (value, cost, is_oauth) = match mode {
3148        EnrichMode::ClaudeCode => call_claude(
3149            binary,
3150            ENTITY_TYPE_VALIDATE_PROMPT,
3151            ENTITY_TYPE_VALIDATE_SCHEMA,
3152            &input_text,
3153            model,
3154            timeout,
3155        )?,
3156        EnrichMode::Codex => call_codex(
3157            binary,
3158            ENTITY_TYPE_VALIDATE_PROMPT,
3159            ENTITY_TYPE_VALIDATE_SCHEMA,
3160            &input_text,
3161            model,
3162            timeout,
3163        )?,
3164    };
3165    let validated_type = value
3166        .get("validated_type")
3167        .and_then(|v| v.as_str())
3168        .unwrap_or(&ent_type);
3169    let was_correct = value
3170        .get("was_correct")
3171        .and_then(|v| v.as_bool())
3172        .unwrap_or(true);
3173    if !was_correct {
3174        conn.execute(
3175            "UPDATE entities SET type = ?1 WHERE id = ?2",
3176            rusqlite::params![validated_type, ent_id],
3177        )?;
3178    }
3179    Ok(EnrichItemResult::Done {
3180        memory_id: None,
3181        entity_id: Some(ent_id),
3182        entities: 1,
3183        rels: 0,
3184        chars_before: None,
3185        chars_after: None,
3186        cost,
3187        is_oauth,
3188    })
3189}
3190
3191/// G27 P2: Enrich generic memory description via LLM.
3192fn call_description_enrich(
3193    conn: &Connection,
3194    _namespace: &str,
3195    item_key: &str,
3196    binary: &Path,
3197    model: Option<&str>,
3198    timeout: u64,
3199    mode: &EnrichMode,
3200) -> Result<EnrichItemResult, AppError> {
3201    let (mem_id, body, old_desc): (i64, String, String) = conn
3202        .query_row(
3203            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3204            rusqlite::params![item_key],
3205            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3206        )
3207        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3208    let snippet: String = body.chars().take(500).collect();
3209    let input_text = format!(
3210        "Memory name: {item_key}\nCurrent description: {old_desc}\nBody preview: {snippet}"
3211    );
3212    let (value, cost, is_oauth) = match mode {
3213        EnrichMode::ClaudeCode => call_claude(
3214            binary,
3215            DESCRIPTION_ENRICH_PROMPT,
3216            DESCRIPTION_ENRICH_SCHEMA,
3217            &input_text,
3218            model,
3219            timeout,
3220        )?,
3221        EnrichMode::Codex => call_codex(
3222            binary,
3223            DESCRIPTION_ENRICH_PROMPT,
3224            DESCRIPTION_ENRICH_SCHEMA,
3225            &input_text,
3226            model,
3227            timeout,
3228        )?,
3229    };
3230    let new_desc = value
3231        .get("description")
3232        .and_then(|v| v.as_str())
3233        .unwrap_or(&old_desc);
3234    conn.execute(
3235        "UPDATE memories SET description = ?1 WHERE id = ?2",
3236        rusqlite::params![new_desc, mem_id],
3237    )?;
3238    Ok(EnrichItemResult::Done {
3239        memory_id: Some(mem_id),
3240        entity_id: None,
3241        entities: 0,
3242        rels: 0,
3243        chars_before: Some(old_desc.len()),
3244        chars_after: Some(new_desc.len()),
3245        cost,
3246        is_oauth,
3247    })
3248}
3249
3250/// G27 P2: Classify memory into domain category via LLM.
3251fn call_domain_classify(
3252    conn: &Connection,
3253    _namespace: &str,
3254    item_key: &str,
3255    binary: &Path,
3256    model: Option<&str>,
3257    timeout: u64,
3258    mode: &EnrichMode,
3259) -> Result<EnrichItemResult, AppError> {
3260    let (mem_id, body, desc): (i64, String, String) = conn
3261        .query_row(
3262            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3263            rusqlite::params![item_key],
3264            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3265        )
3266        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3267    let snippet: String = body.chars().take(500).collect();
3268    let input_text = format!("Memory: {item_key}\nDescription: {desc}\nBody preview: {snippet}");
3269    let (value, cost, is_oauth) = match mode {
3270        EnrichMode::ClaudeCode => call_claude(
3271            binary,
3272            DOMAIN_CLASSIFY_PROMPT,
3273            DOMAIN_CLASSIFY_SCHEMA,
3274            &input_text,
3275            model,
3276            timeout,
3277        )?,
3278        EnrichMode::Codex => call_codex(
3279            binary,
3280            DOMAIN_CLASSIFY_PROMPT,
3281            DOMAIN_CLASSIFY_SCHEMA,
3282            &input_text,
3283            model,
3284            timeout,
3285        )?,
3286    };
3287    let domain = value
3288        .get("domain")
3289        .and_then(|v| v.as_str())
3290        .unwrap_or("uncategorized");
3291    let metadata = format!(r#"{{"domain":"{}"}}"#, domain.replace('"', "\\\""));
3292    conn.execute(
3293        "UPDATE memories SET metadata = ?1 WHERE id = ?2",
3294        rusqlite::params![metadata, mem_id],
3295    )?;
3296    Ok(EnrichItemResult::Done {
3297        memory_id: Some(mem_id),
3298        entity_id: None,
3299        entities: 0,
3300        rels: 0,
3301        chars_before: None,
3302        chars_after: None,
3303        cost,
3304        is_oauth,
3305    })
3306}
3307
3308/// G27 P2: Audit memory graph quality via LLM.
3309fn call_graph_audit(
3310    conn: &Connection,
3311    _namespace: &str,
3312    item_key: &str,
3313    binary: &Path,
3314    model: Option<&str>,
3315    timeout: u64,
3316    mode: &EnrichMode,
3317) -> Result<EnrichItemResult, AppError> {
3318    let (mem_id, body, desc): (i64, String, String) = conn
3319        .query_row(
3320            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3321            rusqlite::params![item_key],
3322            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3323        )
3324        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3325    let snippet: String = body.chars().take(500).collect();
3326    let ent_count: i64 = conn
3327        .query_row(
3328            "SELECT COUNT(*) FROM memory_entities WHERE memory_id = ?1",
3329            rusqlite::params![mem_id],
3330            |r| r.get(0),
3331        )
3332        .unwrap_or(0);
3333    let input_text = format!("Memory: {item_key}\nDescription: {desc}\nEntity bindings: {ent_count}\nBody preview: {snippet}");
3334    let (value, cost, is_oauth) = match mode {
3335        EnrichMode::ClaudeCode => call_claude(
3336            binary,
3337            GRAPH_AUDIT_PROMPT,
3338            GRAPH_AUDIT_SCHEMA,
3339            &input_text,
3340            model,
3341            timeout,
3342        )?,
3343        EnrichMode::Codex => call_codex(
3344            binary,
3345            GRAPH_AUDIT_PROMPT,
3346            GRAPH_AUDIT_SCHEMA,
3347            &input_text,
3348            model,
3349            timeout,
3350        )?,
3351    };
3352    let issues = value
3353        .get("issues")
3354        .and_then(|v| v.as_array())
3355        .map(|a| a.len())
3356        .unwrap_or(0);
3357    Ok(EnrichItemResult::Done {
3358        memory_id: Some(mem_id),
3359        entity_id: None,
3360        entities: 0,
3361        rels: issues,
3362        chars_before: None,
3363        chars_after: None,
3364        cost,
3365        is_oauth,
3366    })
3367}
3368
3369/// G27 P2: Synthesize research findings into graph entities/relationships via LLM.
3370fn call_deep_research_synth(
3371    conn: &Connection,
3372    namespace: &str,
3373    item_key: &str,
3374    binary: &Path,
3375    model: Option<&str>,
3376    timeout: u64,
3377    mode: &EnrichMode,
3378) -> Result<EnrichItemResult, AppError> {
3379    let (mem_id, body): (i64, String) = conn
3380        .query_row(
3381            "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3382            rusqlite::params![item_key],
3383            |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
3384        )
3385        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3386    let snippet: String = body.chars().take(2000).collect();
3387    let input_text = format!("Memory: {item_key}\nBody:\n{snippet}");
3388    let (value, cost, is_oauth) = match mode {
3389        EnrichMode::ClaudeCode => call_claude(
3390            binary,
3391            DEEP_RESEARCH_SYNTH_PROMPT,
3392            DEEP_RESEARCH_SYNTH_SCHEMA,
3393            &input_text,
3394            model,
3395            timeout,
3396        )?,
3397        EnrichMode::Codex => call_codex(
3398            binary,
3399            DEEP_RESEARCH_SYNTH_PROMPT,
3400            DEEP_RESEARCH_SYNTH_SCHEMA,
3401            &input_text,
3402            model,
3403            timeout,
3404        )?,
3405    };
3406    let mut ent_count = 0usize;
3407    let mut rel_count = 0usize;
3408    if let Some(ents) = value.get("entities").and_then(|v| v.as_array()) {
3409        for e in ents {
3410            let name = e.get("name").and_then(|v| v.as_str()).unwrap_or_default();
3411            let etype_str = e
3412                .get("entity_type")
3413                .and_then(|v| v.as_str())
3414                .unwrap_or("concept");
3415            let etype: EntityType = etype_str.parse().unwrap_or(EntityType::Concept);
3416            if name.len() >= 2 {
3417                let ne = NewEntity {
3418                    name: name.to_string(),
3419                    entity_type: etype,
3420                    description: None,
3421                };
3422                let _ = entities::upsert_entity(conn, namespace, &ne);
3423                ent_count += 1;
3424            }
3425        }
3426    }
3427    if let Some(rels) = value.get("relationships").and_then(|v| v.as_array()) {
3428        for r in rels {
3429            let src = r.get("source").and_then(|v| v.as_str()).unwrap_or_default();
3430            let tgt = r.get("target").and_then(|v| v.as_str()).unwrap_or_default();
3431            if src.is_empty() || tgt.is_empty() {
3432                continue;
3433            }
3434            let rel = r
3435                .get("relation")
3436                .and_then(|v| v.as_str())
3437                .unwrap_or("related");
3438            let str_ = r.get("strength").and_then(|v| v.as_f64()).unwrap_or(0.5);
3439            if let (Some(sid), Some(tid)) = (
3440                entities::find_entity_id(conn, namespace, src)?,
3441                entities::find_entity_id(conn, namespace, tgt)?,
3442            ) {
3443                let _ = entities::create_or_fetch_relationship(
3444                    conn, namespace, sid, tid, rel, str_, None,
3445                );
3446                rel_count += 1;
3447            }
3448        }
3449    }
3450    Ok(EnrichItemResult::Done {
3451        memory_id: Some(mem_id),
3452        entity_id: None,
3453        entities: ent_count,
3454        rels: rel_count,
3455        chars_before: None,
3456        chars_after: None,
3457        cost,
3458        is_oauth,
3459    })
3460}
3461
3462/// G27 P2: Extract structured body from unstructured text via LLM.
3463fn call_body_extract(
3464    conn: &Connection,
3465    _namespace: &str,
3466    item_key: &str,
3467    binary: &Path,
3468    model: Option<&str>,
3469    timeout: u64,
3470    mode: &EnrichMode,
3471) -> Result<EnrichItemResult, AppError> {
3472    let (mem_id, body): (i64, String) = conn
3473        .query_row(
3474            "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3475            rusqlite::params![item_key],
3476            |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
3477        )
3478        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3479    let input_text = format!("Memory: {item_key}\nBody:\n{body}");
3480    let (value, cost, is_oauth) = match mode {
3481        EnrichMode::ClaudeCode => call_claude(
3482            binary,
3483            BODY_EXTRACT_PROMPT,
3484            BODY_EXTRACT_SCHEMA,
3485            &input_text,
3486            model,
3487            timeout,
3488        )?,
3489        EnrichMode::Codex => call_codex(
3490            binary,
3491            BODY_EXTRACT_PROMPT,
3492            BODY_EXTRACT_SCHEMA,
3493            &input_text,
3494            model,
3495            timeout,
3496        )?,
3497    };
3498    let restructured = value
3499        .get("restructured_body")
3500        .and_then(|v| v.as_str())
3501        .unwrap_or(&body);
3502    let chars_before = body.len();
3503    let chars_after = restructured.len();
3504    let new_hash = blake3::hash(restructured.as_bytes()).to_hex().to_string();
3505    conn.execute(
3506        "UPDATE memories SET body = ?1, body_hash = ?2, updated_at = unixepoch() WHERE id = ?3",
3507        rusqlite::params![restructured, new_hash, mem_id],
3508    )?;
3509    Ok(EnrichItemResult::Done {
3510        memory_id: Some(mem_id),
3511        entity_id: None,
3512        entities: 0,
3513        rels: 0,
3514        chars_before: Some(chars_before),
3515        chars_after: Some(chars_after),
3516        cost,
3517        is_oauth,
3518    })
3519}
3520
3521/// Scan for pairs of entities that share no direct relationship.
3522#[allow(clippy::type_complexity)]
3523fn scan_isolated_entity_pairs(
3524    conn: &Connection,
3525    namespace: &str,
3526    limit: Option<usize>,
3527) -> Result<Vec<(i64, String, i64, String)>, AppError> {
3528    let limit_val = limit.unwrap_or(50) as i64;
3529    let mut stmt = conn.prepare_cached(
3530        "SELECT e1.id, e1.name, e2.id, e2.name FROM entities e1, entities e2 \
3531         WHERE e1.namespace = ?1 AND e2.namespace = ?1 AND e1.id < e2.id \
3532         AND NOT EXISTS (SELECT 1 FROM relationships r WHERE \
3533           (r.source_id = e1.id AND r.target_id = e2.id) OR \
3534           (r.source_id = e2.id AND r.target_id = e1.id)) \
3535         LIMIT ?2",
3536    )?;
3537    let rows = stmt
3538        .query_map(rusqlite::params![namespace, limit_val], |r| {
3539            Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?))
3540        })?
3541        .collect::<Result<Vec<_>, _>>()?;
3542    Ok(rows)
3543}
3544
3545/// Scan for entities with non-validated types (all entities for type audit).
3546fn scan_entities_for_type_validation(
3547    conn: &Connection,
3548    namespace: &str,
3549    limit: Option<usize>,
3550) -> Result<Vec<(i64, String, String)>, AppError> {
3551    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3552    let sql = format!(
3553        "SELECT id, name, type FROM entities WHERE namespace = ?1 ORDER BY id {limit_clause}"
3554    );
3555    let mut stmt = conn.prepare(&sql)?;
3556    let rows = stmt
3557        .query_map(rusqlite::params![namespace], |r| {
3558            Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3559        })?
3560        .collect::<Result<Vec<_>, _>>()?;
3561    Ok(rows)
3562}
3563
3564/// Scan for memories with generic descriptions (ingested, imported, etc).
3565fn scan_generic_descriptions(
3566    conn: &Connection,
3567    namespace: &str,
3568    limit: Option<usize>,
3569) -> Result<Vec<(i64, String, String)>, AppError> {
3570    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3571    let sql = format!(
3572        "SELECT id, name, description FROM memories WHERE namespace = ?1 AND deleted_at IS NULL \
3573         AND (description LIKE '%ingested%' OR description LIKE '%imported%' OR description LIKE '%added%' OR length(description) < 30) \
3574         ORDER BY id {limit_clause}"
3575    );
3576    let mut stmt = conn.prepare(&sql)?;
3577    let rows = stmt
3578        .query_map(rusqlite::params![namespace], |r| {
3579            Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3580        })?
3581        .collect::<Result<Vec<_>, _>>()?;
3582    Ok(rows)
3583}
3584
3585/// Calls the Codex CLI for a single enrichment item.
3586///
3587/// Follows the same contract as `call_claude`: returns `(value, cost_usd, is_oauth=false)`.
3588fn call_codex(
3589    binary: &Path,
3590    prompt: &str,
3591    json_schema: &str,
3592    input_text: &str,
3593    model: Option<&str>,
3594    timeout_secs: u64,
3595) -> Result<(serde_json::Value, f64, bool), AppError> {
3596    use wait_timeout::ChildExt;
3597
3598    // G31+G32+G33 (v1.0.69): validate the model BEFORE spawn, write the
3599    // schema to a trusted cache path (not /tmp), and reuse the
3600    // consolidated JSONL parser. See `codex_spawn.rs` for the canonical
3601    // hardening rationale.
3602    super::codex_spawn::validate_codex_model(model)?;
3603    let schema_file = super::codex_spawn::trusted_schema_path()?;
3604
3605    let args = super::codex_spawn::CodexSpawnArgs {
3606        binary,
3607        prompt,
3608        json_schema,
3609        input_text,
3610        model,
3611        timeout_secs,
3612        schema_path: schema_file.clone(),
3613    };
3614    let mut cmd = super::codex_spawn::build_codex_command(&args);
3615
3616    let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
3617        AppError::Io(std::io::Error::new(
3618            e.kind(),
3619            format!("failed to spawn codex: {e}"),
3620        ))
3621    })?;
3622
3623    let full_prompt = format!("{prompt}\n\n{input_text}");
3624    let stdin_bytes = full_prompt.into_bytes();
3625    let mut child_stdin = child
3626        .stdin
3627        .take()
3628        .ok_or_else(|| AppError::Validation("failed to open codex stdin".into()))?;
3629    let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
3630        child_stdin.write_all(&stdin_bytes)?;
3631        drop(child_stdin);
3632        Ok(())
3633    });
3634
3635    let start = std::time::Instant::now();
3636    let timeout = std::time::Duration::from_secs(timeout_secs);
3637    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
3638    let _ = std::fs::remove_file(&schema_file);
3639
3640    match status {
3641        Some(exit_status) => {
3642            stdin_thread
3643                .join()
3644                .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
3645                .map_err(AppError::Io)?;
3646
3647            tracing::debug!(
3648                target: "process",
3649                exit_code = ?exit_status.code(),
3650                elapsed_ms = start.elapsed().as_millis() as u64,
3651                "external process completed"
3652            );
3653
3654            let mut stdout_buf = Vec::new();
3655            if let Some(mut out) = child.stdout.take() {
3656                std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
3657            }
3658            if !exit_status.success() {
3659                let mut stderr_buf = Vec::new();
3660                if let Some(mut err) = child.stderr.take() {
3661                    std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
3662                }
3663                let stderr_str = String::from_utf8_lossy(&stderr_buf);
3664                tracing::warn!(
3665                    target: "enrich",
3666                    exit_code = ?exit_status.code(),
3667                    stderr = %stderr_str.trim(),
3668                    "codex process failed"
3669                );
3670                return Err(AppError::Validation(format!(
3671                    "codex exited with code {:?}: {}",
3672                    exit_status.code(),
3673                    stderr_str.trim()
3674                )));
3675            }
3676            let stdout_str = String::from_utf8(stdout_buf)
3677                .map_err(|_| AppError::Validation("codex stdout is not valid UTF-8".into()))?;
3678            // G32: use the JSONL parser, NOT serde_json::from_str on the
3679            // entire stdout (codex emits one event per line).
3680            let result = super::codex_spawn::parse_codex_jsonl(&stdout_str)?;
3681            // Return the raw agent_message text parsed as JSON. Different
3682            // operations (memory-bindings, body-enrich) use different
3683            // output schemas, so we let the caller pick which fields to
3684            // extract. The previous implementation hardcoded
3685            // `{entities, urls}` which broke body-enrich.
3686            let value: serde_json::Value =
3687                serde_json::from_str(&result.last_agent_text).map_err(|e| {
3688                    AppError::Validation(format!(
3689                        "codex agent_message is not valid JSON: {e}; raw={}",
3690                        result.last_agent_text
3691                    ))
3692                })?;
3693            Ok((value, 0.0, false))
3694        }
3695        None => {
3696            let _ = child.kill();
3697            let _ = child.wait();
3698            let _ = stdin_thread.join();
3699            Err(AppError::Validation(format!(
3700                "codex timed out after {timeout_secs} seconds"
3701            )))
3702        }
3703    }
3704}
3705
3706// ---------------------------------------------------------------------------
3707// Tests
3708// ---------------------------------------------------------------------------
3709
#[cfg(test)]
mod tests {
    // Unit tests for the enrich scanners, persisters, LLM-output parsers and
    // JSON schemas. Database tests run against the in-memory SQLite database
    // built by `open_test_db`; the only on-disk artifacts are created inside
    // `tempfile` temporary directories, which are removed when dropped.
    use super::*;
    use rusqlite::Connection;
    #[cfg(unix)]
    use std::os::unix::fs::PermissionsExt;

    /// Opens an in-memory SQLite database with a minimal schema for unit tests.
    ///
    /// Creates `memories`, `entities`, `memory_entities`, `relationships` and
    /// `memory_embeddings`. NOTE(review): presumably a reduced copy of the
    /// production schema — keep it in sync if the scanners start reading new
    /// columns.
    fn open_test_db() -> Connection {
        let conn = Connection::open_in_memory().expect("in-memory db");
        conn.execute_batch(
            "CREATE TABLE memories (
                id          INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace   TEXT NOT NULL DEFAULT 'global',
                name        TEXT NOT NULL,
                type        TEXT NOT NULL DEFAULT 'note',
                description TEXT NOT NULL DEFAULT '',
                body        TEXT NOT NULL DEFAULT '',
                body_hash   TEXT NOT NULL DEFAULT '',
                session_id  TEXT,
                source      TEXT NOT NULL DEFAULT 'agent',
                metadata    TEXT NOT NULL DEFAULT '{}',
                created_at  INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at  INTEGER NOT NULL DEFAULT (unixepoch()),
                deleted_at  INTEGER,
                UNIQUE(namespace, name)
            );
            CREATE TABLE entities (
                id          INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace   TEXT NOT NULL DEFAULT 'global',
                name        TEXT NOT NULL,
                type        TEXT NOT NULL DEFAULT 'concept',
                description TEXT,
                degree      INTEGER NOT NULL DEFAULT 0,
                created_at  INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at  INTEGER NOT NULL DEFAULT (unixepoch()),
                UNIQUE(namespace, name)
            );
            CREATE TABLE memory_entities (
                memory_id  INTEGER NOT NULL,
                entity_id  INTEGER NOT NULL,
                PRIMARY KEY (memory_id, entity_id)
            );
            CREATE TABLE relationships (
                id         INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace  TEXT NOT NULL DEFAULT 'global',
                source_id  INTEGER NOT NULL,
                target_id  INTEGER NOT NULL,
                relation   TEXT NOT NULL,
                weight     REAL NOT NULL DEFAULT 0.5,
                description TEXT,
                UNIQUE(source_id, target_id, relation)
            );
            CREATE TABLE memory_embeddings (
                memory_id   INTEGER PRIMARY KEY,
                namespace   TEXT NOT NULL,
                embedding   BLOB NOT NULL,
                source      TEXT NOT NULL,
                model       TEXT NOT NULL DEFAULT '',
                dim         INTEGER NOT NULL DEFAULT 384,
                created_at  INTEGER NOT NULL DEFAULT (unixepoch())
            );",
        )
        .expect("schema creation must succeed");
        conn
    }

    /// A memory with no `memory_entities` row is reported by the scanner;
    /// the tuple's second field is the memory name.
    #[test]
    fn scan_unbound_memories_finds_memories_without_bindings() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'test-mem', 'some body content')",
            [],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "test-mem");
    }

    /// A memory that already has a `memory_entities` binding is skipped.
    #[test]
    fn scan_unbound_memories_excludes_bound_memories() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'bound-mem', 'body')",
            [],
        )
        .unwrap();
        let mem_id: i64 = conn
            .query_row("SELECT id FROM memories WHERE name='bound-mem'", [], |r| {
                r.get(0)
            })
            .unwrap();
        conn.execute(
            "INSERT INTO entities (namespace, name) VALUES ('global', 'some-entity')",
            [],
        )
        .unwrap();
        let ent_id: i64 = conn
            .query_row(
                "SELECT id FROM entities WHERE name='some-entity'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        conn.execute(
            "INSERT INTO memory_entities (memory_id, entity_id) VALUES (?1, ?2)",
            rusqlite::params![mem_id, ent_id],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
        assert!(results.is_empty(), "bound memory must not appear in scan");
    }

    /// An entity whose `description` is NULL is a candidate for description
    /// enrichment.
    #[test]
    fn scan_entities_without_description_finds_null_description() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'my-tool', 'tool', NULL)",
            [],
        )
        .unwrap();

        let results = scan_entities_without_description(&conn, "global", None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "my-tool");
    }

    /// An entity that already has a non-empty description is skipped.
    #[test]
    fn scan_entities_without_description_excludes_entities_with_description() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'described-tool', 'tool', 'Has a description already')",
            [],
        )
        .unwrap();

        let results = scan_entities_without_description(&conn, "global", None).unwrap();
        assert!(
            results.is_empty(),
            "entity with description must not appear"
        );
    }

    /// A body shorter than the threshold (2 chars vs. 100) is reported.
    #[test]
    fn scan_short_body_memories_finds_short_bodies() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'short-mem', 'hi')",
            [],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "short-mem");
    }

    /// A body longer than the threshold (1000 chars vs. 100) is skipped.
    #[test]
    fn scan_short_body_memories_excludes_long_bodies() {
        let conn = open_test_db();
        let long_body = "a".repeat(1000);
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'long-mem', ?1)",
            rusqlite::params![long_body],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
        assert!(results.is_empty(), "long memory must not appear in scan");
    }

    /// `Some(limit)` caps the number of rows returned even when more match.
    #[test]
    fn scan_respects_limit() {
        let conn = open_test_db();
        for i in 0..5 {
            // Bind the generated name instead of splicing it into the SQL text.
            conn.execute(
                "INSERT INTO memories (namespace, name, body) VALUES ('global', ?1, 'short')",
                rusqlite::params![format!("mem-{i}")],
            )
            .unwrap();
        }

        let results = scan_short_body_memories(&conn, "global", 1000, Some(3)).unwrap();
        assert_eq!(results.len(), 3, "limit must be respected");
    }

    /// Only memories lacking a vector row (written via `memories::upsert_vec`)
    /// are candidates for re-embedding.
    #[test]
    fn scan_memories_without_embeddings_finds_only_missing_rows() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'missing-vec', 'body one')",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'has-vec', 'body two')",
            [],
        )
        .unwrap();
        let memory_id: i64 = conn
            .query_row(
                "SELECT id FROM memories WHERE namespace='global' AND name='has-vec'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        let embedding = vec![0.0_f32; crate::constants::embedding_dim()];
        memories::upsert_vec(
            &conn, memory_id, "global", "note", &embedding, "has-vec", "body two",
        )
        .unwrap();

        let results = scan_memories_without_embeddings(&conn, "global", None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "missing-vec");
    }

    /// A non-empty name filter restricts the re-embed scan to those names.
    #[test]
    fn scan_memories_without_embeddings_respects_name_filter() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'match-me', 'body one')",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'skip-me', 'body two')",
            [],
        )
        .unwrap();

        let results =
            scan_memories_without_embeddings(&conn, "global", None, &["match-me".to_string()])
                .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "match-me");
    }

    /// `open_queue_db` creates an empty `queue` table in a fresh file.
    ///
    /// Uses a `tempfile` directory rather than a hard-coded `/tmp` path so the
    /// test is portable, cannot collide with a stale file from an earlier run,
    /// and still cleans up when an assertion panics.
    #[test]
    fn queue_db_schema_creates_correctly() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let queue_path = tmp
            .path()
            .join("test-enrich-queue.sqlite")
            .to_string_lossy()
            .into_owned();
        let conn = open_queue_db(&queue_path).expect("queue db must open");
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 0);
        // Close the SQLite handle before `tmp` is dropped and deletes the
        // directory; some platforms refuse to remove files that are still open.
        drop(conn);
    }

    /// A successful `result` event yields its `structured_output`, the
    /// reported cost, and `is_oauth == false` when no `apiKeySource` is given.
    #[test]
    fn parse_claude_output_valid_bindings() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.01,
             "structured_output":{"entities":[{"name":"rust-lang","entity_type":"tool"}],"relationships":[]}}
        ]"#;
        let result = crate::commands::claude_runner::parse_claude_output(output)
            .expect("must parse successfully");
        assert!(result.value.get("entities").is_some());
        assert!((result.cost_usd - 0.01).abs() < f64::EPSILON);
        assert!(!result.is_oauth);
    }

    /// `"apiKeySource":"none"` on the init event is reported as OAuth usage.
    #[test]
    fn parse_claude_output_detects_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"none"},
            {"type":"result","is_error":false,"total_cost_usd":0.0,
             "structured_output":{"entities":[],"relationships":[]}}
        ]"#;
        let result = crate::commands::claude_runner::parse_claude_output(output).unwrap();
        assert!(result.is_oauth);
    }

    /// An error result mentioning `rate_limit` maps to `AppError::RateLimited`.
    #[test]
    fn parse_claude_output_rate_limit_returns_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;
        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
        assert!(matches!(err, AppError::RateLimited { .. }));
    }

    /// Other error results surface the upstream message in the error text.
    #[test]
    fn parse_claude_output_auth_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;
        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("authentication failed"));
    }

    /// `call_codex` returns the agent message parsed as raw JSON, so the
    /// body-enrich payload (`enriched_body`) is passed through intact. A bash
    /// script stands in for the real `codex` binary and prints canned JSONL.
    #[cfg(unix)]
    #[test]
    fn call_codex_returns_raw_json_for_body_enrich_schema() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let binary = tmp.path().join("codex-mock");
        std::fs::write(
            &binary,
            r#"#!/usr/bin/env bash
set -euo pipefail
cat <<'JSONL'
{"type":"thread.started","thread_id":"mock-thread-0"}
{"type":"item.completed","item":{"type":"agent_message","text":"{\"enriched_body\":\"expanded body\"}"}}
{"type":"turn.completed","usage":{"input_tokens":1,"output_tokens":1}}
JSONL
"#,
        )
        .expect("mock codex write");
        let mut perms = std::fs::metadata(&binary).expect("metadata").permissions();
        perms.set_mode(0o755);
        std::fs::set_permissions(&binary, perms).expect("chmod");

        let (value, cost, is_oauth) =
            call_codex(&binary, "prompt", BODY_ENRICH_SCHEMA, "body", None, 5)
                .expect("call_codex must accept body-enrich payload");

        assert_eq!(value["enriched_body"], "expanded body");
        assert_eq!(cost, 0.0);
        assert!(!is_oauth);
    }

    /// Covers only the scan step the dry-run path relies on.
    ///
    /// Despite its name, this test does not invoke `run()`: that needs
    /// `AppPaths` (on-disk state). It checks that `scan_short_body_memories`
    /// finds the candidate a dry run would preview. The NDJSON preview output
    /// itself is expected to be covered by integration tests using a fake
    /// binary.
    #[test]
    fn dry_run_emits_preview_without_calling_llm() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'dry-mem', 'tiny')",
            [],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 1000, None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "dry-mem");
    }

    /// `persist_entity_description` writes the given text into the
    /// entity's `description` column.
    #[test]
    fn persist_entity_description_updates_db() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type) VALUES ('global', 'tokio-runtime', 'tool')",
            [],
        )
        .unwrap();
        let eid: i64 = conn
            .query_row(
                "SELECT id FROM entities WHERE name='tokio-runtime'",
                [],
                |r| r.get(0),
            )
            .unwrap();

        persist_entity_description(&conn, eid, "Async runtime for Rust applications").unwrap();

        let desc: String = conn
            .query_row(
                "SELECT description FROM entities WHERE id=?1",
                rusqlite::params![eid],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(desc, "Async runtime for Rust applications");
    }

    /// The JSON schema constant passed to the LLM must itself parse as JSON.
    #[test]
    fn bindings_schema_is_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(BINDINGS_SCHEMA).expect("BINDINGS_SCHEMA must be valid JSON");
    }

    /// The JSON schema constant passed to the LLM must itself parse as JSON.
    #[test]
    fn entity_description_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(ENTITY_DESCRIPTION_SCHEMA)
            .expect("ENTITY_DESCRIPTION_SCHEMA must be valid JSON");
    }

    /// The JSON schema constant passed to the LLM must itself parse as JSON.
    #[test]
    fn body_enrich_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(BODY_ENRICH_SCHEMA)
            .expect("BODY_ENRICH_SCHEMA must be valid JSON");
    }
}