Skip to main content

sqlite_graphrag/commands/
enrich.rs

// TODO v1.0.89: this file has 4116 lines — modularization is planned.
// See ADR-0046, section "Known Tech Debt (v1.0.89+)".
3
4//! Handler for the `enrich` CLI subcommand (GAP-14 + GAP-18).
5//!
6//! Enriches the knowledge graph by running LLM-powered analysis over memories
7//! and entities that are missing key structural data. Operations are:
8//!
9//! - `memory-bindings`: memories without `memory_entities` rows get entity extraction
10//! - `entity-descriptions`: entities with NULL/empty descriptions get LLM descriptions
11//! - `body-enrich`: memories with short bodies get expanded by the LLM (GAP-18)
12//! - `re-embed`: memories without a vector row get re-embedded without rewriting body
13//!
14//! Architecture mirrors `ingest_claude.rs`: SCAN → JUDGE (LLM) → PERSIST, with a
15//! SQLite queue DB (`.enrich-queue.sqlite`) for resume/retry support.
16// Workload: Subprocess I/O-bound (claude/codex API calls with network wait)
17//!
18//! # DRY opportunity
19//!
20//! `extract_with_claude`, `parse_claude_output`, `emit_json`, and the `open_queue_db`
21//! queue schema in `ingest_claude.rs` are private functions that duplicate patterns used
22//! here verbatim. A future refactoring could extract them into a shared
23//! `src/commands/llm_runner.rs` module (or `src/llm_runner.rs`) without changing any
24//! public APIs. That extraction requires editing `ingest_claude.rs`, which is outside
25//! this stream's boundary — flagged here for the Integration stream to evaluate.
26
27use crate::commands::ingest_claude::find_claude_binary;
28use crate::constants::MAX_MEMORY_BODY_LEN;
29use crate::entity_type::EntityType;
30use crate::errors::AppError;
31use crate::paths::AppPaths;
32use crate::storage::connection::{ensure_db_ready, open_rw};
33use crate::storage::entities::{self, NewEntity, NewRelationship};
34use crate::storage::memories;
35
36use rusqlite::Connection;
37use serde::{Deserialize, Serialize};
38use std::io::Write;
39use std::path::{Path, PathBuf};
40use std::time::Instant;
41
42// ---------------------------------------------------------------------------
43// Constants
44// ---------------------------------------------------------------------------
45
/// Default filename of the SQLite queue database used for resume/retry.
const DEFAULT_QUEUE_DB: &str = ".enrich-queue.sqlite";
// NOTE(review): presumably the number of seconds to wait after a provider
// rate limit; the consumer is outside this view — confirm the unit.
const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
/// Default value of `--min-output-chars` (body-enrich).
const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;
/// Default value of `--max-output-chars` (body-enrich).
const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2000;
50
51// ---------------------------------------------------------------------------
52// JSON schema used for memory-bindings and body-enrich extraction
53// ---------------------------------------------------------------------------
54
/// JSON Schema for entity/relationship extraction output.
///
/// The root object requires two arrays:
/// - `entities`: objects with a `name` and an `entity_type` restricted to a
///   fixed enum of ten types.
/// - `relationships`: objects with `source`, `target`, a `relation` restricted
///   to a fixed enum of eleven relation names, and a `strength` in `[0, 1]`.
///
/// Unknown properties are rejected at every level (`additionalProperties: false`).
const BINDINGS_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["entities","relationships"],
  "additionalProperties": false
}"#;
94
/// JSON Schema for an entity-description response: a single required
/// `description` string, no other properties allowed.
const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" }
  },
  "required": ["description"],
  "additionalProperties": false
}"#;

/// JSON Schema for a body-enrich response: a single required
/// `enriched_body` string, no other properties allowed.
const BODY_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "enriched_body": { "type": "string" }
  },
  "required": ["enriched_body"],
  "additionalProperties": false
}"#;
112
// G27 P1: weight-calibrate
/// Instruction text for the weight-calibrate operation. It defines a
/// four-step reference scale (0.9 / 0.7 / 0.5 / 0.3) and asks for the
/// calibrated weight plus brief reasoning.
const WEIGHT_CALIBRATE_PROMPT: &str = "You are a knowledge graph quality auditor. Evaluate whether this relationship weight is correctly calibrated.\n\n\
Scale:\n\
- 0.9 = vital hard dependency (A cannot function without B)\n\
- 0.7 = important design relationship (A strongly supports/enables B)\n\
- 0.5 = useful contextual link (A and B share relevant context)\n\
- 0.3 = weak reference (A mentions B without strong coupling)\n\n\
Respond with the calibrated weight and brief reasoning.";

/// JSON Schema for the weight-calibrate response: `calibrated_weight`
/// (number in `[0.0, 1.0]`) and `reasoning` (string), both required.
const WEIGHT_CALIBRATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "calibrated_weight": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["calibrated_weight", "reasoning"],
  "additionalProperties": false
}"#;
131
// G27 P1: relation-reclassify
/// Instruction text for the relation-reclassify operation. It lists the
/// eleven canonical relation names with a one-line meaning each and asks
/// for exactly one of them, plus strength and reasoning.
const RELATION_RECLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. The relationship between these entities uses a generic type. Determine the REAL semantic relationship.\n\n\
Valid canonical relations (pick exactly one):\n\
- depends-on: A cannot function without B\n\
- uses: A utilizes B but could substitute it\n\
- supports: A reinforces or enables B\n\
- causes: A triggers or produces B\n\
- fixes: A resolves a problem in B\n\
- contradicts: A conflicts with or invalidates B\n\
- applies-to: A is relevant to or scoped within B\n\
- follows: A comes after B in sequence\n\
- replaces: A substitutes B\n\
- tracked-in: A is monitored in B\n\
- related: A and B share context (use sparingly)\n\n\
Respond with the correct relation, strength, and reasoning.";

/// JSON Schema for the relation-reclassify response: `relation` (string),
/// `strength` (number in `[0.0, 1.0]`) and `reasoning` (string), all required.
// NOTE(review): `relation` is a free-form string here, whereas
// `BINDINGS_SCHEMA` restricts it with an `enum`. Presumably the caller
// validates the value after parsing — confirm.
const RELATION_RECLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": { "type": "string" },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
158
// G27 P2: entity-connect — suggest relationships between isolated entities
/// Instruction text for the entity-connect operation. The model is told to
/// answer with the literal relation `"none"` when no meaningful relationship
/// exists between the two entities.
const ENTITY_CONNECT_PROMPT: &str = "You are a knowledge graph quality auditor. Two entities exist in the same graph but have no relationship between them. Determine if a meaningful relationship exists.\n\n\
Valid canonical relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, replaces, tracked-in, related.\n\n\
If NO meaningful relationship exists, set relation to \"none\".\n\
Respond with the relation (or \"none\"), strength, and reasoning.";

/// JSON Schema for the entity-connect response: `relation` (string, which
/// per the prompt may be `"none"`), `strength` (number in `[0.0, 1.0]`) and
/// `reasoning` (string), all required.
const ENTITY_CONNECT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": { "type": "string" },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
175
// G27 P2: entity-type-validate — verify entity type assignments
/// Instruction text for the entity-type-validate operation. It lists the
/// ten valid entity types and asks the model to keep or correct the
/// current type.
const ENTITY_TYPE_VALIDATE_PROMPT: &str = "You are a knowledge graph quality auditor. Verify whether this entity's type is correct.\n\n\
Valid entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\n\
If the current type is correct, keep it. If wrong, suggest the correct type.\n\
Respond with the validated type and reasoning.";

/// JSON Schema for the entity-type-validate response: `validated_type`
/// (string), `was_correct` (boolean) and `reasoning` (string), all required.
// NOTE(review): `validated_type` is not constrained by an `enum` in the
// schema; the list of valid types only appears in the prompt text.
const ENTITY_TYPE_VALIDATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "validated_type": { "type": "string" },
    "was_correct": { "type": "boolean" },
    "reasoning": { "type": "string" }
  },
  "required": ["validated_type", "was_correct", "reasoning"],
  "additionalProperties": false
}"#;
192
// G27 P2: description-enrich — improve generic memory descriptions
/// Instruction text for the description-enrich operation. It asks for a
/// 10-20 word semantic description and includes one bad and one good example.
const DESCRIPTION_ENRICH_PROMPT: &str = "You are a knowledge graph quality auditor. This memory has a generic or auto-generated description. Write a concise, semantic description (10-20 words) that captures WHAT this memory is about and WHY it matters.\n\n\
BAD: 'ingested from docs/auth.md'\n\
GOOD: 'JWT token rotation strategy with 15-min expiry and refresh flow'\n\n\
Respond with the improved description and reasoning.";

/// JSON Schema for the description-enrich response: `description` and
/// `reasoning`, both required strings.
const DESCRIPTION_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" },
    "reasoning": { "type": "string" }
  },
  "required": ["description", "reasoning"],
  "additionalProperties": false
}"#;
208
// G27 P2: domain-classify — classify memory into domain category
/// Instruction text for the domain-classify operation. It asks for a
/// kebab-case domain name of 2-4 words plus reasoning.
const DOMAIN_CLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. Classify this memory into its primary domain category.\n\n\
Respond with the domain name (kebab-case, 2-4 words) and reasoning.";

/// JSON Schema for the domain-classify response: `domain` (string),
/// `confidence` (number in `[0.0, 1.0]`) and `reasoning` (string), all required.
// NOTE(review): the schema requires `confidence`, but the prompt text does
// not mention it.
const DOMAIN_CLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "domain": { "type": "string" },
    "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["domain", "confidence", "reasoning"],
  "additionalProperties": false
}"#;
223
// G27 P2: graph-audit — audit graph for quality issues
/// Instruction text for the graph-audit operation. It enumerates the issue
/// categories to check and asks for a list of issues plus an overall
/// quality score.
const GRAPH_AUDIT_PROMPT: &str = "You are a knowledge graph quality auditor. Analyze this memory and its entity bindings for quality issues.\n\n\
Check for: missing entities, wrong entity types, redundant relationships, orphaned entities, generic descriptions, low-signal relationships.\n\n\
Respond with a list of issues found (or empty if none) and an overall quality score.";

/// JSON Schema for the graph-audit response: `quality_score` (number in
/// `[0.0, 1.0]`), `issues` (array of `{kind, detail}` objects) and
/// `reasoning` (string), all required.
// NOTE(review): the nested `issues` item object does not set
// `additionalProperties: false`, unlike the root object.
const GRAPH_AUDIT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "quality_score": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "issues": { "type": "array", "items": { "type": "object", "properties": { "kind": { "type": "string" }, "detail": { "type": "string" } }, "required": ["kind", "detail"] } },
    "reasoning": { "type": "string" }
  },
  "required": ["quality_score", "issues", "reasoning"],
  "additionalProperties": false
}"#;
239
// G27 P2: deep-research-synth — synthesize research findings into graph
/// Instruction text for the deep-research-synth operation. It asks for
/// extracted entities (lowercase kebab-case names), relationships using the
/// listed relation names, and a synthesis summary.
const DEEP_RESEARCH_SYNTH_PROMPT: &str = "You are a knowledge graph synthesizer. Given this memory body, extract key findings and synthesize them into structured entities and relationships.\n\n\
Entity names: lowercase kebab-case, domain-specific.\n\
Relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, related, replaces, tracked-in.\n\n\
Respond with extracted entities, relationships, and a synthesis summary.";

/// JSON Schema for the deep-research-synth response: `entities`,
/// `relationships` and `summary`, all required.
// NOTE(review): unlike `BINDINGS_SCHEMA`, the nested item objects here carry
// no `enum` on `entity_type`/`relation`, no bounds on `strength`, and no
// `additionalProperties: false`.
const DEEP_RESEARCH_SYNTH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": { "type": "array", "items": { "type": "object", "properties": { "name": { "type": "string" }, "entity_type": { "type": "string" } }, "required": ["name", "entity_type"] } },
    "relationships": { "type": "array", "items": { "type": "object", "properties": { "source": { "type": "string" }, "target": { "type": "string" }, "relation": { "type": "string" }, "strength": { "type": "number" } }, "required": ["source", "target", "relation", "strength"] } },
    "summary": { "type": "string" }
  },
  "required": ["entities", "relationships", "summary"],
  "additionalProperties": false
}"#;
256
// G27 P2: body-extract — extract structured content from unstructured text
/// Instruction text for the body-extract operation. It asks the model to
/// restructure the memory body into clean markdown while preserving all
/// factual content, and to summarize the changes.
const BODY_EXTRACT_PROMPT: &str = "You are a structured data extractor. Given this memory body (which may be unstructured text, raw notes, or a transcript), extract and restructure the content into a clean, well-organized markdown body.\n\n\
Preserve all factual content. Remove noise, fix formatting, add section headers where appropriate.\n\
Respond with the restructured body and a brief summary of changes.";

/// JSON Schema for the body-extract response: `restructured_body` and
/// `changes_summary`, both required strings.
const BODY_EXTRACT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "restructured_body": { "type": "string" },
    "changes_summary": { "type": "string" }
  },
  "required": ["restructured_body", "changes_summary"],
  "additionalProperties": false
}"#;
271
272// ---------------------------------------------------------------------------
273// Prompts
274// ---------------------------------------------------------------------------
275
/// Instruction text for entity/relationship extraction from a memory body.
/// The rules cover entity naming, the allowed relationship types (the same
/// eleven names as the `relation` enum in `BINDINGS_SCHEMA`) and the
/// strength scale.
const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. Given a memory body, extract:\n\
1. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
2. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones";

/// Prompt prefix for entity descriptions. The text ends with
/// `"Entity name: "`, so the caller is expected to append the entity details
/// (the appending code is outside this view).
const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. Given an entity name and type, write a concise one-sentence description (10-20 words) that explains what this entity IS and WHY it matters in the context of software/system design.\n\nEntity name: ";

/// Prompt prefix for body enrichment. The text ends right after
/// `"Memory body to enrich:"`, so the caller is expected to append the body
/// (the appending code is outside this view).
const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. Given a short or sparse memory body, expand it into a richer, more complete and useful description. Preserve all existing facts. Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\nConstraints:\n- Output only the enriched body text (no metadata, no headers)\n- Preserve the original meaning exactly\n- Target length is provided in the system context\n\nMemory body to enrich:\n\n";
290
291// ---------------------------------------------------------------------------
292// CLI args
293// ---------------------------------------------------------------------------
294
/// Operation to perform in the `enrich` command.
// Selected on the command line with `--operation` / `-o`. Variants are
// written in kebab-case on the CLI (e.g. `memory-bindings`, `re-embed`) and
// serde uses the same kebab-case spelling via `rename_all`.
// The `///` comments on the variants are left untouched on purpose: clap's
// derive macros surface doc comments as help text.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum EnrichOperation {
    /// Add missing entity/relationship bindings to memories (fully implemented).
    MemoryBindings,
    /// Fill NULL/empty entity descriptions with LLM-generated summaries (fully implemented).
    EntityDescriptions,
    /// Expand short memory bodies into richer content (fully implemented, GAP-18).
    BodyEnrich,
    /// Rebuild missing memory embeddings without rewriting the memory body.
    ReEmbed,
    /// Calibrate relationship weights using LLM analysis (scan only).
    WeightCalibrate,
    /// Reclassify relationship types using LLM judgment (scan only).
    RelationReclassify,
    /// Connect isolated entities by suggesting new relationships (scan only).
    EntityConnect,
    /// Validate entity type assignments using LLM judgment (scan only).
    EntityTypeValidate,
    /// Enrich memory descriptions that are generic/auto-generated (scan only).
    DescriptionEnrich,
    /// Identify cross-domain bridges between disconnected subgraphs (scan only).
    CrossDomainBridges,
    /// Classify memories into domain categories (scan only).
    DomainClassify,
    /// Audit the graph for quality issues (scan only).
    GraphAudit,
    /// Synthesize deep-research findings into graph memories (scan only).
    DeepResearchSynth,
    /// Extract structured body from unstructured text (scan only).
    BodyExtract,
}
328
/// LLM provider for enrichment.
// Used by both `--mode` (default `claude-code`) and `--fallback-mode`.
// The CLI spellings are `claude-code` and `codex`, matching the `Display`
// impl for this type.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
pub enum EnrichMode {
    /// Use locally installed Claude Code CLI (OAuth-first).
    ClaudeCode,
    /// Use locally installed OpenAI Codex CLI.
    Codex,
}
337
338impl std::fmt::Display for EnrichMode {
339    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
340        match self {
341            EnrichMode::ClaudeCode => write!(f, "claude-code"),
342            EnrichMode::Codex => write!(f, "codex"),
343        }
344    }
345}
346
347/// Arguments for the `enrich` subcommand.
348#[derive(clap::Args)]
349#[command(
350    about = "Enrich graph memories and entities using an LLM provider",
351    after_long_help = "EXAMPLES:\n  \
352    # Add missing entity bindings to all unbound memories\n  \
353    sqlite-graphrag enrich --operation memory-bindings --mode claude-code\n\n  \
354    # Fill entity descriptions (dry-run preview, no tokens spent)\n  \
355    sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n  \
356    # Expand short memory bodies (GAP-18)\n  \
357    sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n  \
358    # Rebuild only missing memory embeddings without rewriting bodies\n  \
359    sqlite-graphrag enrich --operation re-embed --limit 100\n\n  \
360    # Resume an interrupted body-enrich run\n  \
361    sqlite-graphrag enrich --operation body-enrich --resume --json\n\n  \
362    # Retry only failed items from a previous run\n  \
363    sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n\
364    EXIT CODES:\n  \
365    0  success\n  \
366    1  validation error (bad args, binary not found)\n  \
367    14 I/O error"
368)]
369pub struct EnrichArgs {
370    /// Enrichment operation to run.
371    #[arg(long, short = 'o', value_enum, value_name = "OPERATION")]
372    pub operation: EnrichOperation,
373
374    /// LLM provider to use. Default: claude-code (OAuth-first).
375    #[arg(long, value_enum, default_value = "claude-code")]
376    pub mode: EnrichMode,
377
378    /// Maximum number of items to process in this run. Omit for all.
379    #[arg(long, value_name = "N")]
380    pub limit: Option<usize>,
381
382    /// Preview items without calling the LLM (zero tokens consumed).
383    #[arg(long)]
384    pub dry_run: bool,
385
386    /// Namespace to operate on. Default: global.
387    #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
388    pub namespace: Option<String>,
389
390    // -- Provider flags (Claude) --
391    /// Path to the Claude Code binary. Default: auto-detect from PATH.
392    #[arg(long, value_name = "PATH")]
393    pub claude_binary: Option<PathBuf>,
394
395    /// Claude model to use (e.g. claude-sonnet-4-6).
396    #[arg(long, value_name = "MODEL")]
397    pub claude_model: Option<String>,
398
399    /// Timeout per item in seconds when using Claude Code. Default: 300.
400    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
401    pub claude_timeout: u64,
402
403    // -- Provider flags (Codex) --
404    /// Path to the Codex CLI binary. Default: auto-detect from PATH.
405    #[arg(long, value_name = "PATH")]
406    pub codex_binary: Option<PathBuf>,
407
408    /// Codex model to use (e.g. o4-mini).
409    #[arg(long, value_name = "MODEL")]
410    pub codex_model: Option<String>,
411
412    /// Timeout per item in seconds when using Codex. Default: 300.
413    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
414    pub codex_timeout: u64,
415
416    // -- Cost controls --
417    /// Abort when cumulative cost exceeds this USD budget (API key only; ignored for OAuth).
418    #[arg(long, value_name = "USD")]
419    pub max_cost_usd: Option<f64>,
420
421    // -- Queue controls --
422    /// Resume a previously interrupted run (skip already-done items).
423    #[arg(long)]
424    pub resume: bool,
425
426    /// Retry only items that failed in a previous run.
427    #[arg(long)]
428    pub retry_failed: bool,
429
430    // -- body-enrich specific flags (GAP-18) --
431    /// Minimum output character count for body-enrich. Default: 500.
432    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
433    pub min_output_chars: usize,
434
435    /// Maximum output character count for body-enrich. Default: 2000.
436    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
437    pub max_output_chars: usize,
438
439    /// Check that enriched body preserves all facts from the original (LLM judge). Default: true.
440    #[arg(long, default_value_t = true)]
441    pub preserve_check: bool,
442
443    /// Path to a custom prompt template file for body-enrich.
444    #[arg(long, value_name = "PATH")]
445    pub prompt_template: Option<PathBuf>,
446
447    /// Number of parallel LLM workers (default 1 = serial).
448    /// Each worker claims items atomically from the queue DB via UPDATE...RETURNING.
449    /// Range: 1–32. For 2321 entities, --llm-parallelism 4 reduces wall time ~4×.
450    #[arg(long, default_value_t = 1, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=32))]
451    pub llm_parallelism: u32,
452
453    // -- Output / infra --
454    /// Emit NDJSON output. Always true; flag accepted for compatibility.
455    #[arg(long)]
456    pub json: bool,
457
458    /// Database path override.
459    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
460    pub db: Option<String>,
461
462    /// G30: poll for the job singleton every second for up to N seconds
463    /// when another invocation holds the lock. Default: 0 (fail fast).
464    #[arg(long, value_name = "SECONDS")]
465    pub wait_job_singleton: Option<u64>,
466
467    /// G30: force acquisition of the singleton lock by removing a stale
468    /// lock file from a previously crashed invocation. Use only when you
469    /// are certain no other `enrich`/`ingest` is running.
470    #[arg(long, default_value_t = false)]
471    pub force_job_singleton: bool,
472
473    /// G37: select a specific subset of memory names to enrich instead of
474    /// the full candidate set. Comma-separated, e.g. `--names a,b,c`.
475    /// Empty when omitted (processes all candidates).
476    #[arg(long, value_name = "NAMES", value_delimiter = ',')]
477    pub names: Vec<String>,
478
479    /// G37: read the subset of memory names from a file (one per line).
480    /// Lines starting with `#` and empty lines are ignored. Combined with
481    /// `--names` (union) when both are set.
482    #[arg(long, value_name = "PATH")]
483    pub names_file: Option<PathBuf>,
484
485    /// G35: probe the LLM provider with a 1-turn ping before processing
486    /// the batch. Aborts with a clear error if the rate-limit window is
487    /// closed (avoids burning N turns only to fail on item 1).
488    #[arg(long, default_value_t = false)]
489    pub preflight_check: bool,
490
491    /// G35: if a preflight probe or in-flight call hits the Claude rate
492    /// limit, fall back to `--fallback-mode` (typically `codex`) instead
493    /// of failing the batch. Ignored when `--mode` is already `codex`.
494    #[arg(long, value_enum)]
495    pub fallback_mode: Option<EnrichMode>,
496
497    /// G35: number of seconds before the OAuth rate-limit reset at which
498    /// the preflight probe should refuse to start. Default 300 (5 min).
499    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
500    pub rate_limit_buffer: u64,
501
502    /// G28-D: refuse to start when the 1-minute load average exceeds
503    /// `2 × ncpus` (or `SQLITE_GRAPHRAG_MAX_LOAD_PER_NCPU` if set).
504    /// Set to false to skip the check on contended CI runners.
505    #[arg(long, default_value_t = true)]
506    pub max_load_check: bool,
507
508    /// G28-D: when the system is saturated, abort the job after this
509    /// many consecutive HardFailure outcomes. Default 5.
510    #[arg(long, value_name = "N", default_value_t = 5)]
511    pub circuit_breaker_threshold: u32,
512
513    /// G29 Passo 4: minimum trigram-Jaccard similarity between the
514    /// original body and the LLM-rewritten body for the rewrite to be
515    /// accepted. Scores below the threshold are rejected and emitted as
516    /// `EnrichItemResult::PreservationFailed`. Default 0.7 (per the G29
517    /// gap specification). Ignored when `--operation` is not
518    /// `body-enrich`.
519    #[arg(long, value_name = "FLOAT", default_value_t = 0.7)]
520    pub preserve_threshold: f64,
521
522    /// G33 Passo 3: when set, validate `--codex-model` against the
523    /// ChatGPT Pro OAuth accepted-model list and abort with a
524    /// suggestion when the value is unknown. Default true (fail fast
525    /// to avoid burning OAuth turns). Set to false to opt out.
526    #[arg(long, default_value_t = true)]
527    pub codex_model_validate: bool,
528
529    /// G33 Passo 3: when set together with an invalid `--codex-model`,
530    /// automatically substitute the supplied default (e.g. `gpt-5.5`)
531    /// instead of aborting. The substitution is recorded in the NDJSON
532    /// stream as `provider_substituted: true` for traceability.
533    #[arg(long, value_name = "MODEL")]
534    pub codex_model_fallback: Option<String>,
535}
536
537// ---------------------------------------------------------------------------
538// Internal types — raw LLM output structs
539// ---------------------------------------------------------------------------
540
541// ---------------------------------------------------------------------------
542// NDJSON event types emitted to stdout
543// ---------------------------------------------------------------------------
544
/// NDJSON event describing a phase of the run.
///
/// Only `phase` is always serialized; every `Option` field is omitted from
/// the JSON when it is `None`.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Phase label (the docs on `llm_parallelism` mention a `"scan"` phase).
    phase: &'a str,
    // NOTE(review): presumably the resolved provider binary path — confirm at the emit site.
    #[serde(skip_serializing_if = "Option::is_none")]
    binary_path: Option<&'a str>,
    // NOTE(review): presumably the provider binary version string — confirm at the emit site.
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    items_total: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    items_pending: Option<usize>,
    /// Active parallel LLM worker count (1 = serial). Present only on the "scan" phase event.
    #[serde(skip_serializing_if = "Option::is_none")]
    llm_parallelism: Option<u32>,
}
560
/// NDJSON event describing one processed item.
///
/// `item`, `status`, `index` and `total` are always serialized; every
/// `Option` field is omitted from the JSON when it is `None`.
#[derive(Debug, Serialize)]
struct ItemEvent<'a> {
    /// Item identifier (memory name or entity name).
    item: &'a str,
    /// Outcome label for this item (the set of values is defined by the emit sites).
    status: &'a str,
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    entity_id: Option<i64>,
    // NOTE(review): presumably the number of entities bound for this item — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    // NOTE(review): presumably the number of relationships written for this item — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    // NOTE(review): presumably body length before/after enrichment — confirm
    // whether this counts chars or bytes.
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_before: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_after: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    // NOTE(review): presumably the position of this item within the run and
    // the run's item count — confirm whether `index` is 0- or 1-based.
    index: usize,
    total: usize,
}
587
/// Final NDJSON record summarizing an `enrich` run: per-status counters,
/// cumulative cost, and wall-clock duration.
#[derive(Debug, Serialize)]
struct EnrichSummary {
    // NOTE(review): presumably always `true`, acting as a marker that lets
    // consumers tell the summary line apart from item events — confirm.
    summary: bool,
    operation: String,
    items_total: usize,
    completed: usize,
    failed: usize,
    skipped: usize,
    cost_usd: f64,
    elapsed_ms: u64,
    /// v1.0.84 (ADR-0042): discriminator for the LLM backend that actually
    /// performed the re-embedding during enrich. `"claude" | "codex" | "none"`.
    /// Absent on the wire when `None` (kept for happy-path envelope cleanliness,
    /// or when the operation did not involve re-embed).
    #[serde(skip_serializing_if = "Option::is_none")]
    backend_invoked: Option<&'static str>,
}
605
606use crate::output::emit_json_line as emit_json;
607
608// ---------------------------------------------------------------------------
609// Queue DB
610// ---------------------------------------------------------------------------
611
612/// Opens or creates the enrichment queue database.
613///
614/// The queue schema mirrors `ingest_claude` for resume/retry parity.
615/// Uses a different filename (`.enrich-queue.sqlite`) to avoid collision.
616///
617/// # DRY note
618///
619/// This is a near-verbatim copy of `open_queue_db` in `ingest_claude.rs`.
620/// Both should be unified in a shared `llm_runner.rs` module by the
621/// Integration stream.
622fn open_queue_db(path: &str) -> Result<Connection, AppError> {
623    let conn = Connection::open(path)?;
624    conn.pragma_update(None, "journal_mode", "wal")?;
625    conn.execute_batch(
626        "CREATE TABLE IF NOT EXISTS queue (
627            id          INTEGER PRIMARY KEY AUTOINCREMENT,
628            item_key    TEXT NOT NULL UNIQUE,
629            item_type   TEXT NOT NULL DEFAULT 'memory',
630            status      TEXT NOT NULL DEFAULT 'pending',
631            memory_id   INTEGER,
632            entity_id   INTEGER,
633            entities    INTEGER DEFAULT 0,
634            rels        INTEGER DEFAULT 0,
635            error       TEXT,
636            cost_usd    REAL DEFAULT 0.0,
637            attempt     INTEGER DEFAULT 0,
638            elapsed_ms  INTEGER,
639            created_at  TEXT DEFAULT (datetime('now')),
640            done_at     TEXT
641        );
642        CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
643    )?;
644    Ok(conn)
645}
646
647// ---------------------------------------------------------------------------
648// LLM invocation — Claude Code
649// ---------------------------------------------------------------------------
650
651/// Calls `claude -p` via the shared `claude_runner` module (G02).
652///
653/// Returns `(output_value, cost_usd, is_oauth)`.
654fn call_claude(
655    binary: &Path,
656    prompt: &str,
657    json_schema: &str,
658    input_text: &str,
659    model: Option<&str>,
660    timeout_secs: u64,
661) -> Result<(serde_json::Value, f64, bool), AppError> {
662    let result = crate::commands::claude_runner::run_claude(
663        binary,
664        prompt,
665        json_schema,
666        input_text,
667        model,
668        timeout_secs,
669        7,
670    )?;
671    Ok((result.value, result.cost_usd, result.is_oauth))
672}
673
674// ---------------------------------------------------------------------------
675// Preflight probe (G35) — single-turn ping to verify the LLM provider
676// ---------------------------------------------------------------------------
677
/// Result of a single preflight ping (G35).
///
/// Returned by `run_preflight_probe` so the caller can decide whether to
/// proceed or abort before processing the batch.
enum PreflightOutcome {
    /// The provider accepted the ping without rate-limit or other errors.
    Healthy,
    /// The provider rejected the ping due to OAuth rate limit. The
    /// `suggestion` field is a human hint that callers can embed in the
    /// user-facing error.
    RateLimited {
        // NOTE(review): presumably the provider's own rate-limit message —
        // confirm where this value is populated.
        reason: String,
        suggestion: &'static str,
    },
    /// Any other provider error (binary missing, auth failure, etc.).
    Error(AppError),
}
692
693/// Probes the configured LLM provider with a 1-turn ping.
694///
695/// - Claude: `claude -p "ping" --max-turns 1 --strict-mcp-config --mcp-config '{}'`
696/// - Codex:  `codex exec -c mcp_servers='{}' "ping" --json`
697///
698/// The probe intentionally avoids spawning any MCP server children (G28-A)
699/// to keep its own process footprint at the minimum.
700fn run_preflight_probe(args: &EnrichArgs) -> PreflightOutcome {
701    let timeout = std::time::Duration::from_secs(args.rate_limit_buffer.max(60));
702
703    match args.mode {
704        EnrichMode::ClaudeCode => {
705            let bin = match find_claude_binary(args.claude_binary.as_deref()) {
706                Ok(b) => b,
707                Err(e) => return PreflightOutcome::Error(e),
708            };
709            // v1.0.88 (BUG-3 fix, ADR-0046): write the empty MCP config to a
710            // tempfile (Claude Code 2.1.177 rejects the inline `{}`
711            // form) and run the preflight gate before spawn, mirroring
712            // the canonical pattern in `claude_runner::build_claude_command`.
713            let mcp_config_path = match crate::spawn::preflight::write_empty_mcp_config_tempfile() {
714                Ok(p) => p,
715                Err(e) => {
716                    return PreflightOutcome::Error(AppError::Io(e));
717                }
718            };
719            let mut cmd = std::process::Command::new(&bin);
720            cmd.env_clear();
721            for var in &["PATH", "HOME", "USER"] {
722                if let Ok(val) = std::env::var(var) {
723                    cmd.env(var, val);
724                }
725            }
726            cmd.arg("-p")
727                .arg("ping")
728                .arg("--max-turns")
729                .arg("1")
730                .arg("--strict-mcp-config")
731                .arg("--mcp-config")
732                .arg(mcp_config_path.as_os_str())
733                .arg("--dangerously-skip-permissions")
734                .arg("--settings")
735                .arg("{\"hooks\":{}}")
736                .arg("--output-format")
737                .arg("json")
738                .stdin(std::process::Stdio::null())
739                .stdout(std::process::Stdio::piped())
740                .stderr(std::process::Stdio::piped());
741
742            let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
743                Ok(c) => c,
744                Err(e) => {
745                    return PreflightOutcome::Error(AppError::Io(e));
746                }
747            };
748            let output = match wait_with_timeout(child, timeout) {
749                Ok(out) => out,
750                Err(e) => return PreflightOutcome::Error(e),
751            };
752            if !output.status.success() {
753                let stderr = String::from_utf8_lossy(&output.stderr);
754                if stderr.contains("hit your session limit")
755                    || stderr.contains("rate_limit")
756                    || stderr.contains("429")
757                {
758                    return PreflightOutcome::RateLimited {
759                        reason: stderr.trim().to_string(),
760                        suggestion:
761                            "wait for the OAuth window to reset or use --fallback-mode codex",
762                    };
763                }
764                return PreflightOutcome::Error(AppError::Validation(format!(
765                    "preflight probe failed: {stderr}",
766                    stderr = stderr.trim()
767                )));
768            }
769            PreflightOutcome::Healthy
770        }
771        EnrichMode::Codex => {
772            let bin = match find_codex_binary(args.codex_binary.as_deref()) {
773                Ok(b) => b,
774                Err(e) => return PreflightOutcome::Error(e),
775            };
776            super::codex_spawn::validate_codex_model(args.codex_model.as_deref())
777                .map_err(PreflightOutcome::Error)
778                .ok();
779            let schema = "{}";
780            let schema_path = match super::codex_spawn::trusted_schema_path() {
781                Ok(p) => p,
782                Err(e) => return PreflightOutcome::Error(e),
783            };
784            let spawn_args = super::codex_spawn::CodexSpawnArgs {
785                binary: &bin,
786                prompt: "ping",
787                json_schema: schema,
788                input_text: "",
789                model: args.codex_model.as_deref(),
790                timeout_secs: args.rate_limit_buffer.max(60),
791                schema_path: schema_path.clone(),
792            };
793            let mut cmd = match super::codex_spawn::build_codex_command(&spawn_args) {
794                Ok(c) => c,
795                Err(e) => return PreflightOutcome::Error(e),
796            };
797            let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
798                Ok(c) => c,
799                Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
800            };
801            let output = match wait_with_timeout(child, timeout) {
802                Ok(out) => out,
803                Err(e) => return PreflightOutcome::Error(e),
804            };
805            let _ = std::fs::remove_file(&schema_path);
806            if !output.status.success() {
807                let stderr = String::from_utf8_lossy(&output.stderr);
808                if stderr.contains("rate_limit")
809                    || stderr.contains("429")
810                    || stderr.contains("Too Many Requests")
811                {
812                    return PreflightOutcome::RateLimited {
813                        reason: stderr.trim().to_string(),
814                        suggestion: "wait for the rate-limit window to reset",
815                    };
816                }
817                return PreflightOutcome::Error(AppError::Validation(format!(
818                    "preflight probe failed: {stderr}",
819                    stderr = stderr.trim()
820                )));
821            }
822            PreflightOutcome::Healthy
823        }
824    }
825}
826
827/// Cross-platform wait with timeout (no extra crate dependency).
828fn wait_with_timeout(
829    mut child: std::process::Child,
830    timeout: std::time::Duration,
831) -> Result<std::process::Output, AppError> {
832    use wait_timeout::ChildExt;
833    let start = std::time::Instant::now();
834    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
835    if status.is_none() {
836        let _ = child.kill();
837        let _ = child.wait();
838        return Err(AppError::Validation(format!(
839            "preflight probe timed out after {}s",
840            start.elapsed().as_secs()
841        )));
842    }
843    let mut stdout = Vec::new();
844    if let Some(mut out) = child.stdout.take() {
845        std::io::Read::read_to_end(&mut out, &mut stdout).map_err(AppError::Io)?;
846    }
847    let mut stderr = Vec::new();
848    if let Some(mut err) = child.stderr.take() {
849        std::io::Read::read_to_end(&mut err, &mut stderr).map_err(AppError::Io)?;
850    }
851    let exit = status.unwrap();
852    Ok(std::process::Output {
853        status: exit,
854        stdout,
855        stderr,
856    })
857}
858
859// ---------------------------------------------------------------------------
860// SCAN helpers — SQL queries that find items needing enrichment
861// ---------------------------------------------------------------------------
862
863/// Returns memories without any `memory_entities` binding.
864///
865/// These are the targets for `memory-bindings` enrichment. When `name_filter`
866/// is non-empty, restricts the scan to the given names (G37); unknown names
867/// are silently skipped (the caller can detect them by comparing
868/// requested vs. returned).
869fn scan_unbound_memories(
870    conn: &Connection,
871    namespace: &str,
872    limit: Option<usize>,
873    name_filter: &[String],
874) -> Result<Vec<(i64, String, String)>, AppError> {
875    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
876
877    if name_filter.is_empty() {
878        let sql = format!(
879            "SELECT m.id, m.name, m.body
880             FROM memories m
881             WHERE m.namespace = ?1
882               AND m.deleted_at IS NULL
883               AND NOT EXISTS (
884                   SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
885               )
886             ORDER BY m.id
887             {limit_clause}"
888        );
889        let mut stmt = conn.prepare(&sql)?;
890        let rows = stmt
891            .query_map(rusqlite::params![namespace], |r| {
892                Ok((
893                    r.get::<_, i64>(0)?,
894                    r.get::<_, String>(1)?,
895                    r.get::<_, String>(2)?,
896                ))
897            })?
898            .collect::<Result<Vec<_>, _>>()?;
899        Ok(rows)
900    } else {
901        // Build a parameterised IN clause: ?2, ?3, ..., ?{1+n}
902        let placeholders: Vec<String> = (2..=name_filter.len() + 1)
903            .map(|i| format!("?{i}"))
904            .collect();
905        let in_clause = placeholders.join(", ");
906        let sql = format!(
907            "SELECT m.id, m.name, m.body
908             FROM memories m
909             WHERE m.namespace = ?1
910               AND m.deleted_at IS NULL
911               AND m.name IN ({in_clause})
912               AND NOT EXISTS (
913                   SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
914               )
915             ORDER BY m.id
916             {limit_clause}"
917        );
918        let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
919        params_vec.push(&namespace);
920        for n in name_filter {
921            params_vec.push(n);
922        }
923        let mut stmt = conn.prepare(&sql)?;
924        let rows = stmt
925            .query_map(
926                rusqlite::params_from_iter(params_vec.iter().copied()),
927                |r| {
928                    Ok((
929                        r.get::<_, i64>(0)?,
930                        r.get::<_, String>(1)?,
931                        r.get::<_, String>(2)?,
932                    ))
933                },
934            )?
935            .collect::<Result<Vec<_>, _>>()?;
936        Ok(rows)
937    }
938}
939
940/// Reads a list of memory names from a UTF-8 text file (G37).
941///
942/// Empty lines and lines beginning with `#` are skipped. Returns a
943/// de-duplicated, order-preserving list of trimmed names.
944fn read_names_file(path: &Path) -> Result<Vec<String>, AppError> {
945    let content = std::fs::read_to_string(path).map_err(|e| {
946        AppError::Validation(format!("failed to read names file {}: {e}", path.display()))
947    })?;
948    let mut seen = std::collections::HashSet::new();
949    let mut out = Vec::new();
950    for line in content.lines() {
951        let trimmed = line.trim();
952        if trimmed.is_empty() || trimmed.starts_with('#') {
953            continue;
954        }
955        if seen.insert(trimmed.to_string()) {
956            out.push(trimmed.to_string());
957        }
958    }
959    Ok(out)
960}
961
962/// Resolves the union of `--names` and `--names-file` (G37).
963fn resolve_name_filter(args: &EnrichArgs) -> Result<Vec<String>, AppError> {
964    let mut combined: Vec<String> = args.names.clone();
965    if let Some(p) = &args.names_file {
966        let from_file = read_names_file(p)?;
967        for n in from_file {
968            if !combined.contains(&n) {
969                combined.push(n);
970            }
971        }
972    }
973    Ok(combined)
974}
975
976/// Returns entities with NULL or empty description.
977///
978/// These are the targets for `entity-descriptions` enrichment.
979fn scan_entities_without_description(
980    conn: &Connection,
981    namespace: &str,
982    limit: Option<usize>,
983) -> Result<Vec<(i64, String, String)>, AppError> {
984    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
985    let sql = format!(
986        "SELECT id, name, type
987         FROM entities
988         WHERE namespace = ?1
989           AND (description IS NULL OR description = '')
990         ORDER BY id
991         {limit_clause}"
992    );
993    let mut stmt = conn.prepare(&sql)?;
994    let rows = stmt
995        .query_map(rusqlite::params![namespace], |r| {
996            Ok((
997                r.get::<_, i64>(0)?,
998                r.get::<_, String>(1)?,
999                r.get::<_, String>(2)?,
1000            ))
1001        })?
1002        .collect::<Result<Vec<_>, _>>()?;
1003    Ok(rows)
1004}
1005
1006/// Returns memories whose body length is below the configured minimum.
1007///
1008/// These are the targets for `body-enrich` (GAP-18).
1009fn scan_short_body_memories(
1010    conn: &Connection,
1011    namespace: &str,
1012    min_chars: usize,
1013    limit: Option<usize>,
1014) -> Result<Vec<(i64, String, String)>, AppError> {
1015    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1016    let sql = format!(
1017        "SELECT m.id, m.name, m.body
1018         FROM memories m
1019         WHERE m.namespace = ?1
1020           AND m.deleted_at IS NULL
1021           AND LENGTH(COALESCE(m.body,'')) < ?2
1022         ORDER BY m.id
1023         {limit_clause}"
1024    );
1025    let mut stmt = conn.prepare(&sql)?;
1026    let rows = stmt
1027        .query_map(rusqlite::params![namespace, min_chars as i64], |r| {
1028            Ok((
1029                r.get::<_, i64>(0)?,
1030                r.get::<_, String>(1)?,
1031                r.get::<_, String>(2)?,
1032            ))
1033        })?
1034        .collect::<Result<Vec<_>, _>>()?;
1035    Ok(rows)
1036}
1037
1038/// Returns live memories that still have no row in `memory_embeddings`.
1039///
1040/// These are the targets for `re-embed`.
1041fn scan_memories_without_embeddings(
1042    conn: &Connection,
1043    namespace: &str,
1044    limit: Option<usize>,
1045    name_filter: &[String],
1046) -> Result<Vec<(i64, String, String)>, AppError> {
1047    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1048
1049    if name_filter.is_empty() {
1050        let sql = format!(
1051            "SELECT m.id, m.name, COALESCE(m.body,'')
1052             FROM memories m
1053             LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1054             WHERE m.namespace = ?1
1055               AND m.deleted_at IS NULL
1056               AND me.memory_id IS NULL
1057             ORDER BY m.id
1058             {limit_clause}"
1059        );
1060        let mut stmt = conn.prepare(&sql)?;
1061        let rows = stmt
1062            .query_map(rusqlite::params![namespace], |r| {
1063                Ok((
1064                    r.get::<_, i64>(0)?,
1065                    r.get::<_, String>(1)?,
1066                    r.get::<_, String>(2)?,
1067                ))
1068            })?
1069            .collect::<Result<Vec<_>, _>>()?;
1070        Ok(rows)
1071    } else {
1072        let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1073            .map(|i| format!("?{i}"))
1074            .collect();
1075        let in_clause = placeholders.join(", ");
1076        let sql = format!(
1077            "SELECT m.id, m.name, COALESCE(m.body,'')
1078             FROM memories m
1079             LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1080             WHERE m.namespace = ?1
1081               AND m.deleted_at IS NULL
1082               AND m.name IN ({in_clause})
1083               AND me.memory_id IS NULL
1084             ORDER BY m.id
1085             {limit_clause}"
1086        );
1087        let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1088        params_vec.push(&namespace);
1089        for n in name_filter {
1090            params_vec.push(n);
1091        }
1092        let mut stmt = conn.prepare(&sql)?;
1093        let rows = stmt
1094            .query_map(
1095                rusqlite::params_from_iter(params_vec.iter().copied()),
1096                |r| {
1097                    Ok((
1098                        r.get::<_, i64>(0)?,
1099                        r.get::<_, String>(1)?,
1100                        r.get::<_, String>(2)?,
1101                    ))
1102                },
1103            )?
1104            .collect::<Result<Vec<_>, _>>()?;
1105        Ok(rows)
1106    }
1107}
1108
1109/// G27: Returns relationships with weight >= 0.7 that may need recalibration.
1110#[allow(clippy::type_complexity)]
1111fn scan_weight_candidates(
1112    conn: &Connection,
1113    namespace: &str,
1114    limit: Option<usize>,
1115) -> Result<Vec<(i64, String, String, String, f64)>, AppError> {
1116    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1117    let sql = format!(
1118        "SELECT r.id, e1.name, e2.name, r.relation, r.weight \
1119         FROM relationships r \
1120         JOIN entities e1 ON e1.id = r.source_id \
1121         JOIN entities e2 ON e2.id = r.target_id \
1122         WHERE r.weight >= 0.7 AND e1.namespace = ?1 \
1123         ORDER BY r.weight DESC {limit_clause}"
1124    );
1125    let mut stmt = conn.prepare(&sql)?;
1126    let rows = stmt
1127        .query_map(rusqlite::params![namespace], |r| {
1128            Ok((
1129                r.get::<_, i64>(0)?,
1130                r.get::<_, String>(1)?,
1131                r.get::<_, String>(2)?,
1132                r.get::<_, String>(3)?,
1133                r.get::<_, f64>(4)?,
1134            ))
1135        })?
1136        .collect::<Result<Vec<_>, _>>()?;
1137    Ok(rows)
1138}
1139
1140/// G27: Returns relationships with generic relation types (applies_to).
1141fn scan_generic_relations(
1142    conn: &Connection,
1143    namespace: &str,
1144    limit: Option<usize>,
1145) -> Result<Vec<(i64, String, String, String)>, AppError> {
1146    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1147    let sql = format!(
1148        "SELECT r.id, e1.name, e2.name, r.relation \
1149         FROM relationships r \
1150         JOIN entities e1 ON e1.id = r.source_id \
1151         JOIN entities e2 ON e2.id = r.target_id \
1152         WHERE r.relation = 'applies_to' AND e1.namespace = ?1 \
1153         ORDER BY r.id {limit_clause}"
1154    );
1155    let mut stmt = conn.prepare(&sql)?;
1156    let rows = stmt
1157        .query_map(rusqlite::params![namespace], |r| {
1158            Ok((
1159                r.get::<_, i64>(0)?,
1160                r.get::<_, String>(1)?,
1161                r.get::<_, String>(2)?,
1162                r.get::<_, String>(3)?,
1163            ))
1164        })?
1165        .collect::<Result<Vec<_>, _>>()?;
1166    Ok(rows)
1167}
1168
1169// ---------------------------------------------------------------------------
1170// PERSIST helpers for fully-implemented operations
1171// ---------------------------------------------------------------------------
1172
1173/// Persists entity bindings extracted by the LLM for a memory.
1174///
1175/// Creates entities via `upsert_entity`, links them to the memory via
1176/// `link_memory_entity`, and upserts relationships found between entities.
1177fn persist_memory_bindings(
1178    conn: &Connection,
1179    namespace: &str,
1180    memory_id: i64,
1181    entities_json: &serde_json::Value,
1182    rels_json: &serde_json::Value,
1183) -> Result<(usize, usize), AppError> {
1184    #[derive(Deserialize)]
1185    struct EntityItem {
1186        name: String,
1187        entity_type: String,
1188    }
1189    #[derive(Deserialize)]
1190    struct RelItem {
1191        source: String,
1192        target: String,
1193        relation: String,
1194        strength: f64,
1195    }
1196
1197    let extracted_entities: Vec<EntityItem> = serde_json::from_value(entities_json.clone())
1198        .map_err(|e| AppError::Validation(format!("failed to parse entities array: {e}")))?;
1199
1200    let extracted_rels: Vec<RelItem> = serde_json::from_value(rels_json.clone())
1201        .map_err(|e| AppError::Validation(format!("failed to parse relationships array: {e}")))?;
1202
1203    let mut ent_count = 0usize;
1204    let mut rel_count = 0usize;
1205
1206    for item in &extracted_entities {
1207        let entity_type = match item.entity_type.parse::<EntityType>() {
1208            Ok(et) => et,
1209            Err(_) => {
1210                tracing::warn!(
1211                    target: "enrich",
1212                    entity = %item.name,
1213                    entity_type = %item.entity_type,
1214                    "entity type not recognized, skipping"
1215                );
1216                continue;
1217            }
1218        };
1219        match entities::upsert_entity(
1220            conn,
1221            namespace,
1222            &NewEntity {
1223                name: item.name.clone(),
1224                entity_type,
1225                description: None,
1226            },
1227        ) {
1228            Ok(eid) => {
1229                let _ = entities::link_memory_entity(conn, memory_id, eid);
1230                ent_count += 1;
1231            }
1232            Err(e) => {
1233                tracing::warn!(
1234                    target: "enrich",
1235                    entity = %item.name,
1236                    error = %e,
1237                    "entity upsert skipped"
1238                );
1239            }
1240        }
1241    }
1242
1243    for rel in &extracted_rels {
1244        let normalized = crate::parsers::normalize_relation(&rel.relation);
1245        crate::parsers::warn_if_non_canonical(&normalized);
1246
1247        // Normalize entity names before lookup: upsert_entity normalizes on write,
1248        // so the lookup must use the same normalized form to find the row.
1249        let src_name = crate::parsers::normalize_entity_name(&rel.source);
1250        let tgt_name = crate::parsers::normalize_entity_name(&rel.target);
1251        let src_id = entities::find_entity_id(conn, namespace, &src_name);
1252        let tgt_id = entities::find_entity_id(conn, namespace, &tgt_name);
1253        if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1254            let new_rel = NewRelationship {
1255                source: rel.source.clone(),
1256                target: rel.target.clone(),
1257                relation: normalized,
1258                strength: rel.strength,
1259                description: None,
1260            };
1261            if entities::upsert_relationship(conn, namespace, sid, tid, &new_rel).is_ok() {
1262                rel_count += 1;
1263            }
1264        }
1265    }
1266
1267    Ok((ent_count, rel_count))
1268}
1269
1270/// Updates an entity's description directly in the `entities` table.
1271fn persist_entity_description(
1272    conn: &Connection,
1273    entity_id: i64,
1274    description: &str,
1275) -> Result<(), AppError> {
1276    conn.execute(
1277        "UPDATE entities SET description = ?1, updated_at = unixepoch() WHERE id = ?2",
1278        rusqlite::params![description, entity_id],
1279    )?;
1280    Ok(())
1281}
1282
1283/// v1.0.84 (ADR-0042): on successful re-embed, records the active backend
1284/// into the shared accumulator (`ENRICH_LAST_BACKEND`) so the final
1285/// `EnrichSummary` can expose `backend_invoked` without changing every
1286/// caller's signature. Best-effort observability — concurrent enrich runs
1287/// may race, but `Mutex` keeps the mutation safe.
1288#[allow(clippy::too_many_arguments)]
1289fn reembed_memory_vector(
1290    conn: &Connection,
1291    namespace: &str,
1292    memory_id: i64,
1293    memory_name: &str,
1294    memory_type: &str,
1295    body: &str,
1296    paths: &crate::paths::AppPaths,
1297    llm_backend: crate::cli::LlmBackendChoice,
1298) -> Result<(), AppError> {
1299    let snippet: String = body.chars().take(200).collect();
1300    // v1.0.82 (GAP-003): forward --llm-backend to embed_with_fallback.
1301    // v1.0.84 (ADR-0042): tuple (Vec<f32>, LlmBackendKind) — extrai o
1302    // backend que efetivamente rodou e popula o accumulator para o
1303    // EnrichSummary agregado.
1304    let (embedding, backend_kind) =
1305        crate::embedder::embed_passage_with_choice(&paths.models, body, Some(llm_backend))?;
1306    record_enrich_backend(backend_kind.as_str());
1307    memories::upsert_vec(
1308        conn,
1309        memory_id,
1310        namespace,
1311        memory_type,
1312        &embedding,
1313        memory_name,
1314        &snippet,
1315    )?;
1316    Ok(())
1317}
1318
/// Stores `backend` as the most recent LLM backend that completed a
/// re-embed (v1.0.84, ADR-0042).
///
/// Best-effort: if the lock is poisoned the value is silently dropped.
fn record_enrich_backend(backend: &'static str) {
    let locked = ENRICH_LAST_BACKEND.lock();
    if let Ok(mut slot) = locked {
        *slot = Some(backend);
    }
}

/// Removes and returns the last recorded backend, leaving the accumulator
/// empty. Returns `None` when nothing was recorded or the lock is poisoned.
fn take_enrich_backend() -> Option<&'static str> {
    let mut slot = ENRICH_LAST_BACKEND.lock().ok()?;
    slot.take()
}

/// v1.0.84 (ADR-0042): process-local accumulator holding the last backend
/// recorded by `record_enrich_backend`; drained by `take_enrich_backend`.
static ENRICH_LAST_BACKEND: std::sync::Mutex<Option<&'static str>> = std::sync::Mutex::new(None);
1335
1336/// Persists an enriched memory body (body-enrich, GAP-18).
1337///
1338/// Uses `memories::update` to set the new body and `sync_fts_after_update`
1339/// to keep FTS5 in sync. Also re-embeds the memory for recall accuracy.
1340fn persist_enriched_body(
1341    conn: &Connection,
1342    namespace: &str,
1343    memory_id: i64,
1344    memory_name: &str,
1345    new_body: &str,
1346    paths: &crate::paths::AppPaths,
1347    llm_backend: crate::cli::LlmBackendChoice,
1348) -> Result<(), AppError> {
1349    // Read current values for FTS sync
1350    let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
1351        "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
1352        rusqlite::params![memory_id],
1353        |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1354    )?;
1355
1356    let memory_type: String = conn.query_row(
1357        "SELECT type FROM memories WHERE id=?1",
1358        rusqlite::params![memory_id],
1359        |r| r.get(0),
1360    )?;
1361
1362    let description: String = conn.query_row(
1363        "SELECT COALESCE(description,'') FROM memories WHERE id=?1",
1364        rusqlite::params![memory_id],
1365        |r| r.get(0),
1366    )?;
1367
1368    let body_hash = blake3::hash(new_body.as_bytes()).to_hex().to_string();
1369
1370    let new_memory = memories::NewMemory {
1371        namespace: namespace.to_string(),
1372        name: memory_name.to_string(),
1373        memory_type: memory_type.clone(),
1374        description: description.clone(),
1375        body: new_body.to_string(),
1376        body_hash,
1377        session_id: None,
1378        source: "agent".to_string(),
1379        metadata: serde_json::json!({
1380            "operation": "body-enrich",
1381            "orig_chars": old_body.chars().count(),
1382            "new_chars": new_body.chars().count(),
1383        }),
1384    };
1385
1386    // G29 audit: insert a new immutable version BEFORE the update so the
1387    // enriched body is reachable through `history --name <X>` and
1388    // `restore --version N` can roll back to the pre-enrich state.
1389    let next_version = crate::storage::versions::next_version(conn, memory_id)?;
1390    let version_metadata = serde_json::json!({
1391        "operation": "body-enrich",
1392        "orig_chars": old_body.chars().count(),
1393        "new_chars": new_body.chars().count(),
1394    })
1395    .to_string();
1396    crate::storage::versions::insert_version(
1397        conn,
1398        memory_id,
1399        next_version,
1400        memory_name,
1401        &memory_type,
1402        &description,
1403        new_body,
1404        &version_metadata,
1405        Some("enrich"),
1406        "edit",
1407    )?;
1408
1409    memories::update(conn, memory_id, &new_memory, None)?;
1410    memories::sync_fts_after_update(
1411        conn,
1412        memory_id,
1413        &old_name,
1414        &old_desc,
1415        &old_body,
1416        &new_memory.name,
1417        &new_memory.description,
1418        &new_memory.body,
1419    )?;
1420
1421    // Re-embed for recall accuracy
1422    if let Err(e) = reembed_memory_vector(
1423        conn,
1424        namespace,
1425        memory_id,
1426        memory_name,
1427        &memory_type,
1428        new_body,
1429        paths,
1430        llm_backend,
1431    ) {
1432        tracing::warn!(target: "enrich", memory = %memory_name, error = %e, "vec upsert failed after body-enrich");
1433    }
1434
1435    Ok(())
1436}
1437
1438// ---------------------------------------------------------------------------
1439// Main entry point
1440// ---------------------------------------------------------------------------
1441
1442// ---------------------------------------------------------------------------
1443// G20: mode-conditional flag validation
1444// ---------------------------------------------------------------------------
1445
/// Reports whether `value` equals `default`.
///
/// Used to tell "operator passed an explicit override" apart from "clap
/// filled in the default" for flags declared with `default_value_t`.
fn is_at_default<T: PartialEq>(value: T, default: T) -> bool {
    value.eq(&default)
}
1452
1453/// G20: validate that flags for one LLM provider were not passed when
1454/// the operator selected a different provider. Flags silently discarded
1455/// by the wrong mode are surfaced as AppError::Validation BEFORE any
1456/// DB work, so the operator gets an actionable error instead of a
1457/// surprise at runtime.
1458///
1459/// Detection rules:
1460/// - For Option<PathBuf> / Option<String>: is_some() means explicit
1461/// - For scalar fields with default_value_t: value != default means explicit
1462/// - For boolean fields: true means explicit (default is false)
1463///
1464/// Mode-specific matrices:
1465/// - mode=claude-code rejects: codex_binary, codex_model, codex_timeout != 300
1466/// - mode=codex rejects: claude_binary, claude_model, claude_timeout != 300, max_cost_usd
1467fn validate_mode_conditional_flags_enrich(args: &EnrichArgs) -> Result<(), AppError> {
1468    const DEFAULT_TIMEOUT: u64 = 300;
1469
1470    let mut conflicts: Vec<String> = Vec::new();
1471
1472    match args.mode {
1473        EnrichMode::ClaudeCode => {
1474            if args.codex_binary.is_some() {
1475                conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1476            }
1477            if args.codex_model.is_some() {
1478                conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1479            }
1480            if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1481                conflicts.push(format!(
1482                    "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1483                    args.codex_timeout
1484                ));
1485            }
1486        }
1487        EnrichMode::Codex => {
1488            if args.claude_binary.is_some() {
1489                conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1490            }
1491            if args.claude_model.is_some() {
1492                conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1493            }
1494            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1495                conflicts.push(format!(
1496                    "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1497                    args.claude_timeout
1498                ));
1499            }
1500            if args.max_cost_usd.is_some() {
1501                conflicts.push(
1502                    "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription, not the call)"
1503                        .to_string(),
1504                );
1505            }
1506        }
1507    }
1508
1509    if !conflicts.is_empty() {
1510        return Err(AppError::Validation(format!(
1511            "G20: mode-conditional flag conflicts detected for --mode={}:\n  - {}",
1512            args.mode,
1513            conflicts.join("\n  - ")
1514        )));
1515    }
1516
1517    Ok(())
1518}
1519
1520// ---------------------------------------------------------------------------
1521
1522/// Main entry point for the `enrich` command.
1523pub fn run(args: &EnrichArgs, llm_backend: crate::cli::LlmBackendChoice) -> Result<(), AppError> {
1524    // G20: mode-conditional flag validation BEFORE any DB access.
1525    // Surfaces flags that the wrong mode would silently discard.
1526    validate_mode_conditional_flags_enrich(args)?;
1527    let started = Instant::now();
1528
1529    let paths = AppPaths::resolve(args.db.as_deref())?;
1530    ensure_db_ready(&paths)?;
1531    let conn = open_rw(&paths.db)?;
1532    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1533
1534    // G28-B (v1.0.68) + G30 (v1.0.69): enforce singleton per
1535    // (job_type, namespace, db_hash) so two parallel `enrich` invocations
1536    // on the same DB cannot co-exist, but concurrent enrich on different
1537    // databases works as expected. The force flag (--force) breaks a
1538    // stale lock from a previously crashed invocation.
1539    let wait_secs = args.wait_job_singleton;
1540    let force_flag = args.force_job_singleton;
1541    let _singleton = crate::lock::acquire_job_singleton(
1542        crate::lock::JobType::Enrich,
1543        &namespace,
1544        &paths.db,
1545        wait_secs,
1546        force_flag,
1547    )?;
1548
1549    // Validate provider binary upfront only for LLM-backed operations.
1550    let provider_binary = if matches!(args.operation, EnrichOperation::ReEmbed) {
1551        None
1552    } else {
1553        Some(match args.mode {
1554            EnrichMode::ClaudeCode => {
1555                let bin = find_claude_binary(args.claude_binary.as_deref())?;
1556                let version = super::claude_runner::validate_claude_version(&bin)?;
1557                tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
1558                emit_json(&PhaseEvent {
1559                    phase: "validate",
1560                    binary_path: bin.to_str(),
1561                    version: Some(&version),
1562                    items_total: None,
1563                    items_pending: None,
1564                    llm_parallelism: None,
1565                });
1566                bin
1567            }
1568            EnrichMode::Codex => {
1569                let bin = find_codex_binary(args.codex_binary.as_deref())?;
1570                emit_json(&PhaseEvent {
1571                    phase: "validate",
1572                    binary_path: bin.to_str(),
1573                    version: None,
1574                    items_total: None,
1575                    items_pending: None,
1576                    llm_parallelism: None,
1577                });
1578                bin
1579            }
1580        })
1581    };
1582
1583    // G28-D: refuse to start when the system is saturated. This check
1584    // is BEFORE preflight so we never spend an OAuth turn on a host
1585    // that is already at the limit.
1586    if args.max_load_check && !args.dry_run && crate::system_load::is_system_saturated() {
1587        let load = crate::system_load::load_average_one();
1588        let n = crate::system_load::ncpus();
1589        return Err(AppError::Validation(format!(
1590            "system load average {load:.2} exceeds 2x ncpus ({n}); \
1591             pass --no-max-load-check to override (not recommended)"
1592        )));
1593    }
1594
1595    // G35: preflight probe — issue a single ping turn to verify the
1596    // provider is healthy before scanning N candidates. If the probe
1597    // fails with a rate-limit error, optionally fall back to a
1598    // different mode (typically codex) instead of failing the entire
1599    // batch. The probe itself consumes 1 OAuth turn, so it stays
1600    // opt-in (default off) to keep --dry-run and CI flows zero-cost.
1601    if args.preflight_check && !args.dry_run && !matches!(args.operation, EnrichOperation::ReEmbed)
1602    {
1603        let preflight_result = run_preflight_probe(args);
1604        match preflight_result {
1605            PreflightOutcome::Healthy => {
1606                tracing::info!(target: "enrich", mode = ?args.mode, "preflight probe healthy");
1607            }
1608            PreflightOutcome::RateLimited { reason, suggestion } => {
1609                if let Some(fallback) = args.fallback_mode.clone() {
1610                    if fallback != args.mode {
1611                        // G35 (v1.0.69): the mid-batch mode switch is
1612                        // intentionally NOT applied because it would
1613                        // desynchronise the per-item rate-limit wait
1614                        // state (rate-limited items in the worker are
1615                        // timed against the original provider). Instead
1616                        // we abort cleanly so the operator can re-invoke
1617                        // with `--mode {fallback:?}`. This guarantees no
1618                        // OAuth window is wasted and no partial state
1619                        // is left in the queue.
1620                        return Err(AppError::Validation(format!(
1621                            "preflight detected rate limit on {mode:?}: {reason}; \
1622                             re-invoke with `--mode {fallback:?}` to use the fallback provider",
1623                            mode = args.mode
1624                        )));
1625                    }
1626                    return Err(AppError::Validation(format!(
1627                        "preflight detected rate limit on {mode:?}: {reason}; \
1628                         --fallback-mode matches --mode, no recovery possible",
1629                        mode = args.mode
1630                    )));
1631                }
1632                return Err(AppError::Validation(format!(
1633                    "preflight detected rate limit on {mode:?}: {reason}; \
1634                     {suggestion}; pass --fallback-mode codex to recover",
1635                    mode = args.mode
1636                )));
1637            }
1638            PreflightOutcome::Error(e) => {
1639                return Err(e);
1640            }
1641        }
1642    }
1643
1644    // SCAN phase
1645    let scan_result = scan_operation(&conn, &namespace, args)?;
1646    let total = scan_result.len();
1647
1648    emit_json(&PhaseEvent {
1649        phase: "scan",
1650        binary_path: None,
1651        version: None,
1652        items_total: Some(total),
1653        items_pending: Some(total),
1654        llm_parallelism: Some(args.llm_parallelism),
1655    });
1656
1657    // Dry-run: emit preview events and summary without calling LLM
1658    if args.dry_run {
1659        for (idx, key) in scan_result.iter().enumerate() {
1660            emit_json(&ItemEvent {
1661                item: key,
1662                status: "preview",
1663                memory_id: None,
1664                entity_id: None,
1665                entities: None,
1666                rels: None,
1667                chars_before: None,
1668                chars_after: None,
1669                cost_usd: None,
1670                elapsed_ms: None,
1671                error: None,
1672                index: idx,
1673                total,
1674            });
1675        }
1676        emit_json(&EnrichSummary {
1677            summary: true,
1678            operation: format!("{:?}", args.operation),
1679            items_total: total,
1680            completed: 0,
1681            failed: 0,
1682            skipped: 0,
1683            cost_usd: 0.0,
1684            elapsed_ms: started.elapsed().as_millis() as u64,
1685            backend_invoked: take_enrich_backend(),
1686        });
1687        return Ok(());
1688    }
1689
1690    // All operations in this enum have an execution path.
1691
1692    // Queue setup for resume/retry
1693    let queue_conn = open_queue_db(DEFAULT_QUEUE_DB)?;
1694
1695    if args.resume {
1696        let reset = queue_conn
1697            .execute(
1698                "UPDATE queue SET status='pending' WHERE status='processing'",
1699                [],
1700            )
1701            .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
1702        if reset > 0 {
1703            tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
1704        }
1705    }
1706
1707    if args.retry_failed {
1708        let count = queue_conn
1709            .execute(
1710                "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
1711                [],
1712            )
1713            .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
1714        tracing::info!(target: "enrich", count, "retrying failed items");
1715    }
1716
1717    if !args.resume && !args.retry_failed {
1718        queue_conn
1719            .execute("DELETE FROM queue", [])
1720            .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
1721    }
1722
1723    // Populate queue
1724    for (idx, key) in scan_result.iter().enumerate() {
1725        let item_type = match args.operation {
1726            EnrichOperation::EntityDescriptions => "entity",
1727            _ => "memory",
1728        };
1729        if let Err(e) = queue_conn.execute(
1730            "INSERT OR IGNORE INTO queue (item_key, item_type, status) VALUES (?1, ?2, 'pending')",
1731            rusqlite::params![key, item_type],
1732        ) {
1733            tracing::warn!(target: "enrich", error = %e, "queue insert failed");
1734        }
1735        let _ = idx; // suppress unused warning
1736    }
1737
1738    // G19: parallel LLM processing via std::thread::scope when parallelism > 1.
1739    // Clamp enforces the range even if the caller bypasses clap validation.
1740    let parallelism = args.llm_parallelism.clamp(1, 32) as usize;
1741    if parallelism > 1 {
1742        tracing::info!(
1743            target: "enrich",
1744            llm_parallelism = parallelism,
1745            "parallel LLM processing with bounded thread pool"
1746        );
1747    }
1748    // G28-D (v1.0.68) + G34 (v1.0.69): warn above the recommended parallelism
1749    // ceiling. The threshold and message depend on the LLM mode because
1750    // Claude Code spawns MCP children (G28-A) while Codex does not.
1751    if parallelism > 4 {
1752        match args.mode {
1753            EnrichMode::ClaudeCode => {
1754                tracing::warn!(
1755                    target: "enrich",
1756                    llm_parallelism = parallelism,
1757                    recommended_max = 4,
1758                    mode = "claude-code",
1759                    "llm_parallelism above 4 multiplies Claude Code subprocess fan-out; \
1760                     consider combining with SQLITE_GRAPHRAG_CLAUDE_EMPTY_CONFIG_DIR \
1761                     to cut MCP children (G28-A)"
1762                );
1763            }
1764            EnrichMode::Codex if parallelism > 16 => {
1765                tracing::warn!(
1766                    target: "enrich",
1767                    llm_parallelism = parallelism,
1768                    recommended_max = 16,
1769                    mode = "codex",
1770                    "llm_parallelism above 16 risks OAuth rate-limit on Codex; \
1771                     consider --llm-parallelism 8 for safer concurrency"
1772                );
1773            }
1774            EnrichMode::Codex => {
1775                // No warning: codex does not spawn MCP children and was
1776                // validated at parallelism 8 in production (1161 items,
1777                // 0 failures) per the 2026-06-04 session audit.
1778            }
1779        }
1780    }
1781
1782    let mut completed = 0usize;
1783    let mut failed = 0usize;
1784    let mut skipped = 0usize;
1785    let mut cost_total = 0.0f64;
1786    let mut oauth_detected = false;
1787    let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1788    let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1789    let enrich_started = std::time::Instant::now();
1790
1791    let provider_timeout = match args.mode {
1792        EnrichMode::ClaudeCode => args.claude_timeout,
1793        EnrichMode::Codex => args.codex_timeout,
1794    };
1795
1796    let provider_model: Option<&str> = match args.mode {
1797        EnrichMode::ClaudeCode => args.claude_model.as_deref(),
1798        EnrichMode::Codex => args.codex_model.as_deref(),
1799    };
1800
1801    // G19: when parallelism > 1, spawn bounded worker threads.
1802    // Each worker opens its own DB connections (WAL supports concurrent readers + serialized writers).
1803    // The queue DB claim is atomic via UPDATE...RETURNING — no external lock needed.
1804    if parallelism > 1 {
1805        let stdout_mu = parking_lot::Mutex::new(());
1806        let budget = args.max_cost_usd;
1807        let operation = args.operation.clone();
1808        let mode = args.mode.clone();
1809        let min_oc = args.min_output_chars;
1810        let max_oc = args.max_output_chars;
1811        let prompt_tpl = args.prompt_template.as_deref().map(|p| p.to_path_buf());
1812
1813        struct WorkerResult {
1814            completed: usize,
1815            failed: usize,
1816            skipped: usize,
1817            cost: f64,
1818            oauth: bool,
1819        }
1820
1821        let results: Vec<WorkerResult> = std::thread::scope(|s| {
1822            let handles: Vec<_> = (0..parallelism)
1823                .map(|worker_id| {
1824                    let stdout_mu = &stdout_mu;
1825                    let paths = &paths;
1826                    let namespace = &namespace;
1827                    let provider_binary = provider_binary.as_deref();
1828                    let operation = &operation;
1829                    let mode = &mode;
1830                    let prompt_tpl = prompt_tpl.as_deref();
1831                    s.spawn(move || {
1832                        let w_conn = match open_rw(&paths.db) {
1833                            Ok(c) => c,
1834                            Err(e) => {
1835                                tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open DB");
1836                                return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1837                            }
1838                        };
1839                        let w_queue = match open_queue_db(DEFAULT_QUEUE_DB) {
1840                            Ok(c) => c,
1841                            Err(e) => {
1842                                tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open queue DB");
1843                                return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1844                            }
1845                        };
1846                        let mut w_completed = 0usize;
1847                        let mut w_failed = 0usize;
1848                        let mut w_skipped = 0usize;
1849                        let mut w_cost = 0.0f64;
1850                        let mut w_oauth = false;
1851                        let mut w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1852                        let w_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1853                        // G28-D: per-worker circuit breaker that aborts the
1854                        // loop after `circuit_breaker_threshold` consecutive
1855                        // HardFailure outcomes (transient/rate-limited errors
1856                        // do NOT count, so a recovering provider is not
1857                        // penalised).
1858                        let mut w_breaker = crate::retry::CircuitBreaker::new(
1859                            args.circuit_breaker_threshold.max(1),
1860                            std::time::Duration::from_secs(60),
1861                        );
1862
1863                        loop {
1864                            if crate::shutdown_requested() {
1865                                tracing::info!(target: "enrich", "shutdown requested, worker stopping");
1866                                break;
1867                            }
1868                            if let Some(b) = budget {
1869                                if !w_oauth && w_cost >= b {
1870                                    break;
1871                                }
1872                            }
1873                            let pending: Option<(i64, String, String)> = w_queue
1874                                .query_row(
1875                                    "UPDATE queue SET status='processing', attempt=attempt+1 \
1876                                     WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
1877                                     RETURNING id, item_key, item_type",
1878                                    [],
1879                                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1880                                )
1881                                .ok();
1882                            let (queue_id, item_key, _item_type) = match pending {
1883                                Some(p) => p,
1884                                None => break,
1885                            };
1886                            let item_started = Instant::now();
1887                            let current_index = w_completed + w_failed + w_skipped;
1888
1889                            let call_result = match operation {
1890                                EnrichOperation::MemoryBindings => call_memory_bindings(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1891                                EnrichOperation::EntityDescriptions => call_entity_description(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1892                                EnrichOperation::BodyEnrich => call_body_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode, min_oc, max_oc, prompt_tpl, args.preserve_threshold, paths, llm_backend),
1893                                EnrichOperation::ReEmbed => call_reembed(&w_conn, namespace, &item_key, paths, llm_backend),
1894                                EnrichOperation::WeightCalibrate => call_weight_calibrate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1895                                EnrichOperation::RelationReclassify => call_relation_reclassify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1896                                EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => call_entity_connect(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1897                                EnrichOperation::EntityTypeValidate => call_entity_type_validate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1898                                EnrichOperation::DescriptionEnrich => call_description_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1899                                EnrichOperation::DomainClassify => call_domain_classify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1900                                EnrichOperation::GraphAudit => call_graph_audit(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1901                                EnrichOperation::DeepResearchSynth => call_deep_research_synth(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1902                                EnrichOperation::BodyExtract => call_body_extract(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1903                            };
1904
1905                            match call_result {
1906                                Ok(EnrichItemResult::Done { cost, is_oauth, memory_id, entity_id, entities, rels, chars_before, chars_after }) => {
1907                                    if is_oauth { w_oauth = true; }
1908                                    w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1909                                    let _ = w_queue.execute(
1910                                        "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1911                                        rusqlite::params![memory_id, entity_id, entities as i64, rels as i64, cost, item_started.elapsed().as_millis() as i64, queue_id],
1912                                    );
1913                                    w_completed += 1;
1914                                    if !is_oauth { w_cost += cost; }
1915                                    // G28-D: count success; resets breaker.
1916                                    let _ = w_breaker
1917                                        .record(crate::retry::AttemptOutcome::Success);
1918                                    let _guard = stdout_mu.lock();
1919                                    emit_json(&ItemEvent { item: &item_key, status: "done", memory_id, entity_id, entities: Some(entities), rels: Some(rels), chars_before, chars_after, cost_usd: if is_oauth { None } else { Some(cost) }, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
1920                                }
1921                                Ok(EnrichItemResult::Skipped { reason }) => {
1922                                    w_skipped += 1;
1923                                    let _ = w_queue.execute("UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![reason, queue_id]);
1924                                    let _guard = stdout_mu.lock();
1925                                    emit_json(&ItemEvent { item: &item_key, status: "skipped", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
1926                                }
1927                                Ok(EnrichItemResult::PreservationFailed { score, threshold, chars_before, chars_after }) => {
1928                                    // G29 Passo 4: worker mirror of the
1929                                    // serial path. Counted as a soft
1930                                    // skip so the queue surface shows
1931                                    // a quality issue rather than a
1932                                    // transport failure.
1933                                    w_skipped += 1;
1934                                    let reason = format!(
1935                                        "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
1936                                    );
1937                                    let _ = w_queue.execute(
1938                                        "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
1939                                        rusqlite::params![reason, queue_id],
1940                                    );
1941                                    let _guard = stdout_mu.lock();
1942                                    emit_json(&ItemEvent {
1943                                        item: &item_key,
1944                                        status: "preservation_failed",
1945                                        memory_id: None,
1946                                        entity_id: None,
1947                                        entities: None,
1948                                        rels: None,
1949                                        chars_before: Some(chars_before),
1950                                        chars_after: Some(chars_after),
1951                                        cost_usd: None,
1952                                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1953                                        error: Some(reason),
1954                                        index: current_index,
1955                                        total,
1956                                    });
1957                                }
1958                                Err(e) => {
1959                                    let err_str = format!("{e}");
1960                                    if matches!(e, AppError::RateLimited { .. }) {
1961                                        if crate::retry::is_kill_switch_active() {
1962                                            tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
1963                                        } else if std::time::Instant::now() >= w_deadline {
1964                                            tracing::error!(target: "enrich", "rate-limit retry deadline (1h) exhausted in worker");
1965                                        } else {
1966                                            let half = w_backoff / 2;
1967                                            let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
1968                                            let actual_wait = half + jitter;
1969                                            tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited in worker, backing off");
1970                                            let _ = w_queue.execute("UPDATE queue SET status='pending' WHERE id=?1", rusqlite::params![queue_id]);
1971                                            std::thread::sleep(std::time::Duration::from_secs(actual_wait));
1972                                            w_backoff = (w_backoff * 2).min(900);
1973                                            continue;
1974                                        }
1975                                    }
1976                                    w_failed += 1;
1977                                    let _ = w_queue.execute("UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![err_str, queue_id]);
1978                                    let _guard = stdout_mu.lock();
1979                                    emit_json(&ItemEvent { item: &item_key, status: "failed", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: Some(err_str), index: current_index, total });
1980                                    // G28-D: count hard failure against breaker.
1981                                    let breaker_opened = w_breaker
1982                                        .record(crate::retry::AttemptOutcome::HardFailure);
1983                                    if breaker_opened {
1984                                        tracing::error!(target: "enrich",
1985                                            consecutive_failures = w_breaker.consecutive_failures(),
1986                                            "circuit breaker opened — aborting worker"
1987                                        );
1988                                        break;
1989                                    }
1990                                }
1991                            }
1992                        }
1993                        WorkerResult { completed: w_completed, failed: w_failed, skipped: w_skipped, cost: w_cost, oauth: w_oauth }
1994                    })
1995                })
1996                .collect();
1997            handles
1998                .into_iter()
1999                .map(|h| {
2000                    h.join().unwrap_or(WorkerResult {
2001                        completed: 0,
2002                        failed: 0,
2003                        skipped: 0,
2004                        cost: 0.0,
2005                        oauth: false,
2006                    })
2007                })
2008                .collect()
2009        });
2010
2011        for r in &results {
2012            completed += r.completed;
2013            failed += r.failed;
2014            skipped += r.skipped;
2015            cost_total += r.cost;
2016            if r.oauth && !oauth_detected {
2017                oauth_detected = true;
2018            }
2019        }
2020    } else {
2021        // Serial path (parallelism == 1) — original loop
2022        loop {
2023            if crate::shutdown_requested() {
2024                tracing::info!(target: "enrich", "shutdown requested, stopping enrichment");
2025                break;
2026            }
2027
2028            // Budget check
2029            if let Some(budget) = args.max_cost_usd {
2030                if !oauth_detected && cost_total >= budget {
2031                    tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
2032                    break;
2033                }
2034            }
2035
2036            // Dequeue next pending item
2037            let pending: Option<(i64, String, String)> = queue_conn
2038                .query_row(
2039                    "UPDATE queue SET status='processing', attempt=attempt+1 \
2040                 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
2041                 RETURNING id, item_key, item_type",
2042                    [],
2043                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2044                )
2045                .ok();
2046
2047            let (queue_id, item_key, item_type) = match pending {
2048                Some(p) => p,
2049                None => break,
2050            };
2051
2052            let item_started = Instant::now();
2053            let current_index = completed + failed + skipped;
2054
2055            let call_result = match args.operation {
2056                EnrichOperation::MemoryBindings => call_memory_bindings(
2057                    &conn,
2058                    &namespace,
2059                    &item_key,
2060                    provider_binary
2061                        .as_deref()
2062                        .expect("provider binary required"),
2063                    provider_model,
2064                    provider_timeout,
2065                    &args.mode,
2066                ),
2067                EnrichOperation::EntityDescriptions => call_entity_description(
2068                    &conn,
2069                    &namespace,
2070                    &item_key,
2071                    provider_binary
2072                        .as_deref()
2073                        .expect("provider binary required"),
2074                    provider_model,
2075                    provider_timeout,
2076                    &args.mode,
2077                ),
2078                EnrichOperation::BodyEnrich => call_body_enrich(
2079                    &conn,
2080                    &namespace,
2081                    &item_key,
2082                    provider_binary
2083                        .as_deref()
2084                        .expect("provider binary required"),
2085                    provider_model,
2086                    provider_timeout,
2087                    &args.mode,
2088                    args.min_output_chars,
2089                    args.max_output_chars,
2090                    args.prompt_template.as_deref(),
2091                    args.preserve_threshold,
2092                    &paths,
2093                    llm_backend,
2094                ),
2095                EnrichOperation::ReEmbed => {
2096                    call_reembed(&conn, &namespace, &item_key, &paths, llm_backend)
2097                }
2098                EnrichOperation::WeightCalibrate => call_weight_calibrate(
2099                    &conn,
2100                    &namespace,
2101                    &item_key,
2102                    provider_binary
2103                        .as_deref()
2104                        .expect("provider binary required"),
2105                    provider_model,
2106                    provider_timeout,
2107                    &args.mode,
2108                ),
2109                EnrichOperation::RelationReclassify => call_relation_reclassify(
2110                    &conn,
2111                    &namespace,
2112                    &item_key,
2113                    provider_binary
2114                        .as_deref()
2115                        .expect("provider binary required"),
2116                    provider_model,
2117                    provider_timeout,
2118                    &args.mode,
2119                ),
2120                EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2121                    call_entity_connect(
2122                        &conn,
2123                        &namespace,
2124                        &item_key,
2125                        provider_binary
2126                            .as_deref()
2127                            .expect("provider binary required"),
2128                        provider_model,
2129                        provider_timeout,
2130                        &args.mode,
2131                    )
2132                }
2133                EnrichOperation::EntityTypeValidate => call_entity_type_validate(
2134                    &conn,
2135                    &namespace,
2136                    &item_key,
2137                    provider_binary
2138                        .as_deref()
2139                        .expect("provider binary required"),
2140                    provider_model,
2141                    provider_timeout,
2142                    &args.mode,
2143                ),
2144                EnrichOperation::DescriptionEnrich => call_description_enrich(
2145                    &conn,
2146                    &namespace,
2147                    &item_key,
2148                    provider_binary
2149                        .as_deref()
2150                        .expect("provider binary required"),
2151                    provider_model,
2152                    provider_timeout,
2153                    &args.mode,
2154                ),
2155                EnrichOperation::DomainClassify => call_domain_classify(
2156                    &conn,
2157                    &namespace,
2158                    &item_key,
2159                    provider_binary
2160                        .as_deref()
2161                        .expect("provider binary required"),
2162                    provider_model,
2163                    provider_timeout,
2164                    &args.mode,
2165                ),
2166                EnrichOperation::GraphAudit => call_graph_audit(
2167                    &conn,
2168                    &namespace,
2169                    &item_key,
2170                    provider_binary
2171                        .as_deref()
2172                        .expect("provider binary required"),
2173                    provider_model,
2174                    provider_timeout,
2175                    &args.mode,
2176                ),
2177                EnrichOperation::DeepResearchSynth => call_deep_research_synth(
2178                    &conn,
2179                    &namespace,
2180                    &item_key,
2181                    provider_binary
2182                        .as_deref()
2183                        .expect("provider binary required"),
2184                    provider_model,
2185                    provider_timeout,
2186                    &args.mode,
2187                ),
2188                EnrichOperation::BodyExtract => call_body_extract(
2189                    &conn,
2190                    &namespace,
2191                    &item_key,
2192                    provider_binary
2193                        .as_deref()
2194                        .expect("provider binary required"),
2195                    provider_model,
2196                    provider_timeout,
2197                    &args.mode,
2198                ),
2199            };
2200
2201            match call_result {
2202                Ok(EnrichItemResult::Done {
2203                    memory_id,
2204                    entity_id,
2205                    entities,
2206                    rels,
2207                    chars_before,
2208                    chars_after,
2209                    cost,
2210                    is_oauth,
2211                }) => {
2212                    if is_oauth && !oauth_detected {
2213                        oauth_detected = true;
2214                        tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
2215                    }
2216                    backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
2217
2218                    // Persist depends on the operation
2219                    let persist_err: Option<String> = match args.operation {
2220                        EnrichOperation::MemoryBindings => {
2221                            // Bindings already persisted inside call_memory_bindings
2222                            None
2223                        }
2224                        EnrichOperation::EntityDescriptions => {
2225                            // Description already persisted inside call_entity_description
2226                            None
2227                        }
2228                        EnrichOperation::BodyEnrich => {
2229                            // Body already persisted inside call_body_enrich
2230                            None
2231                        }
2232                        _ => {
2233                            // All G27 operations persist inside their call_* function
2234                            None
2235                        }
2236                    };
2237
2238                    if let Err(e) = queue_conn.execute(
2239                    "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2240                    rusqlite::params![
2241                        memory_id,
2242                        entity_id,
2243                        entities as i64,
2244                        rels as i64,
2245                        cost,
2246                        item_started.elapsed().as_millis() as i64,
2247                        queue_id
2248                    ],
2249                ) {
2250                        tracing::warn!(target: "enrich", error = %e, "queue done update failed");
2251                    }
2252
2253                    if persist_err.is_none() {
2254                        completed += 1;
2255                        if !is_oauth {
2256                            cost_total += cost;
2257                        }
2258                        emit_json(&ItemEvent {
2259                            item: &item_key,
2260                            status: "done",
2261                            memory_id,
2262                            entity_id,
2263                            entities: Some(entities),
2264                            rels: Some(rels),
2265                            chars_before,
2266                            chars_after,
2267                            cost_usd: if is_oauth { None } else { Some(cost) },
2268                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2269                            error: None,
2270                            index: current_index,
2271                            total,
2272                        });
2273                    } else {
2274                        failed += 1;
2275                        emit_json(&ItemEvent {
2276                            item: &item_key,
2277                            status: "failed",
2278                            memory_id: None,
2279                            entity_id: None,
2280                            entities: None,
2281                            rels: None,
2282                            chars_before: None,
2283                            chars_after: None,
2284                            cost_usd: None,
2285                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2286                            error: persist_err,
2287                            index: current_index,
2288                            total,
2289                        });
2290                    }
2291                }
2292                Ok(EnrichItemResult::Skipped { reason }) => {
2293                    skipped += 1;
2294                    if let Err(e) = queue_conn.execute(
2295                    "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2296                    rusqlite::params![reason, queue_id],
2297                ) {
2298                        tracing::warn!(target: "enrich", error = %e, "queue skipped update failed");
2299                    }
2300                    emit_json(&ItemEvent {
2301                        item: &item_key,
2302                        status: "skipped",
2303                        memory_id: None,
2304                        entity_id: None,
2305                        entities: None,
2306                        rels: None,
2307                        chars_before: None,
2308                        chars_after: None,
2309                        cost_usd: None,
2310                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2311                        error: None,
2312                        index: current_index,
2313                        total,
2314                    });
2315                }
2316                Ok(EnrichItemResult::PreservationFailed {
2317                    score,
2318                    threshold,
2319                    chars_before,
2320                    chars_after,
2321                }) => {
2322                    // G29 Passo 4: the LLM rewrite diverged too far from
2323                    // the original body. Count as a soft failure (not
2324                    // `failed`) so the queue surfaces it as a quality
2325                    // issue, not a transport error. The reason is
2326                    // structured so the operator can audit why a body
2327                    // was rejected.
2328                    skipped += 1;
2329                    let reason = format!(
2330                        "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2331                    );
2332                    if let Err(qe) = queue_conn.execute(
2333                        "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2334                        rusqlite::params![reason, queue_id],
2335                    ) {
2336                        tracing::warn!(target: "enrich", error = %qe, "queue preservation_failed update failed");
2337                    }
2338                    emit_json(&ItemEvent {
2339                        item: &item_key,
2340                        status: "preservation_failed",
2341                        memory_id: None,
2342                        entity_id: None,
2343                        entities: None,
2344                        rels: None,
2345                        chars_before: Some(chars_before),
2346                        chars_after: Some(chars_after),
2347                        cost_usd: None,
2348                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2349                        error: Some(reason),
2350                        index: current_index,
2351                        total,
2352                    });
2353                }
2354                Err(e) => {
2355                    let err_str = format!("{e}");
2356                    if matches!(e, AppError::RateLimited { .. }) {
2357                        if crate::retry::is_kill_switch_active() {
2358                            tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2359                        } else if std::time::Instant::now() >= rate_limit_deadline {
2360                            tracing::error!(target: "enrich", total_elapsed_secs = enrich_started.elapsed().as_secs(), "rate-limit retry deadline (1h) exhausted");
2361                        } else {
2362                            let half = backoff_secs / 2;
2363                            let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2364                            let actual_wait = half + jitter;
2365                            tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
2366                            if let Err(qe) = queue_conn.execute(
2367                                "UPDATE queue SET status='pending' WHERE id=?1",
2368                                rusqlite::params![queue_id],
2369                            ) {
2370                                tracing::warn!(target: "enrich", error = %qe, "queue pending update failed");
2371                            }
2372                            std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2373                            backoff_secs = (backoff_secs * 2).min(900);
2374                            continue;
2375                        }
2376                    }
2377
2378                    failed += 1;
2379                    if let Err(qe) = queue_conn.execute(
2380                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
2381                    rusqlite::params![err_str, queue_id],
2382                ) {
2383                        tracing::warn!(target: "enrich", error = %qe, "queue failed update failed");
2384                    }
2385                    emit_json(&ItemEvent {
2386                        item: &item_key,
2387                        status: "failed",
2388                        memory_id: None,
2389                        entity_id: None,
2390                        entities: None,
2391                        rels: None,
2392                        chars_before: None,
2393                        chars_after: None,
2394                        cost_usd: None,
2395                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2396                        error: Some(err_str),
2397                        index: current_index,
2398                        total,
2399                    });
2400                }
2401            }
2402
2403            let _ = item_type; // used via queue schema only
2404        }
2405    } // end else (serial path)
2406
2407    let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2408    let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2409
2410    emit_json(&EnrichSummary {
2411        summary: true,
2412        operation: format!("{:?}", args.operation),
2413        items_total: total,
2414        completed,
2415        failed,
2416        skipped,
2417        cost_usd: cost_total,
2418        elapsed_ms: started.elapsed().as_millis() as u64,
2419        backend_invoked: take_enrich_backend(),
2420    });
2421
2422    if failed == 0 {
2423        let _ = std::fs::remove_file(DEFAULT_QUEUE_DB);
2424    }
2425
2426    Ok(())
2427}
2428
2429// ---------------------------------------------------------------------------
2430// Internal result type for a single item call
2431// ---------------------------------------------------------------------------
2432
/// Outcome of processing a single queue item inside one of the `call_*` helpers.
///
/// The driver loop matches on this value to update the queue row, bump the
/// run counters and emit the per-item NDJSON event.
#[derive(Debug, Clone, PartialEq)]
enum EnrichItemResult {
    /// The item was processed; persistence already happened inside the helper.
    Done {
        /// Row id of the memory the item refers to, when the item is a memory.
        memory_id: Option<i64>,
        /// Row id of the entity the item refers to, when the item is an entity.
        entity_id: Option<i64>,
        /// Entity count reported for the item (`0` when not applicable).
        entities: usize,
        /// Relationship count reported for the item (`0` when not applicable).
        rels: usize,
        /// Body length in characters before the operation, when tracked.
        chars_before: Option<usize>,
        /// Body length in characters after the operation, when tracked.
        chars_after: Option<usize>,
        /// Cost reported by the provider call for this item.
        cost: f64,
        /// When `true` the driver omits `cost_usd` from the item event and
        /// does not add `cost` to the running total.
        is_oauth: bool,
    },
    /// The item was intentionally not processed.
    Skipped {
        /// Human-readable explanation; the driver stores it in the queue
        /// row's `error` column.
        reason: String,
    },
    /// G29 Step 4 (v1.0.69): the LLM rewrite diverged from the original
    /// body beyond the configured `--preserve-threshold` and was rejected
    /// before persistence. The trigram-Jaccard score and threshold are
    /// emitted in the NDJSON stream for operator audit.
    PreservationFailed {
        /// Similarity score between the original and the rewritten body.
        score: f64,
        /// Threshold the score was compared against.
        threshold: f64,
        /// Length in characters of the original body.
        chars_before: usize,
        /// Length in characters of the rejected rewrite.
        chars_after: usize,
    },
}
2458
2459// ---------------------------------------------------------------------------
2460// Per-operation call helpers (SCAN + JUDGE + PERSIST in one unit)
2461// ---------------------------------------------------------------------------
2462
2463fn call_memory_bindings(
2464    conn: &Connection,
2465    namespace: &str,
2466    memory_name: &str,
2467    binary: &Path,
2468    model: Option<&str>,
2469    timeout: u64,
2470    mode: &EnrichMode,
2471) -> Result<EnrichItemResult, AppError> {
2472    // Look up the memory
2473    let (memory_id, body): (i64, String) = conn.query_row(
2474        "SELECT id, COALESCE(body,'') FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2475        rusqlite::params![namespace, memory_name],
2476        |r| Ok((r.get(0)?, r.get(1)?)),
2477    ).map_err(|e| match e {
2478        rusqlite::Error::QueryReturnedNoRows => AppError::NotFound(format!("memory '{memory_name}' not found")),
2479        other => AppError::Database(other),
2480    })?;
2481
2482    if body.trim().is_empty() {
2483        return Ok(EnrichItemResult::Skipped {
2484            reason: "body is empty".to_string(),
2485        });
2486    }
2487
2488    let (value, cost, is_oauth) = match mode {
2489        EnrichMode::ClaudeCode => call_claude(
2490            binary,
2491            BINDINGS_PROMPT,
2492            BINDINGS_SCHEMA,
2493            &body,
2494            model,
2495            timeout,
2496        )?,
2497        EnrichMode::Codex => call_codex(
2498            binary,
2499            BINDINGS_PROMPT,
2500            BINDINGS_SCHEMA,
2501            &body,
2502            model,
2503            timeout,
2504        )?,
2505    };
2506
2507    let empty_arr = serde_json::Value::Array(vec![]);
2508    let entities_val = value.get("entities").unwrap_or(&empty_arr);
2509    let rels_val = value.get("relationships").unwrap_or(&empty_arr);
2510
2511    let (ent_count, rel_count) =
2512        persist_memory_bindings(conn, namespace, memory_id, entities_val, rels_val)?;
2513
2514    Ok(EnrichItemResult::Done {
2515        memory_id: Some(memory_id),
2516        entity_id: None,
2517        entities: ent_count,
2518        rels: rel_count,
2519        chars_before: None,
2520        chars_after: None,
2521        cost,
2522        is_oauth,
2523    })
2524}
2525
2526fn call_entity_description(
2527    conn: &Connection,
2528    namespace: &str,
2529    entity_name: &str,
2530    binary: &Path,
2531    model: Option<&str>,
2532    timeout: u64,
2533    mode: &EnrichMode,
2534) -> Result<EnrichItemResult, AppError> {
2535    let (entity_id, entity_type): (i64, String) = conn
2536        .query_row(
2537            "SELECT id, type FROM entities WHERE namespace=?1 AND name=?2",
2538            rusqlite::params![namespace, entity_name],
2539            |r| Ok((r.get(0)?, r.get(1)?)),
2540        )
2541        .map_err(|e| match e {
2542            rusqlite::Error::QueryReturnedNoRows => {
2543                AppError::NotFound(format!("entity '{entity_name}' not found"))
2544            }
2545            other => AppError::Database(other),
2546        })?;
2547
2548    let prompt = format!(
2549        "{ENTITY_DESCRIPTION_PROMPT_PREFIX}{entity_name}\nEntity type: {entity_type}\n\nGenerate a description:"
2550    );
2551
2552    let (value, cost, is_oauth) = match mode {
2553        EnrichMode::ClaudeCode => call_claude(
2554            binary,
2555            &prompt,
2556            ENTITY_DESCRIPTION_SCHEMA,
2557            "",
2558            model,
2559            timeout,
2560        )?,
2561        EnrichMode::Codex => call_codex(
2562            binary,
2563            &prompt,
2564            ENTITY_DESCRIPTION_SCHEMA,
2565            "",
2566            model,
2567            timeout,
2568        )?,
2569    };
2570
2571    let description = value
2572        .get("description")
2573        .and_then(|v| v.as_str())
2574        .ok_or_else(|| AppError::Validation("LLM result missing 'description' field".into()))?;
2575
2576    persist_entity_description(conn, entity_id, description)?;
2577
2578    Ok(EnrichItemResult::Done {
2579        memory_id: None,
2580        entity_id: Some(entity_id),
2581        entities: 0,
2582        rels: 0,
2583        chars_before: None,
2584        chars_after: None,
2585        cost,
2586        is_oauth,
2587    })
2588}
2589
2590#[allow(clippy::too_many_arguments)]
2591fn call_body_enrich(
2592    conn: &Connection,
2593    namespace: &str,
2594    memory_name: &str,
2595    binary: &Path,
2596    model: Option<&str>,
2597    timeout: u64,
2598    mode: &EnrichMode,
2599    min_output_chars: usize,
2600    max_output_chars: usize,
2601    prompt_template: Option<&Path>,
2602    preserve_threshold: f64,
2603    paths: &crate::paths::AppPaths,
2604    llm_backend: crate::cli::LlmBackendChoice,
2605) -> Result<EnrichItemResult, AppError> {
2606    let (memory_id, body, description, memory_type): (i64, String, String, String) = conn
2607        .query_row(
2608            "SELECT id, COALESCE(body,''), COALESCE(description,''), COALESCE(type,'note') \
2609         FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2610            rusqlite::params![namespace, memory_name],
2611            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2612        )
2613        .map_err(|e| match e {
2614            rusqlite::Error::QueryReturnedNoRows => {
2615                AppError::NotFound(format!("memory '{memory_name}' not found"))
2616            }
2617            other => AppError::Database(other),
2618        })?;
2619
2620    let chars_before = body.chars().count();
2621
2622    // G26: gather graph context for contextualized enrichment
2623    let linked_entities: Vec<String> = {
2624        let mut stmt = conn.prepare_cached(
2625            "SELECT e.name FROM memory_entities me \
2626             JOIN entities e ON e.id = me.entity_id \
2627             WHERE me.memory_id = ?1 LIMIT 10",
2628        )?;
2629        let result: Vec<String> = stmt
2630            .query_map(rusqlite::params![memory_id], |r| r.get::<_, String>(0))?
2631            .filter_map(|r| r.ok())
2632            .collect();
2633        drop(stmt);
2634        result
2635    };
2636
2637    // Load custom prompt template if provided
2638    let prompt_prefix = if let Some(tmpl_path) = prompt_template {
2639        let file_size = std::fs::metadata(tmpl_path)
2640            .map_err(|e| {
2641                AppError::Io(std::io::Error::new(
2642                    e.kind(),
2643                    format!("failed to stat prompt template: {e}"),
2644                ))
2645            })?
2646            .len();
2647        if file_size > MAX_MEMORY_BODY_LEN as u64 {
2648            return Err(AppError::LimitExceeded(
2649                crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
2650            ));
2651        }
2652        std::fs::read_to_string(tmpl_path).map_err(|e| {
2653            AppError::Io(std::io::Error::new(
2654                e.kind(),
2655                format!("failed to read prompt template: {e}"),
2656            ))
2657        })?
2658    } else {
2659        BODY_ENRICH_PROMPT_PREFIX.to_string()
2660    };
2661
2662    // G26: build contextualized prompt with graph data
2663    let context_section = if !linked_entities.is_empty() || !description.is_empty() {
2664        let mut ctx = String::new();
2665        ctx.push_str(&format!(
2666            "\nContext:\n- Memory name: {memory_name}\n- Type: {memory_type}\n"
2667        ));
2668        if !description.is_empty() {
2669            ctx.push_str(&format!("- Description: {description}\n"));
2670        }
2671        ctx.push_str(&format!("- Domain: {namespace}\n"));
2672        if !linked_entities.is_empty() {
2673            ctx.push_str(&format!(
2674                "- Linked entities: {}\n",
2675                linked_entities.join(", ")
2676            ));
2677        }
2678        ctx
2679    } else {
2680        String::new()
2681    };
2682
2683    let prompt = format!(
2684        "{prompt_prefix}{context_section}\nTarget minimum length: {min_output_chars} characters. Maximum: {max_output_chars} characters."
2685    );
2686
2687    // The body schema uses a free-form enriched_body field
2688    let (value, cost, is_oauth) = match mode {
2689        EnrichMode::ClaudeCode => {
2690            call_claude(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2691        }
2692        EnrichMode::Codex => {
2693            call_codex(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2694        }
2695    };
2696
2697    let enriched_body = value
2698        .get("enriched_body")
2699        .and_then(|v| v.as_str())
2700        .ok_or_else(|| AppError::Validation("LLM result missing 'enriched_body' field".into()))?;
2701
2702    let chars_after = enriched_body.chars().count();
2703
2704    // G29 Passo 4 (v1.0.69): preservation check. Before persisting, run
2705    // a trigram-Jaccard similarity between the original body and the
2706    // LLM-rewritten body. When the score falls below
2707    // `args.preserve_threshold` (default 0.7 per the G29 gap), reject the
2708    // rewrite as a likely hallucination. The result is recorded in the
2709    // NDJSON stream so operators can audit what the LLM tried to do.
2710    let threshold = preserve_threshold;
2711    let verdict =
2712        crate::preservation::PreservationVerdict::evaluate(&body, enriched_body, threshold);
2713    if !verdict.is_accepted() {
2714        return Ok(EnrichItemResult::PreservationFailed {
2715            score: match verdict {
2716                crate::preservation::PreservationVerdict::Preserved { score, .. } => score,
2717                crate::preservation::PreservationVerdict::Rejected { score, .. } => score,
2718                crate::preservation::PreservationVerdict::Unchanged { .. } => 1.0,
2719            },
2720            threshold,
2721            chars_before,
2722            chars_after,
2723        });
2724    }
2725
2726    // G29 Passo 5 (v1.0.69): idempotency via blake3 hash. Before persisting,
2727    // compare the hash of the original body against the hash of the enriched
2728    // body. Identical hashes mean the LLM produced a byte-for-byte identical
2729    // body (rare but possible) — treat as `Skipped` so re-running the batch
2730    // is safe and the queue does not get re-persisted entries.
2731    let old_hash = blake3::hash(body.as_bytes()).to_hex().to_string();
2732    let new_hash = blake3::hash(enriched_body.as_bytes()).to_hex().to_string();
2733    if old_hash == new_hash {
2734        return Ok(EnrichItemResult::Skipped {
2735            reason: format!(
2736                "enriched body hash matches original (blake3:{old_hash}); idempotency skip"
2737            ),
2738        });
2739    }
2740
2741    // Only persist if the enriched body is genuinely longer
2742    if chars_after <= chars_before {
2743        return Ok(EnrichItemResult::Skipped {
2744            reason: format!(
2745                "enriched body ({chars_after} chars) not longer than original ({chars_before} chars)"
2746            ),
2747        });
2748    }
2749
2750    persist_enriched_body(
2751        conn,
2752        namespace,
2753        memory_id,
2754        memory_name,
2755        enriched_body,
2756        paths,
2757        llm_backend,
2758    )?;
2759
2760    Ok(EnrichItemResult::Done {
2761        memory_id: Some(memory_id),
2762        entity_id: None,
2763        entities: 0,
2764        rels: 0,
2765        chars_before: Some(chars_before),
2766        chars_after: Some(chars_after),
2767        cost,
2768        is_oauth,
2769    })
2770}
2771
2772fn call_reembed(
2773    conn: &Connection,
2774    namespace: &str,
2775    memory_name: &str,
2776    paths: &crate::paths::AppPaths,
2777    llm_backend: crate::cli::LlmBackendChoice,
2778) -> Result<EnrichItemResult, AppError> {
2779    let (memory_id, body, memory_type): (i64, String, String) = conn
2780        .query_row(
2781            "SELECT id, COALESCE(body,''), COALESCE(type,'note')
2782             FROM memories
2783             WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2784            rusqlite::params![namespace, memory_name],
2785            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2786        )
2787        .map_err(|e| match e {
2788            rusqlite::Error::QueryReturnedNoRows => {
2789                AppError::NotFound(format!("memory '{memory_name}' not found"))
2790            }
2791            other => AppError::Database(other),
2792        })?;
2793
2794    if body.trim().is_empty() {
2795        return Ok(EnrichItemResult::Skipped {
2796            reason: "body is empty".to_string(),
2797        });
2798    }
2799
2800    reembed_memory_vector(
2801        conn,
2802        namespace,
2803        memory_id,
2804        memory_name,
2805        &memory_type,
2806        &body,
2807        paths,
2808        llm_backend,
2809    )?;
2810
2811    Ok(EnrichItemResult::Done {
2812        memory_id: Some(memory_id),
2813        entity_id: None,
2814        entities: 0,
2815        rels: 0,
2816        chars_before: Some(body.chars().count()),
2817        chars_after: Some(body.chars().count()),
2818        cost: 0.0,
2819        is_oauth: true,
2820    })
2821}
2822
2823// ---------------------------------------------------------------------------
2824// Scan dispatcher — maps operation to scan query result (item keys)
2825// ---------------------------------------------------------------------------
2826
2827fn scan_operation(
2828    conn: &Connection,
2829    namespace: &str,
2830    args: &EnrichArgs,
2831) -> Result<Vec<String>, AppError> {
2832    // G37: resolve --names + --names-file once and apply to every scan path.
2833    let name_filter = resolve_name_filter(args)?;
2834    match args.operation {
2835        EnrichOperation::MemoryBindings => {
2836            let rows = scan_unbound_memories(conn, namespace, args.limit, &name_filter)?;
2837            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2838        }
2839        EnrichOperation::EntityDescriptions => {
2840            let rows = scan_entities_without_description(conn, namespace, args.limit)?;
2841            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2842        }
2843        EnrichOperation::BodyEnrich => {
2844            let rows =
2845                scan_short_body_memories(conn, namespace, args.min_output_chars, args.limit)?;
2846            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2847        }
2848        EnrichOperation::ReEmbed => {
2849            let rows = scan_memories_without_embeddings(conn, namespace, args.limit, &name_filter)?;
2850            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2851        }
2852        EnrichOperation::WeightCalibrate => {
2853            let rows = scan_weight_candidates(conn, namespace, args.limit)?;
2854            Ok(rows
2855                .into_iter()
2856                .map(|(id, _, _, _, _)| id.to_string())
2857                .collect())
2858        }
2859        EnrichOperation::RelationReclassify => {
2860            let rows = scan_generic_relations(conn, namespace, args.limit)?;
2861            Ok(rows
2862                .into_iter()
2863                .map(|(id, _, _, _)| id.to_string())
2864                .collect())
2865        }
2866        EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2867            let pairs = scan_isolated_entity_pairs(conn, namespace, args.limit)?;
2868            Ok(pairs.into_iter().map(|(_, name, _, _)| name).collect())
2869        }
2870        EnrichOperation::EntityTypeValidate => {
2871            let rows = scan_entities_for_type_validation(conn, namespace, args.limit)?;
2872            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2873        }
2874        EnrichOperation::DescriptionEnrich => {
2875            let rows = scan_generic_descriptions(conn, namespace, args.limit)?;
2876            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2877        }
2878        EnrichOperation::DomainClassify
2879        | EnrichOperation::GraphAudit
2880        | EnrichOperation::DeepResearchSynth
2881        | EnrichOperation::BodyExtract => {
2882            let limit_clause = args.limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
2883            let sql = format!(
2884                "SELECT name FROM memories WHERE namespace=?1 AND deleted_at IS NULL ORDER BY id {limit_clause}"
2885            );
2886            let mut stmt = conn.prepare(&sql)?;
2887            let names = stmt
2888                .query_map(rusqlite::params![namespace], |r| r.get::<_, String>(0))?
2889                .collect::<Result<Vec<_>, _>>()?;
2890            Ok(names)
2891        }
2892    }
2893}
2894
2895// ---------------------------------------------------------------------------
2896// Codex stub provider
2897// ---------------------------------------------------------------------------
2898
2899/// Locates the Codex CLI binary.
2900fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
2901    if let Some(p) = explicit {
2902        if p.exists() {
2903            return Ok(p.to_path_buf());
2904        }
2905        return Err(AppError::Validation(format!(
2906            "Codex binary not found at explicit path: {}",
2907            p.display()
2908        )));
2909    }
2910
2911    if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
2912        let p = PathBuf::from(&env_path);
2913        if p.exists() {
2914            return Ok(p);
2915        }
2916    }
2917
2918    let name = if cfg!(windows) { "codex.exe" } else { "codex" };
2919    if let Some(path_var) = std::env::var_os("PATH") {
2920        for dir in std::env::split_paths(&path_var) {
2921            let candidate = dir.join(name);
2922            if candidate.exists() {
2923                return Ok(crate::extract::llm_embedding::resolve_real_binary(
2924                    &candidate,
2925                ));
2926            }
2927        }
2928    }
2929
2930    Err(AppError::Validation(
2931        "Codex CLI binary not found in PATH. Install it or specify --codex-binary".to_string(),
2932    ))
2933}
2934
2935/// G27: Calibrate weight of a single relationship via LLM.
2936fn call_weight_calibrate(
2937    conn: &Connection,
2938    _namespace: &str,
2939    item_key: &str,
2940    binary: &Path,
2941    model: Option<&str>,
2942    timeout: u64,
2943    mode: &EnrichMode,
2944) -> Result<EnrichItemResult, AppError> {
2945    let rel_id: i64 = item_key
2946        .parse()
2947        .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
2948    let (source_name, target_name, relation, current_weight): (String, String, String, f64) = conn
2949        .query_row(
2950            "SELECT e1.name, e2.name, r.relation, r.weight \
2951             FROM relationships r \
2952             JOIN entities e1 ON e1.id = r.source_id \
2953             JOIN entities e2 ON e2.id = r.target_id \
2954             WHERE r.id = ?1",
2955            rusqlite::params![rel_id],
2956            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2957        )
2958        .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
2959
2960    let input_text = format!(
2961        "Source: {source_name}\nTarget: {target_name}\nRelation: {relation}\nCurrent weight: {current_weight}"
2962    );
2963    let (value, cost, is_oauth) = match mode {
2964        EnrichMode::ClaudeCode => call_claude(
2965            binary,
2966            WEIGHT_CALIBRATE_PROMPT,
2967            WEIGHT_CALIBRATE_SCHEMA,
2968            &input_text,
2969            model,
2970            timeout,
2971        )?,
2972        EnrichMode::Codex => call_codex(
2973            binary,
2974            WEIGHT_CALIBRATE_PROMPT,
2975            WEIGHT_CALIBRATE_SCHEMA,
2976            &input_text,
2977            model,
2978            timeout,
2979        )?,
2980    };
2981
2982    let calibrated = value
2983        .get("calibrated_weight")
2984        .and_then(|v| v.as_f64())
2985        .ok_or_else(|| AppError::Validation("LLM result missing 'calibrated_weight'".into()))?;
2986
2987    conn.execute(
2988        "UPDATE relationships SET weight = ?1 WHERE id = ?2",
2989        rusqlite::params![calibrated, rel_id],
2990    )?;
2991
2992    Ok(EnrichItemResult::Done {
2993        memory_id: None,
2994        entity_id: None,
2995        entities: 0,
2996        rels: 1,
2997        chars_before: None,
2998        chars_after: None,
2999        cost,
3000        is_oauth,
3001    })
3002}
3003
3004/// G27: Reclassify a generic relationship type via LLM.
3005fn call_relation_reclassify(
3006    conn: &Connection,
3007    _namespace: &str,
3008    item_key: &str,
3009    binary: &Path,
3010    model: Option<&str>,
3011    timeout: u64,
3012    mode: &EnrichMode,
3013) -> Result<EnrichItemResult, AppError> {
3014    let rel_id: i64 = item_key
3015        .parse()
3016        .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
3017    let (source_name, target_name, current_relation): (String, String, String) = conn
3018        .query_row(
3019            "SELECT e1.name, e2.name, r.relation \
3020             FROM relationships r \
3021             JOIN entities e1 ON e1.id = r.source_id \
3022             JOIN entities e2 ON e2.id = r.target_id \
3023             WHERE r.id = ?1",
3024            rusqlite::params![rel_id],
3025            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3026        )
3027        .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
3028
3029    let input_text = format!(
3030        "Source entity: {source_name}\nTarget entity: {target_name}\nCurrent relation: {current_relation}"
3031    );
3032    let (value, cost, is_oauth) = match mode {
3033        EnrichMode::ClaudeCode => call_claude(
3034            binary,
3035            RELATION_RECLASSIFY_PROMPT,
3036            RELATION_RECLASSIFY_SCHEMA,
3037            &input_text,
3038            model,
3039            timeout,
3040        )?,
3041        EnrichMode::Codex => call_codex(
3042            binary,
3043            RELATION_RECLASSIFY_PROMPT,
3044            RELATION_RECLASSIFY_SCHEMA,
3045            &input_text,
3046            model,
3047            timeout,
3048        )?,
3049    };
3050
3051    let new_relation = value
3052        .get("relation")
3053        .and_then(|v| v.as_str())
3054        .ok_or_else(|| AppError::Validation("LLM result missing 'relation'".into()))?;
3055    let new_strength = value
3056        .get("strength")
3057        .and_then(|v| v.as_f64())
3058        .unwrap_or(0.5);
3059
3060    conn.execute(
3061        "UPDATE relationships SET relation = ?1, weight = ?2 WHERE id = ?3",
3062        rusqlite::params![new_relation, new_strength, rel_id],
3063    )?;
3064
3065    Ok(EnrichItemResult::Done {
3066        memory_id: None,
3067        entity_id: None,
3068        entities: 0,
3069        rels: 1,
3070        chars_before: None,
3071        chars_after: None,
3072        cost,
3073        is_oauth,
3074    })
3075}
3076
3077/// G27 P2: Connect isolated entities via LLM-suggested relationship.
3078fn call_entity_connect(
3079    conn: &Connection,
3080    namespace: &str,
3081    item_key: &str,
3082    binary: &Path,
3083    model: Option<&str>,
3084    timeout: u64,
3085    mode: &EnrichMode,
3086) -> Result<EnrichItemResult, AppError> {
3087    let pairs = scan_isolated_entity_pairs(conn, namespace, Some(1))?;
3088    let (e1_id, e1_name, e2_id, e2_name) =
3089        match pairs.into_iter().find(|(_, n, _, _)| n == item_key) {
3090            Some(p) => p,
3091            None => {
3092                return Ok(EnrichItemResult::Skipped {
3093                    reason: "pair no longer isolated".into(),
3094                })
3095            }
3096        };
3097    let input_text = format!("Entity A: {e1_name}\nEntity B: {e2_name}");
3098    let (value, cost, is_oauth) = match mode {
3099        EnrichMode::ClaudeCode => call_claude(
3100            binary,
3101            ENTITY_CONNECT_PROMPT,
3102            ENTITY_CONNECT_SCHEMA,
3103            &input_text,
3104            model,
3105            timeout,
3106        )?,
3107        EnrichMode::Codex => call_codex(
3108            binary,
3109            ENTITY_CONNECT_PROMPT,
3110            ENTITY_CONNECT_SCHEMA,
3111            &input_text,
3112            model,
3113            timeout,
3114        )?,
3115    };
3116    let relation = value
3117        .get("relation")
3118        .and_then(|v| v.as_str())
3119        .unwrap_or("none");
3120    if relation == "none" {
3121        return Ok(EnrichItemResult::Skipped {
3122            reason: "LLM determined no relationship".into(),
3123        });
3124    }
3125    let strength = value
3126        .get("strength")
3127        .and_then(|v| v.as_f64())
3128        .unwrap_or(0.5);
3129    conn.execute(
3130        "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
3131        rusqlite::params![namespace, e1_id, e2_id, relation, strength],
3132    )?;
3133    Ok(EnrichItemResult::Done {
3134        memory_id: None,
3135        entity_id: None,
3136        entities: 0,
3137        rels: 1,
3138        chars_before: None,
3139        chars_after: None,
3140        cost,
3141        is_oauth,
3142    })
3143}
3144
3145/// G27 P2: Validate entity type assignment via LLM.
3146fn call_entity_type_validate(
3147    conn: &Connection,
3148    _namespace: &str,
3149    item_key: &str,
3150    binary: &Path,
3151    model: Option<&str>,
3152    timeout: u64,
3153    mode: &EnrichMode,
3154) -> Result<EnrichItemResult, AppError> {
3155    let (ent_id, ent_name, ent_type): (i64, String, String) = conn
3156        .query_row(
3157            "SELECT id, name, type FROM entities WHERE name = ?1",
3158            rusqlite::params![item_key],
3159            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3160        )
3161        .map_err(|_| AppError::NotFound(format!("entity '{item_key}' not found")))?;
3162    let input_text = format!("Entity: {ent_name}\nCurrent type: {ent_type}");
3163    let (value, cost, is_oauth) = match mode {
3164        EnrichMode::ClaudeCode => call_claude(
3165            binary,
3166            ENTITY_TYPE_VALIDATE_PROMPT,
3167            ENTITY_TYPE_VALIDATE_SCHEMA,
3168            &input_text,
3169            model,
3170            timeout,
3171        )?,
3172        EnrichMode::Codex => call_codex(
3173            binary,
3174            ENTITY_TYPE_VALIDATE_PROMPT,
3175            ENTITY_TYPE_VALIDATE_SCHEMA,
3176            &input_text,
3177            model,
3178            timeout,
3179        )?,
3180    };
3181    let validated_type = value
3182        .get("validated_type")
3183        .and_then(|v| v.as_str())
3184        .unwrap_or(&ent_type);
3185    let was_correct = value
3186        .get("was_correct")
3187        .and_then(|v| v.as_bool())
3188        .unwrap_or(true);
3189    if !was_correct {
3190        conn.execute(
3191            "UPDATE entities SET type = ?1 WHERE id = ?2",
3192            rusqlite::params![validated_type, ent_id],
3193        )?;
3194    }
3195    Ok(EnrichItemResult::Done {
3196        memory_id: None,
3197        entity_id: Some(ent_id),
3198        entities: 1,
3199        rels: 0,
3200        chars_before: None,
3201        chars_after: None,
3202        cost,
3203        is_oauth,
3204    })
3205}
3206
3207/// G27 P2: Enrich generic memory description via LLM.
3208fn call_description_enrich(
3209    conn: &Connection,
3210    _namespace: &str,
3211    item_key: &str,
3212    binary: &Path,
3213    model: Option<&str>,
3214    timeout: u64,
3215    mode: &EnrichMode,
3216) -> Result<EnrichItemResult, AppError> {
3217    let (mem_id, body, old_desc): (i64, String, String) = conn
3218        .query_row(
3219            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3220            rusqlite::params![item_key],
3221            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3222        )
3223        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3224    let snippet: String = body.chars().take(500).collect();
3225    let input_text = format!(
3226        "Memory name: {item_key}\nCurrent description: {old_desc}\nBody preview: {snippet}"
3227    );
3228    let (value, cost, is_oauth) = match mode {
3229        EnrichMode::ClaudeCode => call_claude(
3230            binary,
3231            DESCRIPTION_ENRICH_PROMPT,
3232            DESCRIPTION_ENRICH_SCHEMA,
3233            &input_text,
3234            model,
3235            timeout,
3236        )?,
3237        EnrichMode::Codex => call_codex(
3238            binary,
3239            DESCRIPTION_ENRICH_PROMPT,
3240            DESCRIPTION_ENRICH_SCHEMA,
3241            &input_text,
3242            model,
3243            timeout,
3244        )?,
3245    };
3246    let new_desc = value
3247        .get("description")
3248        .and_then(|v| v.as_str())
3249        .unwrap_or(&old_desc);
3250    let old_name: String = conn
3251        .query_row(
3252            "SELECT name FROM memories WHERE id = ?1",
3253            rusqlite::params![mem_id],
3254            |r| r.get(0),
3255        )?;
3256    conn.execute(
3257        "UPDATE memories SET description = ?1 WHERE id = ?2",
3258        rusqlite::params![new_desc, mem_id],
3259    )?;
3260    memories::sync_fts_after_update(
3261        conn, mem_id,
3262        &old_name, &old_desc, &body,
3263        &old_name, new_desc, &body,
3264    )?;
3265    Ok(EnrichItemResult::Done {
3266        memory_id: Some(mem_id),
3267        entity_id: None,
3268        entities: 0,
3269        rels: 0,
3270        chars_before: Some(old_desc.len()),
3271        chars_after: Some(new_desc.len()),
3272        cost,
3273        is_oauth,
3274    })
3275}
3276
3277/// G27 P2: Classify memory into domain category via LLM.
3278fn call_domain_classify(
3279    conn: &Connection,
3280    _namespace: &str,
3281    item_key: &str,
3282    binary: &Path,
3283    model: Option<&str>,
3284    timeout: u64,
3285    mode: &EnrichMode,
3286) -> Result<EnrichItemResult, AppError> {
3287    let (mem_id, body, desc): (i64, String, String) = conn
3288        .query_row(
3289            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3290            rusqlite::params![item_key],
3291            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3292        )
3293        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3294    let snippet: String = body.chars().take(500).collect();
3295    let input_text = format!("Memory: {item_key}\nDescription: {desc}\nBody preview: {snippet}");
3296    let (value, cost, is_oauth) = match mode {
3297        EnrichMode::ClaudeCode => call_claude(
3298            binary,
3299            DOMAIN_CLASSIFY_PROMPT,
3300            DOMAIN_CLASSIFY_SCHEMA,
3301            &input_text,
3302            model,
3303            timeout,
3304        )?,
3305        EnrichMode::Codex => call_codex(
3306            binary,
3307            DOMAIN_CLASSIFY_PROMPT,
3308            DOMAIN_CLASSIFY_SCHEMA,
3309            &input_text,
3310            model,
3311            timeout,
3312        )?,
3313    };
3314    let domain = value
3315        .get("domain")
3316        .and_then(|v| v.as_str())
3317        .unwrap_or("uncategorized");
3318    let metadata = format!(r#"{{"domain":"{}"}}"#, domain.replace('"', "\\\""));
3319    conn.execute(
3320        "UPDATE memories SET metadata = ?1 WHERE id = ?2",
3321        rusqlite::params![metadata, mem_id],
3322    )?;
3323    Ok(EnrichItemResult::Done {
3324        memory_id: Some(mem_id),
3325        entity_id: None,
3326        entities: 0,
3327        rels: 0,
3328        chars_before: None,
3329        chars_after: None,
3330        cost,
3331        is_oauth,
3332    })
3333}
3334
3335/// G27 P2: Audit memory graph quality via LLM.
3336fn call_graph_audit(
3337    conn: &Connection,
3338    _namespace: &str,
3339    item_key: &str,
3340    binary: &Path,
3341    model: Option<&str>,
3342    timeout: u64,
3343    mode: &EnrichMode,
3344) -> Result<EnrichItemResult, AppError> {
3345    let (mem_id, body, desc): (i64, String, String) = conn
3346        .query_row(
3347            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3348            rusqlite::params![item_key],
3349            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3350        )
3351        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3352    let snippet: String = body.chars().take(500).collect();
3353    let ent_count: i64 = conn
3354        .query_row(
3355            "SELECT COUNT(*) FROM memory_entities WHERE memory_id = ?1",
3356            rusqlite::params![mem_id],
3357            |r| r.get(0),
3358        )
3359        .unwrap_or(0);
3360    let input_text = format!("Memory: {item_key}\nDescription: {desc}\nEntity bindings: {ent_count}\nBody preview: {snippet}");
3361    let (value, cost, is_oauth) = match mode {
3362        EnrichMode::ClaudeCode => call_claude(
3363            binary,
3364            GRAPH_AUDIT_PROMPT,
3365            GRAPH_AUDIT_SCHEMA,
3366            &input_text,
3367            model,
3368            timeout,
3369        )?,
3370        EnrichMode::Codex => call_codex(
3371            binary,
3372            GRAPH_AUDIT_PROMPT,
3373            GRAPH_AUDIT_SCHEMA,
3374            &input_text,
3375            model,
3376            timeout,
3377        )?,
3378    };
3379    let issues = value
3380        .get("issues")
3381        .and_then(|v| v.as_array())
3382        .map(|a| a.len())
3383        .unwrap_or(0);
3384    Ok(EnrichItemResult::Done {
3385        memory_id: Some(mem_id),
3386        entity_id: None,
3387        entities: 0,
3388        rels: issues,
3389        chars_before: None,
3390        chars_after: None,
3391        cost,
3392        is_oauth,
3393    })
3394}
3395
3396/// G27 P2: Synthesize research findings into graph entities/relationships via LLM.
3397fn call_deep_research_synth(
3398    conn: &Connection,
3399    namespace: &str,
3400    item_key: &str,
3401    binary: &Path,
3402    model: Option<&str>,
3403    timeout: u64,
3404    mode: &EnrichMode,
3405) -> Result<EnrichItemResult, AppError> {
3406    let (mem_id, body): (i64, String) = conn
3407        .query_row(
3408            "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3409            rusqlite::params![item_key],
3410            |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
3411        )
3412        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3413    let snippet: String = body.chars().take(2000).collect();
3414    let input_text = format!("Memory: {item_key}\nBody:\n{snippet}");
3415    let (value, cost, is_oauth) = match mode {
3416        EnrichMode::ClaudeCode => call_claude(
3417            binary,
3418            DEEP_RESEARCH_SYNTH_PROMPT,
3419            DEEP_RESEARCH_SYNTH_SCHEMA,
3420            &input_text,
3421            model,
3422            timeout,
3423        )?,
3424        EnrichMode::Codex => call_codex(
3425            binary,
3426            DEEP_RESEARCH_SYNTH_PROMPT,
3427            DEEP_RESEARCH_SYNTH_SCHEMA,
3428            &input_text,
3429            model,
3430            timeout,
3431        )?,
3432    };
3433    let mut ent_count = 0usize;
3434    let mut rel_count = 0usize;
3435    if let Some(ents) = value.get("entities").and_then(|v| v.as_array()) {
3436        for e in ents {
3437            let name = e.get("name").and_then(|v| v.as_str()).unwrap_or_default();
3438            let etype_str = e
3439                .get("entity_type")
3440                .and_then(|v| v.as_str())
3441                .unwrap_or("concept");
3442            let etype: EntityType = etype_str.parse().unwrap_or(EntityType::Concept);
3443            if name.len() >= 2 {
3444                let ne = NewEntity {
3445                    name: name.to_string(),
3446                    entity_type: etype,
3447                    description: None,
3448                };
3449                let _ = entities::upsert_entity(conn, namespace, &ne);
3450                ent_count += 1;
3451            }
3452        }
3453    }
3454    if let Some(rels) = value.get("relationships").and_then(|v| v.as_array()) {
3455        for r in rels {
3456            let src = r.get("source").and_then(|v| v.as_str()).unwrap_or_default();
3457            let tgt = r.get("target").and_then(|v| v.as_str()).unwrap_or_default();
3458            if src.is_empty() || tgt.is_empty() {
3459                continue;
3460            }
3461            let rel = r
3462                .get("relation")
3463                .and_then(|v| v.as_str())
3464                .unwrap_or("related");
3465            let str_ = r.get("strength").and_then(|v| v.as_f64()).unwrap_or(0.5);
3466            if let (Some(sid), Some(tid)) = (
3467                entities::find_entity_id(conn, namespace, src)?,
3468                entities::find_entity_id(conn, namespace, tgt)?,
3469            ) {
3470                let _ = entities::create_or_fetch_relationship(
3471                    conn, namespace, sid, tid, rel, str_, None,
3472                );
3473                rel_count += 1;
3474            }
3475        }
3476    }
3477    Ok(EnrichItemResult::Done {
3478        memory_id: Some(mem_id),
3479        entity_id: None,
3480        entities: ent_count,
3481        rels: rel_count,
3482        chars_before: None,
3483        chars_after: None,
3484        cost,
3485        is_oauth,
3486    })
3487}
3488
3489/// G27 P2: Extract structured body from unstructured text via LLM.
3490fn call_body_extract(
3491    conn: &Connection,
3492    _namespace: &str,
3493    item_key: &str,
3494    binary: &Path,
3495    model: Option<&str>,
3496    timeout: u64,
3497    mode: &EnrichMode,
3498) -> Result<EnrichItemResult, AppError> {
3499    let (mem_id, body, old_desc): (i64, String, String) = conn
3500        .query_row(
3501            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3502            rusqlite::params![item_key],
3503            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3504        )
3505        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3506    let old_name: String = conn
3507        .query_row(
3508            "SELECT name FROM memories WHERE id = ?1",
3509            rusqlite::params![mem_id],
3510            |r| r.get(0),
3511        )?;
3512    let input_text = format!("Memory: {item_key}\nBody:\n{body}");
3513    let (value, cost, is_oauth) = match mode {
3514        EnrichMode::ClaudeCode => call_claude(
3515            binary,
3516            BODY_EXTRACT_PROMPT,
3517            BODY_EXTRACT_SCHEMA,
3518            &input_text,
3519            model,
3520            timeout,
3521        )?,
3522        EnrichMode::Codex => call_codex(
3523            binary,
3524            BODY_EXTRACT_PROMPT,
3525            BODY_EXTRACT_SCHEMA,
3526            &input_text,
3527            model,
3528            timeout,
3529        )?,
3530    };
3531    let restructured = value
3532        .get("restructured_body")
3533        .and_then(|v| v.as_str())
3534        .unwrap_or(&body);
3535    let chars_before = body.len();
3536    let chars_after = restructured.len();
3537    let new_hash = blake3::hash(restructured.as_bytes()).to_hex().to_string();
3538    conn.execute(
3539        "UPDATE memories SET body = ?1, body_hash = ?2, updated_at = unixepoch() WHERE id = ?3",
3540        rusqlite::params![restructured, new_hash, mem_id],
3541    )?;
3542    memories::sync_fts_after_update(
3543        conn, mem_id,
3544        &old_name, &old_desc, &body,
3545        &old_name, &old_desc, restructured,
3546    )?;
3547    Ok(EnrichItemResult::Done {
3548        memory_id: Some(mem_id),
3549        entity_id: None,
3550        entities: 0,
3551        rels: 0,
3552        chars_before: Some(chars_before),
3553        chars_after: Some(chars_after),
3554        cost,
3555        is_oauth,
3556    })
3557}
3558
3559/// Scan for pairs of entities that share no direct relationship.
3560#[allow(clippy::type_complexity)]
3561fn scan_isolated_entity_pairs(
3562    conn: &Connection,
3563    namespace: &str,
3564    limit: Option<usize>,
3565) -> Result<Vec<(i64, String, i64, String)>, AppError> {
3566    let limit_val = limit.unwrap_or(50) as i64;
3567    let mut stmt = conn.prepare_cached(
3568        "SELECT e1.id, e1.name, e2.id, e2.name FROM entities e1, entities e2 \
3569         WHERE e1.namespace = ?1 AND e2.namespace = ?1 AND e1.id < e2.id \
3570         AND NOT EXISTS (SELECT 1 FROM relationships r WHERE \
3571           (r.source_id = e1.id AND r.target_id = e2.id) OR \
3572           (r.source_id = e2.id AND r.target_id = e1.id)) \
3573         LIMIT ?2",
3574    )?;
3575    let rows = stmt
3576        .query_map(rusqlite::params![namespace, limit_val], |r| {
3577            Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?))
3578        })?
3579        .collect::<Result<Vec<_>, _>>()?;
3580    Ok(rows)
3581}
3582
3583/// Scan for entities with non-validated types (all entities for type audit).
3584fn scan_entities_for_type_validation(
3585    conn: &Connection,
3586    namespace: &str,
3587    limit: Option<usize>,
3588) -> Result<Vec<(i64, String, String)>, AppError> {
3589    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3590    let sql = format!(
3591        "SELECT id, name, type FROM entities WHERE namespace = ?1 ORDER BY id {limit_clause}"
3592    );
3593    let mut stmt = conn.prepare(&sql)?;
3594    let rows = stmt
3595        .query_map(rusqlite::params![namespace], |r| {
3596            Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3597        })?
3598        .collect::<Result<Vec<_>, _>>()?;
3599    Ok(rows)
3600}
3601
3602/// Scan for memories with generic descriptions (ingested, imported, etc).
3603fn scan_generic_descriptions(
3604    conn: &Connection,
3605    namespace: &str,
3606    limit: Option<usize>,
3607) -> Result<Vec<(i64, String, String)>, AppError> {
3608    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3609    let sql = format!(
3610        "SELECT id, name, description FROM memories WHERE namespace = ?1 AND deleted_at IS NULL \
3611         AND (description LIKE '%ingested%' OR description LIKE '%imported%' OR description LIKE '%added%' OR length(description) < 30) \
3612         ORDER BY id {limit_clause}"
3613    );
3614    let mut stmt = conn.prepare(&sql)?;
3615    let rows = stmt
3616        .query_map(rusqlite::params![namespace], |r| {
3617            Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3618        })?
3619        .collect::<Result<Vec<_>, _>>()?;
3620    Ok(rows)
3621}
3622
3623/// Calls the Codex CLI for a single enrichment item.
3624///
3625/// Follows the same contract as `call_claude`: returns `(value, cost_usd, is_oauth=false)`.
3626fn call_codex(
3627    binary: &Path,
3628    prompt: &str,
3629    json_schema: &str,
3630    input_text: &str,
3631    model: Option<&str>,
3632    timeout_secs: u64,
3633) -> Result<(serde_json::Value, f64, bool), AppError> {
3634    use wait_timeout::ChildExt;
3635
3636    // G31+G32+G33 (v1.0.69): validate the model BEFORE spawn, write the
3637    // schema to a trusted cache path (not /tmp), and reuse the
3638    // consolidated JSONL parser. See `codex_spawn.rs` for the canonical
3639    // hardening rationale.
3640    super::codex_spawn::validate_codex_model(model)?;
3641    let schema_file = super::codex_spawn::trusted_schema_path()?;
3642
3643    let args = super::codex_spawn::CodexSpawnArgs {
3644        binary,
3645        prompt,
3646        json_schema,
3647        input_text,
3648        model,
3649        timeout_secs,
3650        schema_path: schema_file.clone(),
3651    };
3652    let mut cmd = super::codex_spawn::build_codex_command(&args)?;
3653
3654    let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
3655        AppError::Io(std::io::Error::new(
3656            e.kind(),
3657            format!("failed to spawn codex: {e}"),
3658        ))
3659    })?;
3660
3661    let full_prompt = format!("{prompt}\n\n{input_text}");
3662    let stdin_bytes = full_prompt.into_bytes();
3663    let mut child_stdin = child
3664        .stdin
3665        .take()
3666        .ok_or_else(|| AppError::Validation("failed to open codex stdin".into()))?;
3667    let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
3668        child_stdin.write_all(&stdin_bytes)?;
3669        drop(child_stdin);
3670        Ok(())
3671    });
3672
3673    let start = std::time::Instant::now();
3674    let timeout = std::time::Duration::from_secs(timeout_secs);
3675    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
3676    let _ = std::fs::remove_file(&schema_file);
3677
3678    match status {
3679        Some(exit_status) => {
3680            stdin_thread
3681                .join()
3682                .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
3683                .map_err(AppError::Io)?;
3684
3685            tracing::debug!(
3686                target: "process",
3687                exit_code = ?exit_status.code(),
3688                elapsed_ms = start.elapsed().as_millis() as u64,
3689                "external process completed"
3690            );
3691
3692            let mut stdout_buf = Vec::new();
3693            if let Some(mut out) = child.stdout.take() {
3694                std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
3695            }
3696            if !exit_status.success() {
3697                let mut stderr_buf = Vec::new();
3698                if let Some(mut err) = child.stderr.take() {
3699                    std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
3700                }
3701                let stderr_str = String::from_utf8_lossy(&stderr_buf);
3702                tracing::warn!(
3703                    target: "enrich",
3704                    exit_code = ?exit_status.code(),
3705                    stderr = %stderr_str.trim(),
3706                    "codex process failed"
3707                );
3708                return Err(AppError::Validation(format!(
3709                    "codex exited with code {:?}: {}",
3710                    exit_status.code(),
3711                    stderr_str.trim()
3712                )));
3713            }
3714            let stdout_str = String::from_utf8(stdout_buf)
3715                .map_err(|_| AppError::Validation("codex stdout is not valid UTF-8".into()))?;
3716            // G32: use the JSONL parser, NOT serde_json::from_str on the
3717            // entire stdout (codex emits one event per line).
3718            let result = super::codex_spawn::parse_codex_jsonl(&stdout_str)?;
3719            // Return the raw agent_message text parsed as JSON. Different
3720            // operations (memory-bindings, body-enrich) use different
3721            // output schemas, so we let the caller pick which fields to
3722            // extract. The previous implementation hardcoded
3723            // `{entities, urls}` which broke body-enrich.
3724            let value: serde_json::Value =
3725                serde_json::from_str(&result.last_agent_text).map_err(|e| {
3726                    AppError::Validation(format!(
3727                        "codex agent_message is not valid JSON: {e}; raw={}",
3728                        result.last_agent_text
3729                    ))
3730                })?;
3731            Ok((value, 0.0, false))
3732        }
3733        None => {
3734            let _ = child.kill();
3735            let _ = child.wait();
3736            let _ = stdin_thread.join();
3737            Err(AppError::Validation(format!(
3738                "codex timed out after {timeout_secs} seconds"
3739            )))
3740        }
3741    }
3742}
3743
3744// ---------------------------------------------------------------------------
3745// Tests
3746// ---------------------------------------------------------------------------
3747
3748#[cfg(test)]
3749mod tests {
3750    use super::*;
3751    use rusqlite::Connection;
3752    #[cfg(unix)]
3753    use std::os::unix::fs::PermissionsExt;
3754
3755    /// Opens an in-memory SQLite database with a minimal schema for unit tests.
3756    fn open_test_db() -> Connection {
3757        let conn = Connection::open_in_memory().expect("in-memory db");
3758        conn.execute_batch(
3759            "CREATE TABLE memories (
3760                id          INTEGER PRIMARY KEY AUTOINCREMENT,
3761                namespace   TEXT NOT NULL DEFAULT 'global',
3762                name        TEXT NOT NULL,
3763                type        TEXT NOT NULL DEFAULT 'note',
3764                description TEXT NOT NULL DEFAULT '',
3765                body        TEXT NOT NULL DEFAULT '',
3766                body_hash   TEXT NOT NULL DEFAULT '',
3767                session_id  TEXT,
3768                source      TEXT NOT NULL DEFAULT 'agent',
3769                metadata    TEXT NOT NULL DEFAULT '{}',
3770                created_at  INTEGER NOT NULL DEFAULT (unixepoch()),
3771                updated_at  INTEGER NOT NULL DEFAULT (unixepoch()),
3772                deleted_at  INTEGER,
3773                UNIQUE(namespace, name)
3774            );
3775            CREATE TABLE entities (
3776                id          INTEGER PRIMARY KEY AUTOINCREMENT,
3777                namespace   TEXT NOT NULL DEFAULT 'global',
3778                name        TEXT NOT NULL,
3779                type        TEXT NOT NULL DEFAULT 'concept',
3780                description TEXT,
3781                degree      INTEGER NOT NULL DEFAULT 0,
3782                created_at  INTEGER NOT NULL DEFAULT (unixepoch()),
3783                updated_at  INTEGER NOT NULL DEFAULT (unixepoch()),
3784                UNIQUE(namespace, name)
3785            );
3786            CREATE TABLE memory_entities (
3787                memory_id  INTEGER NOT NULL,
3788                entity_id  INTEGER NOT NULL,
3789                PRIMARY KEY (memory_id, entity_id)
3790            );
3791            CREATE TABLE relationships (
3792                id         INTEGER PRIMARY KEY AUTOINCREMENT,
3793                namespace  TEXT NOT NULL DEFAULT 'global',
3794                source_id  INTEGER NOT NULL,
3795                target_id  INTEGER NOT NULL,
3796                relation   TEXT NOT NULL,
3797                weight     REAL NOT NULL DEFAULT 0.5,
3798                description TEXT,
3799                UNIQUE(source_id, target_id, relation)
3800            );
3801            CREATE TABLE memory_embeddings (
3802                memory_id   INTEGER PRIMARY KEY,
3803                namespace   TEXT NOT NULL,
3804                embedding   BLOB NOT NULL,
3805                source      TEXT NOT NULL,
3806                model       TEXT NOT NULL DEFAULT '',
3807                dim         INTEGER NOT NULL DEFAULT 384,
3808                created_at  INTEGER NOT NULL DEFAULT (unixepoch())
3809            );",
3810        )
3811        .expect("schema creation must succeed");
3812        conn
3813    }
3814
3815    #[test]
3816    fn scan_unbound_memories_finds_memories_without_bindings() {
3817        let conn = open_test_db();
3818        conn.execute(
3819            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'test-mem', 'some body content')",
3820            [],
3821        )
3822        .unwrap();
3823
3824        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
3825        assert_eq!(results.len(), 1);
3826        assert_eq!(results[0].1, "test-mem");
3827    }
3828
3829    #[test]
3830    fn scan_unbound_memories_excludes_bound_memories() {
3831        let conn = open_test_db();
3832        conn.execute(
3833            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'bound-mem', 'body')",
3834            [],
3835        )
3836        .unwrap();
3837        let mem_id: i64 = conn
3838            .query_row("SELECT id FROM memories WHERE name='bound-mem'", [], |r| {
3839                r.get(0)
3840            })
3841            .unwrap();
3842        conn.execute(
3843            "INSERT INTO entities (namespace, name) VALUES ('global', 'some-entity')",
3844            [],
3845        )
3846        .unwrap();
3847        let ent_id: i64 = conn
3848            .query_row(
3849                "SELECT id FROM entities WHERE name='some-entity'",
3850                [],
3851                |r| r.get(0),
3852            )
3853            .unwrap();
3854        conn.execute(
3855            "INSERT INTO memory_entities (memory_id, entity_id) VALUES (?1, ?2)",
3856            rusqlite::params![mem_id, ent_id],
3857        )
3858        .unwrap();
3859
3860        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
3861        assert!(results.is_empty(), "bound memory must not appear in scan");
3862    }
3863
3864    #[test]
3865    fn scan_entities_without_description_finds_null_description() {
3866        let conn = open_test_db();
3867        conn.execute(
3868            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'my-tool', 'tool', NULL)",
3869            [],
3870        )
3871        .unwrap();
3872
3873        let results = scan_entities_without_description(&conn, "global", None).unwrap();
3874        assert_eq!(results.len(), 1);
3875        assert_eq!(results[0].1, "my-tool");
3876    }
3877
3878    #[test]
3879    fn scan_entities_without_description_excludes_entities_with_description() {
3880        let conn = open_test_db();
3881        conn.execute(
3882            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'described-tool', 'tool', 'Has a description already')",
3883            [],
3884        )
3885        .unwrap();
3886
3887        let results = scan_entities_without_description(&conn, "global", None).unwrap();
3888        assert!(
3889            results.is_empty(),
3890            "entity with description must not appear"
3891        );
3892    }
3893
3894    #[test]
3895    fn scan_short_body_memories_finds_short_bodies() {
3896        let conn = open_test_db();
3897        conn.execute(
3898            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'short-mem', 'hi')",
3899            [],
3900        )
3901        .unwrap();
3902
3903        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
3904        assert_eq!(results.len(), 1);
3905        assert_eq!(results[0].1, "short-mem");
3906    }
3907
3908    #[test]
3909    fn scan_short_body_memories_excludes_long_bodies() {
3910        let conn = open_test_db();
3911        let long_body = "a".repeat(1000);
3912        conn.execute(
3913            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'long-mem', ?1)",
3914            rusqlite::params![long_body],
3915        )
3916        .unwrap();
3917
3918        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
3919        assert!(results.is_empty(), "long memory must not appear in scan");
3920    }
3921
3922    #[test]
3923    fn scan_respects_limit() {
3924        let conn = open_test_db();
3925        for i in 0..5 {
3926            conn.execute(
3927                &format!("INSERT INTO memories (namespace, name, body) VALUES ('global', 'mem-{i}', 'short')"),
3928                [],
3929            )
3930            .unwrap();
3931        }
3932
3933        let results = scan_short_body_memories(&conn, "global", 1000, Some(3)).unwrap();
3934        assert_eq!(results.len(), 3, "limit must be respected");
3935    }
3936
3937    #[test]
3938    fn scan_memories_without_embeddings_finds_only_missing_rows() {
3939        let conn = open_test_db();
3940        conn.execute(
3941            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'missing-vec', 'body one')",
3942            [],
3943        )
3944        .unwrap();
3945        conn.execute(
3946            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'has-vec', 'body two')",
3947            [],
3948        )
3949        .unwrap();
3950        let memory_id: i64 = conn
3951            .query_row(
3952                "SELECT id FROM memories WHERE namespace='global' AND name='has-vec'",
3953                [],
3954                |r| r.get(0),
3955            )
3956            .unwrap();
3957        let embedding = vec![0.0_f32; crate::constants::embedding_dim()];
3958        memories::upsert_vec(
3959            &conn, memory_id, "global", "note", &embedding, "has-vec", "body two",
3960        )
3961        .unwrap();
3962
3963        let results = scan_memories_without_embeddings(&conn, "global", None, &[]).unwrap();
3964        assert_eq!(results.len(), 1);
3965        assert_eq!(results[0].1, "missing-vec");
3966    }
3967
3968    #[test]
3969    fn scan_memories_without_embeddings_respects_name_filter() {
3970        let conn = open_test_db();
3971        conn.execute(
3972            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'match-me', 'body one')",
3973            [],
3974        )
3975        .unwrap();
3976        conn.execute(
3977            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'skip-me', 'body two')",
3978            [],
3979        )
3980        .unwrap();
3981
3982        let results =
3983            scan_memories_without_embeddings(&conn, "global", None, &["match-me".to_string()])
3984                .unwrap();
3985        assert_eq!(results.len(), 1);
3986        assert_eq!(results[0].1, "match-me");
3987    }
3988
3989    #[test]
3990    fn queue_db_schema_creates_correctly() {
3991        let tmp_path = format!("/tmp/test-enrich-queue-{}.sqlite", std::process::id());
3992        let conn = open_queue_db(&tmp_path).expect("queue db must open");
3993        let count: i64 = conn
3994            .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
3995            .unwrap();
3996        assert_eq!(count, 0);
3997        let _ = std::fs::remove_file(&tmp_path);
3998    }
3999
4000    #[test]
4001    fn parse_claude_output_valid_bindings() {
4002        let output = r#"[
4003            {"type":"system","subtype":"init"},
4004            {"type":"result","is_error":false,"total_cost_usd":0.01,
4005             "structured_output":{"entities":[{"name":"rust-lang","entity_type":"tool"}],"relationships":[]}}
4006        ]"#;
4007        let result = crate::commands::claude_runner::parse_claude_output(output)
4008            .expect("must parse successfully");
4009        assert!(result.value.get("entities").is_some());
4010        assert!((result.cost_usd - 0.01).abs() < f64::EPSILON);
4011        assert!(!result.is_oauth);
4012    }
4013
4014    #[test]
4015    fn parse_claude_output_detects_oauth() {
4016        let output = r#"[
4017            {"type":"system","subtype":"init","apiKeySource":"none"},
4018            {"type":"result","is_error":false,"total_cost_usd":0.0,
4019             "structured_output":{"entities":[],"relationships":[]}}
4020        ]"#;
4021        let result = crate::commands::claude_runner::parse_claude_output(output).unwrap();
4022        assert!(result.is_oauth);
4023    }
4024
4025    #[test]
4026    fn parse_claude_output_rate_limit_returns_error() {
4027        let output = r#"[
4028            {"type":"system","subtype":"init"},
4029            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
4030        ]"#;
4031        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
4032        assert!(matches!(err, AppError::RateLimited { .. }));
4033    }
4034
4035    #[test]
4036    fn parse_claude_output_auth_error() {
4037        let output = r#"[
4038            {"type":"system","subtype":"init"},
4039            {"type":"result","is_error":true,"error":"authentication failed"}
4040        ]"#;
4041        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
4042        assert!(format!("{err}").contains("authentication failed"));
4043    }
4044
4045    #[cfg(unix)]
4046    #[test]
4047    fn call_codex_returns_raw_json_for_body_enrich_schema() {
4048        let tmp = tempfile::tempdir().expect("tempdir");
4049        let binary = tmp.path().join("codex-mock");
4050        std::fs::write(
4051            &binary,
4052            r#"#!/usr/bin/env bash
4053set -euo pipefail
4054cat <<'JSONL'
4055{"type":"thread.started","thread_id":"mock-thread-0"}
4056{"type":"item.completed","item":{"type":"agent_message","text":"{\"enriched_body\":\"expanded body\"}"}}
4057{"type":"turn.completed","usage":{"input_tokens":1,"output_tokens":1}}
4058JSONL
4059"#,
4060        )
4061        .expect("mock codex write");
4062        let mut perms = std::fs::metadata(&binary).expect("metadata").permissions();
4063        perms.set_mode(0o755);
4064        std::fs::set_permissions(&binary, perms).expect("chmod");
4065
4066        let (value, cost, is_oauth) =
4067            call_codex(&binary, "prompt", BODY_ENRICH_SCHEMA, "body", None, 5)
4068                .expect("call_codex must accept body-enrich payload");
4069
4070        assert_eq!(value["enriched_body"], "expanded body");
4071        assert_eq!(cost, 0.0);
4072        assert!(!is_oauth);
4073    }
4074
4075    #[test]
4076    fn dry_run_emits_preview_without_calling_llm() {
4077        // This test validates the dry-run NDJSON contract without spawning any process.
4078        // The scan_operation function requires a DB; we build one in-memory but cannot
4079        // call run() directly because it needs AppPaths (disk). Instead we test the
4080        // lower-level helpers that the dry-run path relies on.
4081        let conn = open_test_db();
4082        conn.execute(
4083            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'dry-mem', 'tiny')",
4084            [],
4085        )
4086        .unwrap();
4087
4088        let results = scan_short_body_memories(&conn, "global", 1000, None).unwrap();
4089        assert_eq!(results.len(), 1);
4090        assert_eq!(results[0].1, "dry-mem");
4091        // If scan finds the item and dry_run is set, no LLM would be called.
4092        // The NDJSON emission is tested via integration tests with a fake binary.
4093    }
4094
4095    #[test]
4096    fn persist_entity_description_updates_db() {
4097        let conn = open_test_db();
4098        conn.execute(
4099            "INSERT INTO entities (namespace, name, type) VALUES ('global', 'tokio-runtime', 'tool')",
4100            [],
4101        )
4102        .unwrap();
4103        let eid: i64 = conn
4104            .query_row(
4105                "SELECT id FROM entities WHERE name='tokio-runtime'",
4106                [],
4107                |r| r.get(0),
4108            )
4109            .unwrap();
4110
4111        persist_entity_description(&conn, eid, "Async runtime for Rust applications").unwrap();
4112
4113        let desc: String = conn
4114            .query_row(
4115                "SELECT description FROM entities WHERE id=?1",
4116                rusqlite::params![eid],
4117                |r| r.get(0),
4118            )
4119            .unwrap();
4120        assert_eq!(desc, "Async runtime for Rust applications");
4121    }
4122
4123    #[test]
4124    fn bindings_schema_is_valid_json() {
4125        let _: serde_json::Value =
4126            serde_json::from_str(BINDINGS_SCHEMA).expect("BINDINGS_SCHEMA must be valid JSON");
4127    }
4128
4129    #[test]
4130    fn entity_description_schema_is_valid_json() {
4131        let _: serde_json::Value = serde_json::from_str(ENTITY_DESCRIPTION_SCHEMA)
4132            .expect("ENTITY_DESCRIPTION_SCHEMA must be valid JSON");
4133    }
4134
4135    #[test]
4136    fn body_enrich_schema_is_valid_json() {
4137        let _: serde_json::Value = serde_json::from_str(BODY_ENRICH_SCHEMA)
4138            .expect("BODY_ENRICH_SCHEMA must be valid JSON");
4139    }
4140}