Skip to main content

sqlite_graphrag/commands/
enrich.rs

// TODO v1.0.89: this file is 4116 lines long — modularization is planned.
// See ADR-0046, section "Known Tech Debt (v1.0.89+)".
3
4//! Handler for the `enrich` CLI subcommand (GAP-14 + GAP-18).
5//!
//! Enriches the knowledge graph by running LLM-powered analysis over memories
//! and entities that are missing key structural data. The primary operations are:
//!
//! - `memory-bindings`: memories without `memory_entities` rows get entity extraction
//! - `entity-descriptions`: entities with NULL/empty descriptions get LLM descriptions
//! - `body-enrich`: memories with short bodies get expanded by the LLM (GAP-18)
//! - `re-embed`: memories without a vector row get re-embedded without rewriting body
//!
//! The remaining `EnrichOperation` variants (the G27 audit operations such as
//! `weight-calibrate` and `relation-reclassify`) are documented as "scan only".
13//!
14//! Architecture mirrors `ingest_claude.rs`: SCAN → JUDGE (LLM) → PERSIST, with a
15//! SQLite queue DB (`.enrich-queue.sqlite`) for resume/retry support.
//! Workload: I/O-bound — subprocess calls to the claude/codex/opencode CLIs, or
//! HTTP calls to OpenRouter, dominated by network wait.
17//!
18//! # DRY opportunity
19//!
20//! `extract_with_claude`, `parse_claude_output`, `emit_json`, and the `open_queue_db`
21//! queue schema in `ingest_claude.rs` are private functions that duplicate patterns used
22//! here verbatim. A future refactoring could extract them into a shared
23//! `src/commands/llm_runner.rs` module (or `src/llm_runner.rs`) without changing any
24//! public APIs. That extraction requires editing `ingest_claude.rs`, which is outside
25//! this stream's boundary — flagged here for the Integration stream to evaluate.
26
27use crate::commands::ingest_claude::find_claude_binary;
28use crate::constants::MAX_MEMORY_BODY_LEN;
29use crate::entity_type::EntityType;
30use crate::errors::AppError;
31use crate::paths::AppPaths;
32use crate::storage::connection::{ensure_db_ready, open_rw};
33use crate::storage::entities::{self, NewEntity, NewRelationship};
34use crate::storage::memories;
35
36use rusqlite::Connection;
37use serde::{Deserialize, Serialize};
38use std::io::Write;
39use std::path::{Path, PathBuf};
40use std::time::Instant;
41
42// ---------------------------------------------------------------------------
43// Constants
44// ---------------------------------------------------------------------------
45
/// File name of the resume/retry queue database opened by `open_queue_db`.
const DEFAULT_QUEUE_DB: &str = ".enrich-queue.sqlite";

/// Default wait after hitting a provider rate limit.
/// NOTE(review): presumably seconds — confirm at the call site.
const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;

/// Default for `--min-output-chars` (body-enrich).
const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;

/// Default for `--max-output-chars` (body-enrich).
const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2_000;
50
51// ---------------------------------------------------------------------------
52// JSON schema used for memory-bindings and body-enrich extraction
53// ---------------------------------------------------------------------------
54
/// Structured-output JSON schema for entity/relationship extraction
/// (`memory-bindings`): typed entities plus typed, weighted relationships.
///
/// Every object level sets `"additionalProperties": false`. The `relation`
/// enum is the same list spelled out in `BINDINGS_PROMPT`, and `strength` is
/// bounded to [0, 1] like the prompt's strength scale — keep them in sync.
const BINDINGS_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["entities","relationships"],
  "additionalProperties": false
}"#;
94
/// JSON schema whose only field is a required `description` string; by its
/// name, the output contract for the `entity-descriptions` operation.
const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" }
  },
  "required": ["description"],
  "additionalProperties": false
}"#;

/// JSON schema whose only field is a required `enriched_body` string; by its
/// name, the output contract for the `body-enrich` operation (GAP-18).
const BODY_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "enriched_body": { "type": "string" }
  },
  "required": ["enriched_body"],
  "additionalProperties": false
}"#;
112
// G27 P1: weight-calibrate
/// Instruction prompt for `weight-calibrate`: asks the LLM to re-score a
/// relationship weight against a fixed 0.3 / 0.5 / 0.7 / 0.9 scale.
const WEIGHT_CALIBRATE_PROMPT: &str = "You are a knowledge graph quality auditor. Evaluate whether this relationship weight is correctly calibrated.\n\n\
Scale:\n\
- 0.9 = vital hard dependency (A cannot function without B)\n\
- 0.7 = important design relationship (A strongly supports/enables B)\n\
- 0.5 = useful contextual link (A and B share relevant context)\n\
- 0.3 = weak reference (A mentions B without strong coupling)\n\n\
Respond with the calibrated weight and brief reasoning.";

/// Output schema for `weight-calibrate`: a weight bounded to [0, 1] plus
/// free-text reasoning; both fields are required.
const WEIGHT_CALIBRATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "calibrated_weight": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["calibrated_weight", "reasoning"],
  "additionalProperties": false
}"#;
131
// G27 P1: relation-reclassify
/// Instruction prompt for `relation-reclassify`: replace a generic relation
/// type with exactly one canonical relation.
const RELATION_RECLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. The relationship between these entities uses a generic type. Determine the REAL semantic relationship.\n\n\
Valid canonical relations (pick exactly one):\n\
- depends-on: A cannot function without B\n\
- uses: A utilizes B but could substitute it\n\
- supports: A reinforces or enables B\n\
- causes: A triggers or produces B\n\
- fixes: A resolves a problem in B\n\
- contradicts: A conflicts with or invalidates B\n\
- applies-to: A is relevant to or scoped within B\n\
- follows: A comes after B in sequence\n\
- replaces: A substitutes B\n\
- tracked-in: A is monitored in B\n\
- related: A and B share context (use sparingly)\n\n\
Respond with the correct relation, strength, and reasoning.";

/// Output schema for `relation-reclassify`.
///
/// `relation` is constrained by an `enum` to the canonical relations listed in
/// `RELATION_RECLASSIFY_PROMPT`. This is the same vocabulary `BINDINGS_SCHEMA`
/// enforces, so the LLM cannot answer with an off-list or differently spelled
/// relation such as `mentions`.
const RELATION_RECLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": {
      "type": "string",
      "enum": ["depends-on","uses","supports","causes","fixes","contradicts","applies-to","follows","replaces","tracked-in","related"]
    },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
158
// G27 P2: entity-connect — suggest relationships between isolated entities
/// Instruction prompt for `entity-connect`: decide whether two unconnected
/// entities share a meaningful relationship, answering `"none"` otherwise.
const ENTITY_CONNECT_PROMPT: &str = "You are a knowledge graph quality auditor. Two entities exist in the same graph but have no relationship between them. Determine if a meaningful relationship exists.\n\n\
Valid canonical relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, replaces, tracked-in, related.\n\n\
If NO meaningful relationship exists, set relation to \"none\".\n\
Respond with the relation (or \"none\"), strength, and reasoning.";

/// Output schema for `entity-connect`.
///
/// `relation` is restricted to the canonical relations listed in
/// `ENTITY_CONNECT_PROMPT`, plus the `"none"` sentinel the prompt asks for
/// when no meaningful relationship exists.
const ENTITY_CONNECT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": {
      "type": "string",
      "enum": ["depends-on","uses","supports","causes","fixes","contradicts","applies-to","follows","replaces","tracked-in","related","none"]
    },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
175
// G27 P2: entity-type-validate — verify entity type assignments
/// Instruction prompt for `entity-type-validate`: confirm or correct an
/// entity's type from a closed list of valid types.
const ENTITY_TYPE_VALIDATE_PROMPT: &str = "You are a knowledge graph quality auditor. Verify whether this entity's type is correct.\n\n\
Valid entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\n\
If the current type is correct, keep it. If wrong, suggest the correct type.\n\
Respond with the validated type and reasoning.";

/// Output schema for `entity-type-validate`.
///
/// `validated_type` is constrained to the same closed list of entity types
/// given in `ENTITY_TYPE_VALIDATE_PROMPT`, which is also the `entity_type`
/// enum in `BINDINGS_SCHEMA`. This keeps the LLM from inventing a type.
const ENTITY_TYPE_VALIDATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "validated_type": {
      "type": "string",
      "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
    },
    "was_correct": { "type": "boolean" },
    "reasoning": { "type": "string" }
  },
  "required": ["validated_type", "was_correct", "reasoning"],
  "additionalProperties": false
}"#;
192
// G27 P2: description-enrich — improve generic memory descriptions
/// Instruction prompt for `description-enrich`: replace a generic or
/// auto-generated memory description with a 10-20 word semantic one
/// (a BAD/GOOD example pair is embedded in the prompt).
const DESCRIPTION_ENRICH_PROMPT: &str = "You are a knowledge graph quality auditor. This memory has a generic or auto-generated description. Write a concise, semantic description (10-20 words) that captures WHAT this memory is about and WHY it matters.\n\n\
BAD: 'ingested from docs/auth.md'\n\
GOOD: 'JWT token rotation strategy with 15-min expiry and refresh flow'\n\n\
Respond with the improved description and reasoning.";

/// Output schema for `description-enrich`: the new description plus reasoning,
/// both required strings.
const DESCRIPTION_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" },
    "reasoning": { "type": "string" }
  },
  "required": ["description", "reasoning"],
  "additionalProperties": false
}"#;
208
// G27 P2: domain-classify — classify memory into domain category
/// Instruction prompt for `domain-classify`.
///
/// It asks for every field `DOMAIN_CLASSIFY_SCHEMA` requires. Previously the
/// prompt never mentioned `confidence`, so the model had to guess its meaning
/// and scale.
const DOMAIN_CLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. Classify this memory into its primary domain category.\n\n\
Respond with the domain name (kebab-case, 2-4 words), a confidence score between 0.0 and 1.0, and reasoning.";

/// Output schema for `domain-classify`: domain name, a confidence bounded to
/// [0, 1], and reasoning — all required.
const DOMAIN_CLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "domain": { "type": "string" },
    "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["domain", "confidence", "reasoning"],
  "additionalProperties": false
}"#;
223
// G27 P2: graph-audit — audit graph for quality issues
/// Instruction prompt for `graph-audit`: inspect a memory and its entity
/// bindings, then report issues and an overall quality score.
const GRAPH_AUDIT_PROMPT: &str = "You are a knowledge graph quality auditor. Analyze this memory and its entity bindings for quality issues.\n\n\
Check for: missing entities, wrong entity types, redundant relationships, orphaned entities, generic descriptions, low-signal relationships.\n\n\
Respond with a list of issues found (or empty if none) and an overall quality score.";

/// Output schema for `graph-audit`.
///
/// Every object level — including each `issues` item — sets
/// `"additionalProperties": false`, matching `BINDINGS_SCHEMA`. Strict
/// structured-output backends (e.g. OpenAI strict JSON-schema mode, used by the
/// codex provider) reject schemas that leave a nested object open.
const GRAPH_AUDIT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "quality_score": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "issues": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "kind": { "type": "string" },
          "detail": { "type": "string" }
        },
        "required": ["kind", "detail"],
        "additionalProperties": false
      }
    },
    "reasoning": { "type": "string" }
  },
  "required": ["quality_score", "issues", "reasoning"],
  "additionalProperties": false
}"#;
239
// G27 P2: deep-research-synth — synthesize research findings into graph
/// Instruction prompt for `deep-research-synth`: turn a memory body into
/// entities, relationships and a short synthesis summary.
const DEEP_RESEARCH_SYNTH_PROMPT: &str = "You are a knowledge graph synthesizer. Given this memory body, extract key findings and synthesize them into structured entities and relationships.\n\n\
Entity names: lowercase kebab-case, domain-specific.\n\
Relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, related, replaces, tracked-in.\n\n\
Respond with extracted entities, relationships, and a synthesis summary.";

/// Output schema for `deep-research-synth`.
///
/// Mirrors `BINDINGS_SCHEMA`:
/// - every object level (including each entity and relationship item) sets
///   `"additionalProperties": false`, which strict structured-output backends
///   require;
/// - `relation` is limited to the relations listed in
///   `DEEP_RESEARCH_SYNTH_PROMPT`;
/// - `strength` is bounded to [0, 1].
const DEEP_RESEARCH_SYNTH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": { "type": "string" }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["depends-on","uses","supports","causes","fixes","contradicts","applies-to","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 }
        },
        "required": ["source", "target", "relation", "strength"],
        "additionalProperties": false
      }
    },
    "summary": { "type": "string" }
  },
  "required": ["entities", "relationships", "summary"],
  "additionalProperties": false
}"#;
256
// G27 P2: body-extract — extract structured content from unstructured text
/// Instruction prompt for `body-extract`: restructure an unstructured memory
/// body (notes, transcript, ...) into clean markdown while keeping every fact.
const BODY_EXTRACT_PROMPT: &str = "You are a structured data extractor. Given this memory body (which may be unstructured text, raw notes, or a transcript), extract and restructure the content into a clean, well-organized markdown body.\n\n\
Preserve all factual content. Remove noise, fix formatting, add section headers where appropriate.\n\
Respond with the restructured body and a brief summary of changes.";

/// Output schema for `body-extract`: the restructured body plus a summary of
/// what changed, both required strings.
const BODY_EXTRACT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "restructured_body": { "type": "string" },
    "changes_summary": { "type": "string" }
  },
  "required": ["restructured_body", "changes_summary"],
  "additionalProperties": false
}"#;
271
272// ---------------------------------------------------------------------------
273// Prompts
274// ---------------------------------------------------------------------------
275
/// System prompt for entity/relationship extraction (`memory-bindings`).
///
/// The relationship vocabulary listed here must stay identical to the
/// `relation` enum in `BINDINGS_SCHEMA`; the strength scale matches the
/// 0.3/0.5/0.7/0.9 levels used by `WEIGHT_CALIBRATE_PROMPT`.
const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. Given a memory body, extract:\n\
1. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
2. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones";

/// Prompt prefix for entity descriptions; it ends with "Entity name: ".
/// NOTE(review): presumably the caller appends the entity name (and type) — confirm.
const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. Given an entity name and type, write a concise one-sentence description (10-20 words) that explains what this entity IS and WHY it matters in the context of software/system design.\n\nEntity name: ";

/// Prompt prefix for `body-enrich` (GAP-18); it ends with "Memory body to enrich:"
/// followed by a blank line.
/// NOTE(review): presumably the caller appends the original body and supplies the
/// target length separately, as the prompt text says — confirm.
const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. Given a short or sparse memory body, expand it into a richer, more complete and useful description. Preserve all existing facts. Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\nConstraints:\n- Output only the enriched body text (no metadata, no headers)\n- Preserve the original meaning exactly\n- Target length is provided in the system context\n\nMemory body to enrich:\n\n";
290
291// ---------------------------------------------------------------------------
292// CLI args
293// ---------------------------------------------------------------------------
294
/// Operation to perform in the `enrich` command.
///
/// Serialized and deserialized in kebab-case (`MemoryBindings` <->
/// `"memory-bindings"`) through the `#[serde(rename_all = "kebab-case")]`
/// attribute below.
///
/// The first three variants are documented as fully implemented. `ReEmbed`
/// only rebuilds missing embeddings. The remaining variants are marked
/// "(scan only)".
/// NOTE(review): confirm what "scan only" means for each of those (e.g. whether
/// anything is persisted) before relying on their output.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum EnrichOperation {
    /// Add missing entity/relationship bindings to memories (fully implemented).
    MemoryBindings,
    /// Fill NULL/empty entity descriptions with LLM-generated summaries (fully implemented).
    EntityDescriptions,
    /// Expand short memory bodies into richer content (fully implemented, GAP-18).
    BodyEnrich,
    /// Rebuild missing memory embeddings without rewriting the memory body.
    ReEmbed,
    /// Calibrate relationship weights using LLM analysis (scan only).
    WeightCalibrate,
    /// Reclassify relationship types using LLM judgment (scan only).
    RelationReclassify,
    /// Connect isolated entities by suggesting new relationships (scan only).
    EntityConnect,
    /// Validate entity type assignments using LLM judgment (scan only).
    EntityTypeValidate,
    /// Enrich memory descriptions that are generic/auto-generated (scan only).
    DescriptionEnrich,
    /// Identify cross-domain bridges between disconnected subgraphs (scan only).
    CrossDomainBridges,
    /// Classify memories into domain categories (scan only).
    DomainClassify,
    /// Audit the graph for quality issues (scan only).
    GraphAudit,
    /// Synthesize deep-research findings into graph memories (scan only).
    DeepResearchSynth,
    /// Extract structured body from unstructured text (scan only).
    BodyExtract,
}
328
/// LLM provider for enrichment.
///
/// The explicit `#[value(name = ...)]` overrides pin the CLI spellings of the
/// OpenCode and OpenRouter variants to `opencode` / `openrouter`. These are the
/// same strings this type's `Display` impl writes.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
pub enum EnrichMode {
    /// Use locally installed Claude Code CLI (OAuth-first).
    ClaudeCode,
    /// Use locally installed OpenAI Codex CLI.
    Codex,
    /// Use locally installed OpenCode CLI.
    #[value(name = "opencode")]
    Opencode,
    /// Use the OpenRouter chat-completions REST API (no local CLI; v1.0.95).
    #[value(name = "openrouter")]
    OpenRouter,
}
343
344impl std::fmt::Display for EnrichMode {
345    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
346        match self {
347            EnrichMode::ClaudeCode => write!(f, "claude-code"),
348            EnrichMode::Codex => write!(f, "codex"),
349            EnrichMode::Opencode => write!(f, "opencode"),
350            EnrichMode::OpenRouter => write!(f, "openrouter"),
351        }
352    }
353}
354
355/// Arguments for the `enrich` subcommand.
356#[derive(clap::Args)]
357#[command(
358    about = "Enrich graph memories and entities using an LLM provider",
359    after_long_help = "EXAMPLES:\n  \
360    # Add missing entity bindings to all unbound memories\n  \
361    sqlite-graphrag enrich --operation memory-bindings --mode codex --codex-model gpt-5.4-mini\n\n  \
362    # Fill entity descriptions (dry-run preview, no tokens spent)\n  \
363    sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n  \
364    # Expand short memory bodies (GAP-18)\n  \
365    sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n  \
366    # Rebuild only missing memory embeddings without rewriting bodies\n  \
367    sqlite-graphrag enrich --operation re-embed --limit 100\n\n  \
368    # Resume an interrupted body-enrich run\n  \
369    sqlite-graphrag enrich --operation body-enrich --resume --json\n\n  \
370    # Retry only failed items from a previous run\n  \
371    sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n\
372    EXIT CODES:\n  \
373    0  success\n  \
374    1  validation error (bad args, binary not found)\n  \
375    14 I/O error"
376)]
377pub struct EnrichArgs {
378    /// Enrichment operation to run.
379    #[arg(long, short = 'o', value_enum, value_name = "OPERATION")]
380    pub operation: EnrichOperation,
381
382    /// LLM provider to use. Required (no default).
383    #[arg(long, value_enum)]
384    pub mode: EnrichMode,
385
386    /// Maximum number of items to process in this run. Omit for all.
387    #[arg(long, value_name = "N")]
388    pub limit: Option<usize>,
389
390    /// Preview items without calling the LLM (zero tokens consumed).
391    #[arg(long)]
392    pub dry_run: bool,
393
394    /// Namespace to operate on. Default: global.
395    #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
396    pub namespace: Option<String>,
397
398    // -- Provider flags (Claude) --
399    /// Path to the Claude Code binary. Default: auto-detect from PATH.
400    #[arg(long, value_name = "PATH")]
401    pub claude_binary: Option<PathBuf>,
402
403    /// Claude model to use (e.g. claude-sonnet-4-6).
404    #[arg(long, value_name = "MODEL")]
405    pub claude_model: Option<String>,
406
407    /// Timeout per item in seconds when using Claude Code. Default: 300.
408    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
409    pub claude_timeout: u64,
410
411    // -- Provider flags (Codex) --
412    /// Path to the Codex CLI binary. Default: auto-detect from PATH.
413    #[arg(long, value_name = "PATH")]
414    pub codex_binary: Option<PathBuf>,
415
416    /// Codex model to use (e.g. o4-mini).
417    #[arg(long, value_name = "MODEL")]
418    pub codex_model: Option<String>,
419
420    /// Timeout per item in seconds when using Codex. Default: 300.
421    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
422    pub codex_timeout: u64,
423
424    // -- Provider flags (OpenCode) --
425    /// Path to the OpenCode binary. Default: auto-detect from PATH.
426    #[arg(long, value_name = "PATH", env = "SQLITE_GRAPHRAG_OPENCODE_BINARY")]
427    pub opencode_binary: Option<PathBuf>,
428
429    /// OpenCode model to use.
430    #[arg(long, value_name = "MODEL", env = "SQLITE_GRAPHRAG_OPENCODE_MODEL")]
431    pub opencode_model: Option<String>,
432
433    /// Timeout per item in seconds when using OpenCode. Default: 300.
434    #[arg(
435        long,
436        value_name = "SECONDS",
437        env = "SQLITE_GRAPHRAG_OPENCODE_TIMEOUT",
438        default_value_t = 300
439    )]
440    pub opencode_timeout: u64,
441
442    // -- Provider flags (OpenRouter, v1.0.95) --
443    /// OpenRouter text model to use (REQUIRED with --mode openrouter; no default).
444    #[arg(long, value_name = "MODEL")]
445    pub openrouter_model: Option<String>,
446
447    /// OpenRouter API key. Falls back to OPENROUTER_API_KEY env or stored config.
448    #[arg(long, value_name = "KEY", env = "OPENROUTER_API_KEY")]
449    pub openrouter_api_key: Option<String>,
450
451    /// Timeout per item in seconds when using OpenRouter. Default: 300.
452    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
453    pub openrouter_timeout: u64,
454
455    /// Optional OpenRouter base URL override (reserved; defaults to the public API).
456    #[arg(long, value_name = "URL")]
457    pub openrouter_base_url: Option<String>,
458
459    // -- Cost controls --
460    /// Abort when cumulative cost exceeds this USD budget (API key only; ignored for OAuth).
461    #[arg(long, value_name = "USD")]
462    pub max_cost_usd: Option<f64>,
463
464    // -- Queue controls --
465    /// Resume a previously interrupted run (skip already-done items).
466    #[arg(long)]
467    pub resume: bool,
468
469    /// Retry only items that failed in a previous run.
470    #[arg(long)]
471    pub retry_failed: bool,
472
473    // -- body-enrich specific flags (GAP-18) --
474    /// Minimum output character count for body-enrich. Default: 500.
475    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
476    pub min_output_chars: usize,
477
478    /// Maximum output character count for body-enrich. Default: 2000.
479    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
480    pub max_output_chars: usize,
481
482    /// Check that enriched body preserves all facts from the original (LLM judge). Default: true.
483    #[arg(long, default_value_t = true)]
484    pub preserve_check: bool,
485
486    /// Path to a custom prompt template file for body-enrich.
487    #[arg(long, value_name = "PATH")]
488    pub prompt_template: Option<PathBuf>,
489
490    /// Number of parallel LLM workers (default 1 = serial).
491    /// Each worker claims items atomically from the queue DB via UPDATE...RETURNING.
492    /// Range: 1–32. For 2321 entities, --llm-parallelism 4 reduces wall time ~4×.
493    #[arg(long, default_value_t = 1, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=32))]
494    pub llm_parallelism: u32,
495
496    // -- Output / infra --
497    /// Emit NDJSON output. Always true; flag accepted for compatibility.
498    #[arg(long)]
499    pub json: bool,
500
501    /// Database path override.
502    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
503    pub db: Option<String>,
504
505    /// G30: poll for the job singleton every second for up to N seconds
506    /// when another invocation holds the lock. Default: 0 (fail fast).
507    #[arg(long, value_name = "SECONDS")]
508    pub wait_job_singleton: Option<u64>,
509
510    /// G30: force acquisition of the singleton lock by removing a stale
511    /// lock file from a previously crashed invocation. Use only when you
512    /// are certain no other `enrich`/`ingest` is running.
513    #[arg(long, default_value_t = false)]
514    pub force_job_singleton: bool,
515
516    /// G37: select a specific subset of memory names to enrich instead of
517    /// the full candidate set. Comma-separated, e.g. `--names a,b,c`.
518    /// Empty when omitted (processes all candidates).
519    #[arg(long, value_name = "NAMES", value_delimiter = ',')]
520    pub names: Vec<String>,
521
522    /// G37: read the subset of memory names from a file (one per line).
523    /// Lines starting with `#` and empty lines are ignored. Combined with
524    /// `--names` (union) when both are set.
525    #[arg(long, value_name = "PATH")]
526    pub names_file: Option<PathBuf>,
527
528    /// G35: probe the LLM provider with a 1-turn ping before processing
529    /// the batch. Aborts with a clear error if the rate-limit window is
530    /// closed (avoids burning N turns only to fail on item 1).
531    #[arg(long, default_value_t = false)]
532    pub preflight_check: bool,
533
534    /// G35: if a preflight probe or in-flight call hits the Claude rate
535    /// limit, fall back to `--fallback-mode` (typically `codex`) instead
536    /// of failing the batch. Ignored when `--mode` is already `codex`.
537    #[arg(long, value_enum)]
538    pub fallback_mode: Option<EnrichMode>,
539
540    /// G35: number of seconds before the OAuth rate-limit reset at which
541    /// the preflight probe should refuse to start. Default 300 (5 min).
542    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
543    pub rate_limit_buffer: u64,
544
545    /// G28-D: refuse to start when the 1-minute load average exceeds
546    /// `2 × ncpus` (or `SQLITE_GRAPHRAG_MAX_LOAD_PER_NCPU` if set).
547    /// Set to false to skip the check on contended CI runners.
548    #[arg(long, default_value_t = true)]
549    pub max_load_check: bool,
550
551    /// G28-D: when the system is saturated, abort the job after this
552    /// many consecutive HardFailure outcomes. Default 5.
553    #[arg(long, value_name = "N", default_value_t = 5)]
554    pub circuit_breaker_threshold: u32,
555
556    /// G29 Passo 4: minimum trigram-Jaccard similarity between the
557    /// original body and the LLM-rewritten body for the rewrite to be
558    /// accepted. Scores below the threshold are rejected and emitted as
559    /// `EnrichItemResult::PreservationFailed`. Default 0.7 (per the G29
560    /// gap specification). Ignored when `--operation` is not
561    /// `body-enrich`.
562    #[arg(long, value_name = "FLOAT", default_value_t = 0.7)]
563    pub preserve_threshold: f64,
564
565    /// G33 Passo 3: when set, validate `--codex-model` against the
566    /// ChatGPT Pro OAuth accepted-model list and abort with a
567    /// suggestion when the value is unknown. Default true (fail fast
568    /// to avoid burning OAuth turns). Set to false to opt out.
569    #[arg(long, default_value_t = true)]
570    pub codex_model_validate: bool,
571
572    /// G33 Passo 3: when set together with an invalid `--codex-model`,
573    /// automatically substitute the supplied default (e.g. `gpt-5.5`)
574    /// instead of aborting. The substitution is recorded in the NDJSON
575    /// stream as `provider_substituted: true` for traceability.
576    #[arg(long, value_name = "MODEL")]
577    pub codex_model_fallback: Option<String>,
578}
579
580// ---------------------------------------------------------------------------
581// Internal types — raw LLM output structs
582// ---------------------------------------------------------------------------
583
584// ---------------------------------------------------------------------------
585// NDJSON event types emitted to stdout
586// ---------------------------------------------------------------------------
587
/// NDJSON event written to stdout when the pipeline enters a phase.
///
/// Every `Option` field is skipped during serialization when `None`, so each
/// phase line carries only the fields that were set for it.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Phase name (e.g. `"scan"`).
    phase: &'a str,
    // NOTE(review): presumably the resolved provider CLI path and its reported
    // version — confirm at the emit sites.
    #[serde(skip_serializing_if = "Option::is_none")]
    binary_path: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    // NOTE(review): presumably total candidates vs. items still pending in the
    // queue — confirm at the emit sites.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_total: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    items_pending: Option<usize>,
    /// Active parallel LLM worker count (1 = serial). Present only on the "scan" phase event.
    #[serde(skip_serializing_if = "Option::is_none")]
    llm_parallelism: Option<u32>,
}
603
/// NDJSON event written to stdout for a single processed item.
///
/// Optional fields are omitted from the wire when `None`; which of them are
/// populated depends on the operation.
/// NOTE(review): e.g. `chars_before` / `chars_after` look body-enrich specific —
/// confirm at the emit sites.
#[derive(Debug, Serialize)]
struct ItemEvent<'a> {
    /// Item identifier (memory name or entity name).
    item: &'a str,
    /// Outcome label for this item (the set of values is chosen by the caller).
    status: &'a str,
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    entity_id: Option<i64>,
    /// Number of entities bound for this item, when applicable.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Number of relationships bound for this item, when applicable.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_before: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_after: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Error message when the item failed.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Position of this item within the run.
    /// NOTE(review): confirm whether it is 0- or 1-based.
    index: usize,
    /// Total number of items in the run.
    total: usize,
}
630
/// Final NDJSON summary line for an `enrich` run.
#[derive(Debug, Serialize)]
struct EnrichSummary {
    // NOTE(review): presumably always `true`, letting consumers tell the summary
    // line apart from phase/item events — confirm at the emit site.
    summary: bool,
    /// Operation name for this run.
    operation: String,
    items_total: usize,
    completed: usize,
    failed: usize,
    skipped: usize,
    /// Accumulated cost for the run, in USD.
    cost_usd: f64,
    /// Elapsed time of the run, in milliseconds.
    elapsed_ms: u64,
    /// v1.0.84 (ADR-0042): discriminator of the LLM backend that actually
    /// performed the re-embedding during enrich. `"claude" | "codex" | "none"`.
    /// Absent on the wire when `None` (kept for happy-path envelope cleanliness,
    /// or when the operation did not involve re-embed).
    /// NOTE(review): `EnrichMode` also has opencode/openrouter providers — confirm
    /// whether this value list is complete.
    #[serde(skip_serializing_if = "Option::is_none")]
    backend_invoked: Option<&'static str>,
}
648
649use crate::output::emit_json_line as emit_json;
650
651// ---------------------------------------------------------------------------
652// Queue DB
653// ---------------------------------------------------------------------------
654
655/// Opens or creates the enrichment queue database.
656///
657/// The queue schema mirrors `ingest_claude` for resume/retry parity.
658/// Uses a different filename (`.enrich-queue.sqlite`) to avoid collision.
659///
660/// # DRY note
661///
662/// This is a near-verbatim copy of `open_queue_db` in `ingest_claude.rs`.
663/// Both should be unified in a shared `llm_runner.rs` module by the
664/// Integration stream.
665fn open_queue_db(path: &str) -> Result<Connection, AppError> {
666    let conn = Connection::open(path)?;
667    conn.pragma_update(None, "journal_mode", "wal")?;
668    conn.execute_batch(
669        "CREATE TABLE IF NOT EXISTS queue (
670            id          INTEGER PRIMARY KEY AUTOINCREMENT,
671            item_key    TEXT NOT NULL UNIQUE,
672            item_type   TEXT NOT NULL DEFAULT 'memory',
673            status      TEXT NOT NULL DEFAULT 'pending',
674            memory_id   INTEGER,
675            entity_id   INTEGER,
676            entities    INTEGER DEFAULT 0,
677            rels        INTEGER DEFAULT 0,
678            error       TEXT,
679            cost_usd    REAL DEFAULT 0.0,
680            attempt     INTEGER DEFAULT 0,
681            elapsed_ms  INTEGER,
682            created_at  TEXT DEFAULT (datetime('now')),
683            done_at     TEXT
684        );
685        CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
686    )?;
687    Ok(conn)
688}
689
690// ---------------------------------------------------------------------------
691// LLM invocation — Claude Code
692// ---------------------------------------------------------------------------
693
694/// Calls `claude -p` via the shared `claude_runner` module (G02).
695///
696/// Returns `(output_value, cost_usd, is_oauth)`.
697fn call_claude(
698    binary: &Path,
699    prompt: &str,
700    json_schema: &str,
701    input_text: &str,
702    model: Option<&str>,
703    timeout_secs: u64,
704) -> Result<(serde_json::Value, f64, bool), AppError> {
705    let result = crate::commands::claude_runner::run_claude(
706        binary,
707        prompt,
708        json_schema,
709        input_text,
710        model,
711        timeout_secs,
712        7,
713    )?;
714    Ok((result.value, result.cost_usd, result.is_oauth))
715}
716
717/// v1.0.95 (ADR-0054): route a single JUDGE turn through the OpenRouter
718/// chat-completions REST API. Unlike the subprocess runners there is no
719/// `binary` argument: the process-wide chat client (initialised in `run()`
720/// before scan) is fetched from the singleton and driven synchronously via
721/// the shared tokio runtime. Returns `(value, cost_usd, is_oauth=false)`
722/// where `cost_usd` is read from the response `usage.cost`.
723fn call_openrouter(
724    prompt: &str,
725    json_schema: &str,
726    input_text: &str,
727    model: Option<&str>,
728    timeout_secs: u64,
729) -> Result<(serde_json::Value, f64, bool), AppError> {
730    // `model` is bound into the client singleton at init; `timeout_secs` is
731    // enforced by the reqwest builder. Both remain in the signature for
732    // parity with the subprocess runners.
733    let _ = (model, timeout_secs);
734    let client = crate::embedder::openrouter_chat_client().ok_or_else(|| {
735        AppError::Validation(
736            "OpenRouter chat client not initialised before dispatch (internal error)".into(),
737        )
738    })?;
739    let runtime = crate::embedder::shared_runtime()?;
740    runtime.block_on(client.complete(prompt, input_text, json_schema, None))
741}
742
743// ---------------------------------------------------------------------------
744// Preflight probe (G35) — single-turn ping to verify the LLM provider
745// ---------------------------------------------------------------------------
746
747/// Result of a single preflight ping (G35).
748enum PreflightOutcome {
749    /// The provider accepted the ping without rate-limit or other errors.
750    Healthy,
751    /// The provider rejected the ping due to OAuth rate limit. The
752    /// `suggestion` field is a human hint that callers can embed in the
753    /// user-facing error.
754    RateLimited {
755        reason: String,
756        suggestion: &'static str,
757    },
758    /// Any other provider error (binary missing, auth failure, etc.).
759    Error(AppError),
760}
761
762/// Probes the configured LLM provider with a 1-turn ping.
763///
764/// - Claude: `claude -p "ping" --max-turns 1 --strict-mcp-config --mcp-config '{}'`
765/// - Codex:  `codex exec -c mcp_servers='{}' "ping" --json`
766///
767/// The probe intentionally avoids spawning any MCP server children (G28-A)
768/// to keep its own process footprint at the minimum.
769fn run_preflight_probe(args: &EnrichArgs) -> PreflightOutcome {
770    let timeout = std::time::Duration::from_secs(args.rate_limit_buffer.max(60));
771
772    match args.mode {
773        EnrichMode::ClaudeCode => {
774            let bin = match find_claude_binary(args.claude_binary.as_deref()) {
775                Ok(b) => b,
776                Err(e) => return PreflightOutcome::Error(e),
777            };
778            // v1.0.88 (BUG-3 fix, ADR-0046): write the empty MCP config to a
779            // tempfile (Claude Code 2.1.177 rejects the inline `{}`
780            // form) and run the preflight gate before spawn, mirroring
781            // the canonical pattern in `claude_runner::build_claude_command`.
782            let mcp_config_path = match crate::spawn::preflight::write_empty_mcp_config_tempfile() {
783                Ok(p) => p,
784                Err(e) => {
785                    return PreflightOutcome::Error(AppError::Io(e));
786                }
787            };
788            let mut cmd = std::process::Command::new(&bin);
789            crate::spawn::env_whitelist::apply_env_whitelist(
790                &mut cmd,
791                crate::spawn::env_whitelist::is_strict_env_clear(),
792            );
793            if let Err(e) = crate::spawn::apply_cwd_isolation(&mut cmd) {
794                return PreflightOutcome::Error(e);
795            }
796            cmd.arg("-p")
797                .arg("ping")
798                .arg("--max-turns")
799                .arg("1")
800                .arg("--strict-mcp-config")
801                .arg("--mcp-config")
802                .arg(mcp_config_path.as_os_str())
803                .arg("--dangerously-skip-permissions")
804                .arg("--settings")
805                .arg("{\"hooks\":{}}")
806                .arg("--output-format")
807                .arg("json")
808                .stdin(std::process::Stdio::null())
809                .stdout(std::process::Stdio::piped())
810                .stderr(std::process::Stdio::piped());
811
812            let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
813                Ok(c) => c,
814                Err(e) => {
815                    return PreflightOutcome::Error(AppError::Io(e));
816                }
817            };
818            let output = match wait_with_timeout(child, timeout) {
819                Ok(out) => out,
820                Err(e) => return PreflightOutcome::Error(e),
821            };
822            if !output.status.success() {
823                let stderr = String::from_utf8_lossy(&output.stderr);
824                if stderr.contains("hit your session limit")
825                    || stderr.contains("rate_limit")
826                    || stderr.contains("429")
827                {
828                    return PreflightOutcome::RateLimited {
829                        reason: stderr.trim().to_string(),
830                        suggestion:
831                            "wait for the OAuth window to reset or use --fallback-mode codex",
832                    };
833                }
834                return PreflightOutcome::Error(AppError::Validation(format!(
835                    "preflight probe failed: {stderr}",
836                    stderr = stderr.trim()
837                )));
838            }
839            PreflightOutcome::Healthy
840        }
841        EnrichMode::Codex => {
842            let bin = match find_codex_binary(args.codex_binary.as_deref()) {
843                Ok(b) => b,
844                Err(e) => return PreflightOutcome::Error(e),
845            };
846            super::codex_spawn::validate_codex_model(args.codex_model.as_deref())
847                .map_err(PreflightOutcome::Error)
848                .ok();
849            let schema = "{}";
850            let schema_path = match super::codex_spawn::trusted_schema_path() {
851                Ok(p) => p,
852                Err(e) => return PreflightOutcome::Error(e),
853            };
854            let spawn_args = super::codex_spawn::CodexSpawnArgs {
855                binary: &bin,
856                prompt: "ping",
857                json_schema: schema,
858                input_text: "",
859                model: args.codex_model.as_deref(),
860                timeout_secs: args.rate_limit_buffer.max(60),
861                schema_path: schema_path.clone(),
862            };
863            let mut cmd = match super::codex_spawn::build_codex_command(&spawn_args) {
864                Ok(c) => c,
865                Err(e) => return PreflightOutcome::Error(e),
866            };
867            let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
868                Ok(c) => c,
869                Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
870            };
871            let output = match wait_with_timeout(child, timeout) {
872                Ok(out) => out,
873                Err(e) => return PreflightOutcome::Error(e),
874            };
875            let _ = std::fs::remove_file(&schema_path);
876            if !output.status.success() {
877                let stderr = String::from_utf8_lossy(&output.stderr);
878                if stderr.contains("rate_limit")
879                    || stderr.contains("429")
880                    || stderr.contains("Too Many Requests")
881                {
882                    return PreflightOutcome::RateLimited {
883                        reason: stderr.trim().to_string(),
884                        suggestion: "wait for the rate-limit window to reset",
885                    };
886                }
887                return PreflightOutcome::Error(AppError::Validation(format!(
888                    "preflight probe failed: {stderr}",
889                    stderr = stderr.trim()
890                )));
891            }
892            PreflightOutcome::Healthy
893        }
894        EnrichMode::Opencode => {
895            let bin = match super::opencode_runner::find_opencode_binary_with_override(
896                args.opencode_binary.as_deref(),
897            ) {
898                Ok(b) => b,
899                Err(e) => return PreflightOutcome::Error(e),
900            };
901            let model =
902                super::opencode_runner::resolve_opencode_model(args.opencode_model.as_deref());
903            let mut cmd =
904                match super::opencode_runner::build_opencode_command_sync(&bin, &model, "ping", "")
905                {
906                    Ok(c) => c,
907                    Err(e) => return PreflightOutcome::Error(e),
908                };
909            let child = match super::opencode_runner::spawn_opencode(&mut cmd) {
910                Ok(c) => c,
911                Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
912            };
913            let output = match wait_with_timeout(child, timeout) {
914                Ok(out) => out,
915                Err(e) => return PreflightOutcome::Error(e),
916            };
917            if !output.status.success() {
918                let stderr = String::from_utf8_lossy(&output.stderr);
919                if stderr.contains("rate_limit")
920                    || stderr.contains("429")
921                    || stderr.contains("Too Many Requests")
922                {
923                    return PreflightOutcome::RateLimited {
924                        reason: stderr.trim().to_string(),
925                        suggestion: "wait for the rate-limit window to reset",
926                    };
927                }
928                return PreflightOutcome::Error(AppError::Validation(format!(
929                    "preflight probe failed: {stderr}",
930                    stderr = stderr.trim()
931                )));
932            }
933            PreflightOutcome::Healthy
934        }
935        EnrichMode::OpenRouter => {
936            // v1.0.95: the OpenRouter JUDGE has no subprocess to ping; the
937            // preflight only confirms a usable API key resolves. The chat
938            // client singleton is initialised in run() before scan.
939            match crate::config::resolve_api_key("openrouter", args.openrouter_api_key.as_deref()) {
940                Some(_) => PreflightOutcome::Healthy,
941                None => PreflightOutcome::Error(AppError::Validation(
942                    "OPENROUTER_API_KEY not found for --mode openrouter preflight".into(),
943                )),
944            }
945        }
946    }
947}
948
949/// Cross-platform wait with timeout (no extra crate dependency).
950fn wait_with_timeout(
951    mut child: std::process::Child,
952    timeout: std::time::Duration,
953) -> Result<std::process::Output, AppError> {
954    use wait_timeout::ChildExt;
955    let start = std::time::Instant::now();
956    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
957    if status.is_none() {
958        let _ = child.kill();
959        let _ = child.wait();
960        return Err(AppError::Validation(format!(
961            "preflight probe timed out after {}s",
962            start.elapsed().as_secs()
963        )));
964    }
965    let mut stdout = Vec::new();
966    if let Some(mut out) = child.stdout.take() {
967        std::io::Read::read_to_end(&mut out, &mut stdout).map_err(AppError::Io)?;
968    }
969    let mut stderr = Vec::new();
970    if let Some(mut err) = child.stderr.take() {
971        std::io::Read::read_to_end(&mut err, &mut stderr).map_err(AppError::Io)?;
972    }
973    let exit = status.unwrap();
974    Ok(std::process::Output {
975        status: exit,
976        stdout,
977        stderr,
978    })
979}
980
981// ---------------------------------------------------------------------------
982// SCAN helpers — SQL queries that find items needing enrichment
983// ---------------------------------------------------------------------------
984
985/// Returns memories without any `memory_entities` binding.
986///
987/// These are the targets for `memory-bindings` enrichment. When `name_filter`
988/// is non-empty, restricts the scan to the given names (G37); unknown names
989/// are silently skipped (the caller can detect them by comparing
990/// requested vs. returned).
991fn scan_unbound_memories(
992    conn: &Connection,
993    namespace: &str,
994    limit: Option<usize>,
995    name_filter: &[String],
996) -> Result<Vec<(i64, String, String)>, AppError> {
997    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
998
999    if name_filter.is_empty() {
1000        let sql = format!(
1001            "SELECT m.id, m.name, m.body
1002             FROM memories m
1003             WHERE m.namespace = ?1
1004               AND m.deleted_at IS NULL
1005               AND NOT EXISTS (
1006                   SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
1007               )
1008             ORDER BY m.id
1009             {limit_clause}"
1010        );
1011        let mut stmt = conn.prepare(&sql)?;
1012        let rows = stmt
1013            .query_map(rusqlite::params![namespace], |r| {
1014                Ok((
1015                    r.get::<_, i64>(0)?,
1016                    r.get::<_, String>(1)?,
1017                    r.get::<_, String>(2)?,
1018                ))
1019            })?
1020            .collect::<Result<Vec<_>, _>>()?;
1021        Ok(rows)
1022    } else {
1023        // Build a parameterised IN clause: ?2, ?3, ..., ?{1+n}
1024        let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1025            .map(|i| format!("?{i}"))
1026            .collect();
1027        let in_clause = placeholders.join(", ");
1028        let sql = format!(
1029            "SELECT m.id, m.name, m.body
1030             FROM memories m
1031             WHERE m.namespace = ?1
1032               AND m.deleted_at IS NULL
1033               AND m.name IN ({in_clause})
1034               AND NOT EXISTS (
1035                   SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
1036               )
1037             ORDER BY m.id
1038             {limit_clause}"
1039        );
1040        let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1041        params_vec.push(&namespace);
1042        for n in name_filter {
1043            params_vec.push(n);
1044        }
1045        let mut stmt = conn.prepare(&sql)?;
1046        let rows = stmt
1047            .query_map(
1048                rusqlite::params_from_iter(params_vec.iter().copied()),
1049                |r| {
1050                    Ok((
1051                        r.get::<_, i64>(0)?,
1052                        r.get::<_, String>(1)?,
1053                        r.get::<_, String>(2)?,
1054                    ))
1055                },
1056            )?
1057            .collect::<Result<Vec<_>, _>>()?;
1058        Ok(rows)
1059    }
1060}
1061
1062/// Reads a list of memory names from a UTF-8 text file (G37).
1063///
1064/// Empty lines and lines beginning with `#` are skipped. Returns a
1065/// de-duplicated, order-preserving list of trimmed names.
1066fn read_names_file(path: &Path) -> Result<Vec<String>, AppError> {
1067    let content = std::fs::read_to_string(path).map_err(|e| {
1068        AppError::Validation(format!("failed to read names file {}: {e}", path.display()))
1069    })?;
1070    let mut seen = std::collections::HashSet::new();
1071    let mut out = Vec::new();
1072    for line in content.lines() {
1073        let trimmed = line.trim();
1074        if trimmed.is_empty() || trimmed.starts_with('#') {
1075            continue;
1076        }
1077        if seen.insert(trimmed.to_string()) {
1078            out.push(trimmed.to_string());
1079        }
1080    }
1081    Ok(out)
1082}
1083
1084/// Resolves the union of `--names` and `--names-file` (G37).
1085fn resolve_name_filter(args: &EnrichArgs) -> Result<Vec<String>, AppError> {
1086    let mut combined: Vec<String> = args.names.clone();
1087    if let Some(p) = &args.names_file {
1088        let from_file = read_names_file(p)?;
1089        for n in from_file {
1090            if !combined.contains(&n) {
1091                combined.push(n);
1092            }
1093        }
1094    }
1095    Ok(combined)
1096}
1097
1098/// Returns entities with NULL or empty description.
1099///
1100/// These are the targets for `entity-descriptions` enrichment.
1101fn scan_entities_without_description(
1102    conn: &Connection,
1103    namespace: &str,
1104    limit: Option<usize>,
1105    name_filter: &[String],
1106) -> Result<Vec<(i64, String, String)>, AppError> {
1107    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1108
1109    if name_filter.is_empty() {
1110        let sql = format!(
1111            "SELECT id, name, type
1112             FROM entities
1113             WHERE namespace = ?1
1114               AND (description IS NULL OR description = '')
1115             ORDER BY id
1116             {limit_clause}"
1117        );
1118        let mut stmt = conn.prepare(&sql)?;
1119        let rows = stmt
1120            .query_map(rusqlite::params![namespace], |r| {
1121                Ok((
1122                    r.get::<_, i64>(0)?,
1123                    r.get::<_, String>(1)?,
1124                    r.get::<_, String>(2)?,
1125                ))
1126            })?
1127            .collect::<Result<Vec<_>, _>>()?;
1128        Ok(rows)
1129    } else {
1130        let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1131            .map(|i| format!("?{i}"))
1132            .collect();
1133        let in_clause = placeholders.join(", ");
1134        let sql = format!(
1135            "SELECT id, name, type
1136             FROM entities
1137             WHERE namespace = ?1
1138               AND name IN ({in_clause})
1139               AND (description IS NULL OR description = '')
1140             ORDER BY id
1141             {limit_clause}"
1142        );
1143        let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1144        params_vec.push(&namespace);
1145        for n in name_filter {
1146            params_vec.push(n);
1147        }
1148        let mut stmt = conn.prepare(&sql)?;
1149        let rows = stmt
1150            .query_map(
1151                rusqlite::params_from_iter(params_vec.iter().copied()),
1152                |r| {
1153                    Ok((
1154                        r.get::<_, i64>(0)?,
1155                        r.get::<_, String>(1)?,
1156                        r.get::<_, String>(2)?,
1157                    ))
1158                },
1159            )?
1160            .collect::<Result<Vec<_>, _>>()?;
1161        Ok(rows)
1162    }
1163}
1164
1165/// Returns memories whose body length is below the configured minimum.
1166///
1167/// These are the targets for `body-enrich` (GAP-18).
1168fn scan_short_body_memories(
1169    conn: &Connection,
1170    namespace: &str,
1171    min_chars: usize,
1172    limit: Option<usize>,
1173    name_filter: &[String],
1174) -> Result<Vec<(i64, String, String)>, AppError> {
1175    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1176
1177    if name_filter.is_empty() {
1178        let sql = format!(
1179            "SELECT m.id, m.name, m.body
1180             FROM memories m
1181             WHERE m.namespace = ?1
1182               AND m.deleted_at IS NULL
1183               AND LENGTH(COALESCE(m.body,'')) < ?2
1184             ORDER BY m.id
1185             {limit_clause}"
1186        );
1187        let mut stmt = conn.prepare(&sql)?;
1188        let rows = stmt
1189            .query_map(rusqlite::params![namespace, min_chars as i64], |r| {
1190                Ok((
1191                    r.get::<_, i64>(0)?,
1192                    r.get::<_, String>(1)?,
1193                    r.get::<_, String>(2)?,
1194                ))
1195            })?
1196            .collect::<Result<Vec<_>, _>>()?;
1197        Ok(rows)
1198    } else {
1199        let placeholders: Vec<String> = (3..=name_filter.len() + 2)
1200            .map(|i| format!("?{i}"))
1201            .collect();
1202        let in_clause = placeholders.join(", ");
1203        let sql = format!(
1204            "SELECT m.id, m.name, m.body
1205             FROM memories m
1206             WHERE m.namespace = ?1
1207               AND m.deleted_at IS NULL
1208               AND m.name IN ({in_clause})
1209               AND LENGTH(COALESCE(m.body,'')) < ?2
1210             ORDER BY m.id
1211             {limit_clause}"
1212        );
1213        let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(2 + name_filter.len());
1214        let min_chars_i64 = min_chars as i64;
1215        params_vec.push(&namespace);
1216        params_vec.push(&min_chars_i64);
1217        for n in name_filter {
1218            params_vec.push(n);
1219        }
1220        let mut stmt = conn.prepare(&sql)?;
1221        let rows = stmt
1222            .query_map(
1223                rusqlite::params_from_iter(params_vec.iter().copied()),
1224                |r| {
1225                    Ok((
1226                        r.get::<_, i64>(0)?,
1227                        r.get::<_, String>(1)?,
1228                        r.get::<_, String>(2)?,
1229                    ))
1230                },
1231            )?
1232            .collect::<Result<Vec<_>, _>>()?;
1233        Ok(rows)
1234    }
1235}
1236
1237/// Returns live memories that still have no row in `memory_embeddings`.
1238///
1239/// These are the targets for `re-embed`.
1240fn scan_memories_without_embeddings(
1241    conn: &Connection,
1242    namespace: &str,
1243    limit: Option<usize>,
1244    name_filter: &[String],
1245) -> Result<Vec<(i64, String, String)>, AppError> {
1246    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1247
1248    if name_filter.is_empty() {
1249        let sql = format!(
1250            "SELECT m.id, m.name, COALESCE(m.body,'')
1251             FROM memories m
1252             LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1253             WHERE m.namespace = ?1
1254               AND m.deleted_at IS NULL
1255               AND me.memory_id IS NULL
1256             ORDER BY m.id
1257             {limit_clause}"
1258        );
1259        let mut stmt = conn.prepare(&sql)?;
1260        let rows = stmt
1261            .query_map(rusqlite::params![namespace], |r| {
1262                Ok((
1263                    r.get::<_, i64>(0)?,
1264                    r.get::<_, String>(1)?,
1265                    r.get::<_, String>(2)?,
1266                ))
1267            })?
1268            .collect::<Result<Vec<_>, _>>()?;
1269        Ok(rows)
1270    } else {
1271        let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1272            .map(|i| format!("?{i}"))
1273            .collect();
1274        let in_clause = placeholders.join(", ");
1275        let sql = format!(
1276            "SELECT m.id, m.name, COALESCE(m.body,'')
1277             FROM memories m
1278             LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1279             WHERE m.namespace = ?1
1280               AND m.deleted_at IS NULL
1281               AND m.name IN ({in_clause})
1282               AND me.memory_id IS NULL
1283             ORDER BY m.id
1284             {limit_clause}"
1285        );
1286        let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1287        params_vec.push(&namespace);
1288        for n in name_filter {
1289            params_vec.push(n);
1290        }
1291        let mut stmt = conn.prepare(&sql)?;
1292        let rows = stmt
1293            .query_map(
1294                rusqlite::params_from_iter(params_vec.iter().copied()),
1295                |r| {
1296                    Ok((
1297                        r.get::<_, i64>(0)?,
1298                        r.get::<_, String>(1)?,
1299                        r.get::<_, String>(2)?,
1300                    ))
1301                },
1302            )?
1303            .collect::<Result<Vec<_>, _>>()?;
1304        Ok(rows)
1305    }
1306}
1307
1308/// G27: Returns relationships with weight >= 0.7 that may need recalibration.
1309#[allow(clippy::type_complexity)]
1310fn scan_weight_candidates(
1311    conn: &Connection,
1312    namespace: &str,
1313    limit: Option<usize>,
1314) -> Result<Vec<(i64, String, String, String, f64)>, AppError> {
1315    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1316    let sql = format!(
1317        "SELECT r.id, e1.name, e2.name, r.relation, r.weight \
1318         FROM relationships r \
1319         JOIN entities e1 ON e1.id = r.source_id \
1320         JOIN entities e2 ON e2.id = r.target_id \
1321         WHERE r.weight >= 0.7 AND e1.namespace = ?1 \
1322         ORDER BY r.weight DESC {limit_clause}"
1323    );
1324    let mut stmt = conn.prepare(&sql)?;
1325    let rows = stmt
1326        .query_map(rusqlite::params![namespace], |r| {
1327            Ok((
1328                r.get::<_, i64>(0)?,
1329                r.get::<_, String>(1)?,
1330                r.get::<_, String>(2)?,
1331                r.get::<_, String>(3)?,
1332                r.get::<_, f64>(4)?,
1333            ))
1334        })?
1335        .collect::<Result<Vec<_>, _>>()?;
1336    Ok(rows)
1337}
1338
1339/// G27: Returns relationships with generic relation types (applies_to).
1340fn scan_generic_relations(
1341    conn: &Connection,
1342    namespace: &str,
1343    limit: Option<usize>,
1344) -> Result<Vec<(i64, String, String, String)>, AppError> {
1345    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1346    let sql = format!(
1347        "SELECT r.id, e1.name, e2.name, r.relation \
1348         FROM relationships r \
1349         JOIN entities e1 ON e1.id = r.source_id \
1350         JOIN entities e2 ON e2.id = r.target_id \
1351         WHERE r.relation = 'applies_to' AND e1.namespace = ?1 \
1352         ORDER BY r.id {limit_clause}"
1353    );
1354    let mut stmt = conn.prepare(&sql)?;
1355    let rows = stmt
1356        .query_map(rusqlite::params![namespace], |r| {
1357            Ok((
1358                r.get::<_, i64>(0)?,
1359                r.get::<_, String>(1)?,
1360                r.get::<_, String>(2)?,
1361                r.get::<_, String>(3)?,
1362            ))
1363        })?
1364        .collect::<Result<Vec<_>, _>>()?;
1365    Ok(rows)
1366}
1367
1368// ---------------------------------------------------------------------------
1369// PERSIST helpers for fully-implemented operations
1370// ---------------------------------------------------------------------------
1371
1372/// Persists entity bindings extracted by the LLM for a memory.
1373///
1374/// Creates entities via `upsert_entity`, links them to the memory via
1375/// `link_memory_entity`, and upserts relationships found between entities.
1376fn persist_memory_bindings(
1377    conn: &Connection,
1378    namespace: &str,
1379    memory_id: i64,
1380    entities_json: &serde_json::Value,
1381    rels_json: &serde_json::Value,
1382) -> Result<(usize, usize), AppError> {
1383    #[derive(Deserialize)]
1384    struct EntityItem {
1385        name: String,
1386        entity_type: String,
1387    }
1388    #[derive(Deserialize)]
1389    struct RelItem {
1390        source: String,
1391        target: String,
1392        relation: String,
1393        strength: f64,
1394    }
1395
1396    let extracted_entities: Vec<EntityItem> = serde_json::from_value(entities_json.clone())
1397        .map_err(|e| AppError::Validation(format!("failed to parse entities array: {e}")))?;
1398
1399    let extracted_rels: Vec<RelItem> = serde_json::from_value(rels_json.clone())
1400        .map_err(|e| AppError::Validation(format!("failed to parse relationships array: {e}")))?;
1401
1402    let mut ent_count = 0usize;
1403    let mut rel_count = 0usize;
1404
1405    for item in &extracted_entities {
1406        let entity_type = match item.entity_type.parse::<EntityType>() {
1407            Ok(et) => et,
1408            Err(_) => {
1409                tracing::warn!(
1410                    target: "enrich",
1411                    entity = %item.name,
1412                    entity_type = %item.entity_type,
1413                    "entity type not recognized, skipping"
1414                );
1415                continue;
1416            }
1417        };
1418        match entities::upsert_entity(
1419            conn,
1420            namespace,
1421            &NewEntity {
1422                name: item.name.clone(),
1423                entity_type,
1424                description: None,
1425            },
1426        ) {
1427            Ok(eid) => {
1428                let _ = entities::link_memory_entity(conn, memory_id, eid);
1429                ent_count += 1;
1430            }
1431            Err(e) => {
1432                tracing::warn!(
1433                    target: "enrich",
1434                    entity = %item.name,
1435                    error = %e,
1436                    "entity upsert skipped"
1437                );
1438            }
1439        }
1440    }
1441
1442    for rel in &extracted_rels {
1443        let normalized = crate::parsers::normalize_relation(&rel.relation);
1444        crate::parsers::warn_if_non_canonical(&normalized);
1445
1446        // Normalize entity names before lookup: upsert_entity normalizes on write,
1447        // so the lookup must use the same normalized form to find the row.
1448        let src_name = crate::parsers::normalize_entity_name(&rel.source);
1449        let tgt_name = crate::parsers::normalize_entity_name(&rel.target);
1450        let src_id = entities::find_entity_id(conn, namespace, &src_name);
1451        let tgt_id = entities::find_entity_id(conn, namespace, &tgt_name);
1452        if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1453            let new_rel = NewRelationship {
1454                source: rel.source.clone(),
1455                target: rel.target.clone(),
1456                relation: normalized,
1457                strength: rel.strength,
1458                description: None,
1459            };
1460            if entities::upsert_relationship(conn, namespace, sid, tid, &new_rel).is_ok() {
1461                rel_count += 1;
1462            }
1463        }
1464    }
1465
1466    Ok((ent_count, rel_count))
1467}
1468
1469/// Updates an entity's description directly in the `entities` table.
1470fn persist_entity_description(
1471    conn: &Connection,
1472    entity_id: i64,
1473    description: &str,
1474) -> Result<(), AppError> {
1475    conn.execute(
1476        "UPDATE entities SET description = ?1, updated_at = unixepoch() WHERE id = ?2",
1477        rusqlite::params![description, entity_id],
1478    )?;
1479    Ok(())
1480}
1481
1482/// v1.0.84 (ADR-0042): on successful re-embed, records the active backend
1483/// into the shared accumulator (`ENRICH_LAST_BACKEND`) so the final
1484/// `EnrichSummary` can expose `backend_invoked` without changing every
1485/// caller's signature. Best-effort observability — concurrent enrich runs
1486/// may race, but `Mutex` keeps the mutation safe.
1487#[allow(clippy::too_many_arguments)]
1488fn reembed_memory_vector(
1489    conn: &Connection,
1490    namespace: &str,
1491    memory_id: i64,
1492    memory_name: &str,
1493    memory_type: &str,
1494    body: &str,
1495    paths: &crate::paths::AppPaths,
1496    llm_backend: crate::cli::LlmBackendChoice,
1497    embedding_backend: crate::cli::EmbeddingBackendChoice,
1498) -> Result<(), AppError> {
1499    let snippet: String = body.chars().take(200).collect();
1500    // v1.0.82 (GAP-003): forward --llm-backend to embed_with_fallback.
1501    // v1.0.84 (ADR-0042): tuple (Vec<f32>, LlmBackendKind) — extrai o
1502    // backend que efetivamente rodou e popula o accumulator para o
1503    // EnrichSummary agregado.
1504    // v1.0.93 (GAP-OR-PROPAGATION): honour --embedding-backend openrouter.
1505    let (embedding, backend_kind) = crate::embedder::embed_passage_with_embedding_choice(
1506        &paths.models,
1507        body,
1508        embedding_backend,
1509        llm_backend,
1510    )?;
1511    record_enrich_backend(backend_kind.as_str());
1512    memories::upsert_vec(
1513        conn,
1514        memory_id,
1515        namespace,
1516        memory_type,
1517        &embedding,
1518        memory_name,
1519        &snippet,
1520    )?;
1521    Ok(())
1522}
1523
/// v1.0.84 (ADR-0042): remembers `backend` as the most recent backend that
/// completed a re-embed during this enrich invocation.
///
/// Later calls overwrite earlier ones. If the mutex is poisoned, the update is
/// dropped silently, because this value is observability only.
fn record_enrich_backend(backend: &'static str) {
    if let Ok(mut last) = ENRICH_LAST_BACKEND.lock() {
        *last = Some(backend);
    }
}

/// Drains the recorded backend and leaves the accumulator empty.
///
/// Returns `None` when nothing was recorded since the last drain, or when the
/// mutex is poisoned.
fn take_enrich_backend() -> Option<&'static str> {
    match ENRICH_LAST_BACKEND.lock() {
        Ok(mut last) => last.take(),
        Err(_) => None,
    }
}

/// Process-local slot holding the last backend stored by
/// `record_enrich_backend`. It is drained by `take_enrich_backend` when `run`
/// emits its summary.
///
/// Enrich runs on the same database are serialised by the per-(namespace,
/// database) job singleton acquired in `run`. This slot therefore only needs
/// to be safe across threads of one process.
static ENRICH_LAST_BACKEND: std::sync::Mutex<Option<&'static str>> = std::sync::Mutex::new(None);
1540
1541/// Persists an enriched memory body (body-enrich, GAP-18).
1542///
1543/// Uses `memories::update` to set the new body and `sync_fts_after_update`
1544/// to keep FTS5 in sync. Also re-embeds the memory for recall accuracy.
1545#[allow(clippy::too_many_arguments)]
1546fn persist_enriched_body(
1547    conn: &Connection,
1548    namespace: &str,
1549    memory_id: i64,
1550    memory_name: &str,
1551    new_body: &str,
1552    paths: &crate::paths::AppPaths,
1553    llm_backend: crate::cli::LlmBackendChoice,
1554    embedding_backend: crate::cli::EmbeddingBackendChoice,
1555) -> Result<(), AppError> {
1556    // Read current values for FTS sync
1557    let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
1558        "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
1559        rusqlite::params![memory_id],
1560        |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1561    )?;
1562
1563    let memory_type: String = conn.query_row(
1564        "SELECT type FROM memories WHERE id=?1",
1565        rusqlite::params![memory_id],
1566        |r| r.get(0),
1567    )?;
1568
1569    let description: String = conn.query_row(
1570        "SELECT COALESCE(description,'') FROM memories WHERE id=?1",
1571        rusqlite::params![memory_id],
1572        |r| r.get(0),
1573    )?;
1574
1575    let body_hash = blake3::hash(new_body.as_bytes()).to_hex().to_string();
1576
1577    let new_memory = memories::NewMemory {
1578        namespace: namespace.to_string(),
1579        name: memory_name.to_string(),
1580        memory_type: memory_type.clone(),
1581        description: description.clone(),
1582        body: new_body.to_string(),
1583        body_hash,
1584        session_id: None,
1585        source: "agent".to_string(),
1586        metadata: serde_json::json!({
1587            "operation": "body-enrich",
1588            "orig_chars": old_body.chars().count(),
1589            "new_chars": new_body.chars().count(),
1590        }),
1591    };
1592
1593    // G29 audit: insert a new immutable version BEFORE the update so the
1594    // enriched body is reachable through `history --name <X>` and
1595    // `restore --version N` can roll back to the pre-enrich state.
1596    let next_version = crate::storage::versions::next_version(conn, memory_id)?;
1597    let version_metadata = serde_json::json!({
1598        "operation": "body-enrich",
1599        "orig_chars": old_body.chars().count(),
1600        "new_chars": new_body.chars().count(),
1601    })
1602    .to_string();
1603    crate::storage::versions::insert_version(
1604        conn,
1605        memory_id,
1606        next_version,
1607        memory_name,
1608        &memory_type,
1609        &description,
1610        new_body,
1611        &version_metadata,
1612        Some("enrich"),
1613        "edit",
1614    )?;
1615
1616    memories::update(conn, memory_id, &new_memory, None)?;
1617    memories::sync_fts_after_update(
1618        conn,
1619        memory_id,
1620        &old_name,
1621        &old_desc,
1622        &old_body,
1623        &new_memory.name,
1624        &new_memory.description,
1625        &new_memory.body,
1626    )?;
1627
1628    // Re-embed for recall accuracy
1629    if let Err(e) = reembed_memory_vector(
1630        conn,
1631        namespace,
1632        memory_id,
1633        memory_name,
1634        &memory_type,
1635        new_body,
1636        paths,
1637        llm_backend,
1638        embedding_backend,
1639    ) {
1640        tracing::warn!(target: "enrich", memory = %memory_name, error = %e, "vec upsert failed after body-enrich");
1641    }
1642
1643    Ok(())
1644}
1645
1646// ---------------------------------------------------------------------------
1647// Main entry point
1648// ---------------------------------------------------------------------------
1649
1650// ---------------------------------------------------------------------------
1651// G20: mode-conditional flag validation
1652// ---------------------------------------------------------------------------
1653
/// Reports whether `value` equals the clap-declared `default`.
///
/// Callers use this to tell a flag the operator typed explicitly apart from
/// one clap filled in through `default_value_t`. A value that happens to equal
/// the default is treated as "not passed".
fn is_at_default<T: PartialEq>(value: T, default: T) -> bool {
    PartialEq::eq(&value, &default)
}
1660
1661/// G20: validate that flags for one LLM provider were not passed when
1662/// the operator selected a different provider. Flags silently discarded
1663/// by the wrong mode are surfaced as AppError::Validation BEFORE any
1664/// DB work, so the operator gets an actionable error instead of a
1665/// surprise at runtime.
1666///
1667/// Detection rules:
1668/// - For Option<PathBuf> / Option<String>: is_some() means explicit
1669/// - For scalar fields with default_value_t: value != default means explicit
1670/// - For boolean fields: true means explicit (default is false)
1671///
1672/// Mode-specific matrices:
1673/// - mode=claude-code rejects: codex_binary, codex_model, codex_timeout != 300
1674/// - mode=codex rejects: claude_binary, claude_model, claude_timeout != 300, max_cost_usd
1675fn validate_mode_conditional_flags_enrich(args: &EnrichArgs) -> Result<(), AppError> {
1676    const DEFAULT_TIMEOUT: u64 = 300;
1677
1678    let mut conflicts: Vec<String> = Vec::new();
1679
1680    match args.mode {
1681        EnrichMode::ClaudeCode => {
1682            if args.codex_binary.is_some() {
1683                conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1684            }
1685            if args.codex_model.is_some() {
1686                conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1687            }
1688            if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1689                conflicts.push(format!(
1690                    "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1691                    args.codex_timeout
1692                ));
1693            }
1694        }
1695        EnrichMode::Codex => {
1696            if args.claude_binary.is_some() {
1697                conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1698            }
1699            if args.claude_model.is_some() {
1700                conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1701            }
1702            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1703                conflicts.push(format!(
1704                    "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1705                    args.claude_timeout
1706                ));
1707            }
1708            if args.max_cost_usd.is_some() {
1709                conflicts.push(
1710                    "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription, not the call)"
1711                        .to_string(),
1712                );
1713            }
1714        }
1715        EnrichMode::Opencode => {
1716            if args.claude_binary.is_some() {
1717                conflicts.push("--claude-binary is ignored when --mode=opencode".to_string());
1718            }
1719            if args.claude_model.is_some() {
1720                conflicts.push("--claude-model is ignored when --mode=opencode".to_string());
1721            }
1722            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1723                conflicts.push(format!(
1724                    "--claude-timeout={} is ignored when --mode=opencode (remove the flag to use the default 300s)",
1725                    args.claude_timeout
1726                ));
1727            }
1728            if args.max_cost_usd.is_some() {
1729                conflicts.push(
1730                    "--max-cost-usd is ignored when --mode=opencode (OAuth-first; cost is metered by your subscription, not the call)"
1731                        .to_string(),
1732                );
1733            }
1734        }
1735        EnrichMode::OpenRouter => {
1736            if args.claude_binary.is_some() {
1737                conflicts.push("--claude-binary is ignored when --mode=openrouter".to_string());
1738            }
1739            if args.claude_model.is_some() {
1740                conflicts.push("--claude-model is ignored when --mode=openrouter".to_string());
1741            }
1742            if args.codex_binary.is_some() {
1743                conflicts.push("--codex-binary is ignored when --mode=openrouter".to_string());
1744            }
1745            if args.codex_model.is_some() {
1746                conflicts.push("--codex-model is ignored when --mode=openrouter".to_string());
1747            }
1748            if args.opencode_binary.is_some() {
1749                conflicts.push("--opencode-binary is ignored when --mode=openrouter".to_string());
1750            }
1751            if args.opencode_model.is_some() {
1752                conflicts.push("--opencode-model is ignored when --mode=openrouter".to_string());
1753            }
1754            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1755                conflicts.push(format!(
1756                    "--claude-timeout={} is ignored when --mode=openrouter (remove the flag to use the default 300s)",
1757                    args.claude_timeout
1758                ));
1759            }
1760            if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1761                conflicts.push(format!(
1762                    "--codex-timeout={} is ignored when --mode=openrouter (remove the flag to use the default 300s)",
1763                    args.codex_timeout
1764                ));
1765            }
1766            if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
1767                conflicts.push(format!(
1768                    "--opencode-timeout={} is ignored when --mode=openrouter (remove the flag to use the default 300s)",
1769                    args.opencode_timeout
1770                ));
1771            }
1772        }
1773    }
1774
1775    if !conflicts.is_empty() {
1776        return Err(AppError::Validation(format!(
1777            "G20: mode-conditional flag conflicts detected for --mode={}:\n  - {}",
1778            args.mode,
1779            conflicts.join("\n  - ")
1780        )));
1781    }
1782
1783    Ok(())
1784}
1785
1786// ---------------------------------------------------------------------------
1787
1788/// Main entry point for the `enrich` command.
1789pub fn run(
1790    args: &EnrichArgs,
1791    llm_backend: crate::cli::LlmBackendChoice,
1792    embedding_backend: crate::cli::EmbeddingBackendChoice,
1793) -> Result<(), AppError> {
1794    // G20: mode-conditional flag validation BEFORE any DB access.
1795    // Surfaces flags that the wrong mode would silently discard.
1796    validate_mode_conditional_flags_enrich(args)?;
1797
1798    // v1.0.95 (ADR-0054): when the JUDGE is OpenRouter the model is mandatory
1799    // (no default) and the API key must resolve BEFORE any network or DB work.
1800    // The chat client singleton is initialised here so every per-item dispatch
1801    // fetches it without re-threading the key.
1802    if args.mode == EnrichMode::OpenRouter {
1803        let model = args.openrouter_model.as_deref().ok_or_else(|| {
1804            AppError::Validation(
1805                "--mode openrouter requires --openrouter-model (no default model is allowed)"
1806                    .into(),
1807            )
1808        })?;
1809        let resolved =
1810            crate::config::resolve_api_key("openrouter", args.openrouter_api_key.as_deref())
1811                .ok_or_else(|| {
1812                    AppError::Validation(
1813                        "OPENROUTER_API_KEY not found; set the env var, store it via \
1814                         `config add-key --provider openrouter`, or pass --openrouter-api-key"
1815                            .into(),
1816                    )
1817                })?;
1818        crate::embedder::get_openrouter_chat_client(
1819            resolved.value,
1820            model,
1821            args.openrouter_timeout,
1822        )?;
1823    }
1824
1825    let started = Instant::now();
1826
1827    let paths = AppPaths::resolve(args.db.as_deref())?;
1828    ensure_db_ready(&paths)?;
1829    let conn = open_rw(&paths.db)?;
1830    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1831
1832    // G28-B (v1.0.68) + G30 (v1.0.69): enforce singleton per
1833    // (job_type, namespace, db_hash) so two parallel `enrich` invocations
1834    // on the same DB cannot co-exist, but concurrent enrich on different
1835    // databases works as expected. The force flag (--force) breaks a
1836    // stale lock from a previously crashed invocation.
1837    let wait_secs = args.wait_job_singleton;
1838    let force_flag = args.force_job_singleton;
1839    let _singleton = crate::lock::acquire_job_singleton(
1840        crate::lock::JobType::Enrich,
1841        &namespace,
1842        &paths.db,
1843        wait_secs,
1844        force_flag,
1845    )?;
1846
1847    // Validate provider binary upfront only for LLM-backed operations.
1848    let provider_binary = if matches!(args.operation, EnrichOperation::ReEmbed) {
1849        None
1850    } else {
1851        Some(match args.mode {
1852            EnrichMode::ClaudeCode => {
1853                let bin = find_claude_binary(args.claude_binary.as_deref())?;
1854                let version = super::claude_runner::validate_claude_version(&bin)?;
1855                tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
1856                emit_json(&PhaseEvent {
1857                    phase: "validate",
1858                    binary_path: bin.to_str(),
1859                    version: Some(&version),
1860                    items_total: None,
1861                    items_pending: None,
1862                    llm_parallelism: None,
1863                });
1864                bin
1865            }
1866            EnrichMode::Codex => {
1867                let bin = find_codex_binary(args.codex_binary.as_deref())?;
1868                emit_json(&PhaseEvent {
1869                    phase: "validate",
1870                    binary_path: bin.to_str(),
1871                    version: None,
1872                    items_total: None,
1873                    items_pending: None,
1874                    llm_parallelism: None,
1875                });
1876                bin
1877            }
1878            EnrichMode::Opencode => {
1879                let bin = super::opencode_runner::find_opencode_binary_with_override(
1880                    args.opencode_binary.as_deref(),
1881                )?;
1882                emit_json(&PhaseEvent {
1883                    phase: "validate",
1884                    binary_path: bin.to_str(),
1885                    version: None,
1886                    items_total: None,
1887                    items_pending: None,
1888                    llm_parallelism: None,
1889                });
1890                bin
1891            }
1892            EnrichMode::OpenRouter => {
1893                // v1.0.95: the OpenRouter JUDGE is a REST call, not a spawned
1894                // binary. The chat client singleton was initialised at the top
1895                // of run(); this placeholder path threads through the dispatch
1896                // but is never dereferenced by the OpenRouter arm.
1897                emit_json(&PhaseEvent {
1898                    phase: "validate",
1899                    binary_path: None,
1900                    version: None,
1901                    items_total: None,
1902                    items_pending: None,
1903                    llm_parallelism: None,
1904                });
1905                PathBuf::new()
1906            }
1907        })
1908    };
1909
1910    // G28-D: refuse to start when the system is saturated. This check
1911    // is BEFORE preflight so we never spend an OAuth turn on a host
1912    // that is already at the limit.
1913    if args.max_load_check && !args.dry_run && crate::system_load::is_system_saturated() {
1914        let load = crate::system_load::load_average_one();
1915        let n = crate::system_load::ncpus();
1916        return Err(AppError::Validation(format!(
1917            "system load average {load:.2} exceeds 2x ncpus ({n}); \
1918             pass --no-max-load-check to override (not recommended)"
1919        )));
1920    }
1921
1922    // G35: preflight probe — issue a single ping turn to verify the
1923    // provider is healthy before scanning N candidates. If the probe
1924    // fails with a rate-limit error, optionally fall back to a
1925    // different mode (typically codex) instead of failing the entire
1926    // batch. The probe itself consumes 1 OAuth turn, so it stays
1927    // opt-in (default off) to keep --dry-run and CI flows zero-cost.
1928    if args.preflight_check && !args.dry_run && !matches!(args.operation, EnrichOperation::ReEmbed)
1929    {
1930        let preflight_result = run_preflight_probe(args);
1931        match preflight_result {
1932            PreflightOutcome::Healthy => {
1933                tracing::info!(target: "enrich", mode = ?args.mode, "preflight probe healthy");
1934            }
1935            PreflightOutcome::RateLimited { reason, suggestion } => {
1936                if let Some(fallback) = args.fallback_mode.clone() {
1937                    if fallback != args.mode {
1938                        // G35 (v1.0.69): the mid-batch mode switch is
1939                        // intentionally NOT applied because it would
1940                        // desynchronise the per-item rate-limit wait
1941                        // state (rate-limited items in the worker are
1942                        // timed against the original provider). Instead
1943                        // we abort cleanly so the operator can re-invoke
1944                        // with `--mode {fallback:?}`. This guarantees no
1945                        // OAuth window is wasted and no partial state
1946                        // is left in the queue.
1947                        return Err(AppError::Validation(format!(
1948                            "preflight detected rate limit on {mode:?}: {reason}; \
1949                             re-invoke with `--mode {fallback:?}` to use the fallback provider",
1950                            mode = args.mode
1951                        )));
1952                    }
1953                    return Err(AppError::Validation(format!(
1954                        "preflight detected rate limit on {mode:?}: {reason}; \
1955                         --fallback-mode matches --mode, no recovery possible",
1956                        mode = args.mode
1957                    )));
1958                }
1959                return Err(AppError::Validation(format!(
1960                    "preflight detected rate limit on {mode:?}: {reason}; \
1961                     {suggestion}; pass --fallback-mode codex to recover",
1962                    mode = args.mode
1963                )));
1964            }
1965            PreflightOutcome::Error(e) => {
1966                return Err(e);
1967            }
1968        }
1969    }
1970
1971    // SCAN phase
1972    let scan_result = scan_operation(&conn, &namespace, args)?;
1973    let total = scan_result.len();
1974
1975    emit_json(&PhaseEvent {
1976        phase: "scan",
1977        binary_path: None,
1978        version: None,
1979        items_total: Some(total),
1980        items_pending: Some(total),
1981        llm_parallelism: Some(args.llm_parallelism),
1982    });
1983
1984    // Dry-run: emit preview events and summary without calling LLM
1985    if args.dry_run {
1986        for (idx, key) in scan_result.iter().enumerate() {
1987            emit_json(&ItemEvent {
1988                item: key,
1989                status: "preview",
1990                memory_id: None,
1991                entity_id: None,
1992                entities: None,
1993                rels: None,
1994                chars_before: None,
1995                chars_after: None,
1996                cost_usd: None,
1997                elapsed_ms: None,
1998                error: None,
1999                index: idx,
2000                total,
2001            });
2002        }
2003        emit_json(&EnrichSummary {
2004            summary: true,
2005            operation: format!("{:?}", args.operation),
2006            items_total: total,
2007            completed: 0,
2008            failed: 0,
2009            skipped: 0,
2010            cost_usd: 0.0,
2011            elapsed_ms: started.elapsed().as_millis() as u64,
2012            backend_invoked: take_enrich_backend(),
2013        });
2014        return Ok(());
2015    }
2016
2017    // All operations in this enum have an execution path.
2018
2019    // Queue setup for resume/retry
2020    let queue_conn = open_queue_db(DEFAULT_QUEUE_DB)?;
2021
2022    if args.resume {
2023        let reset = queue_conn
2024            .execute(
2025                "UPDATE queue SET status='pending' WHERE status='processing'",
2026                [],
2027            )
2028            .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
2029        if reset > 0 {
2030            tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
2031        }
2032    }
2033
2034    if args.retry_failed {
2035        let count = queue_conn
2036            .execute(
2037                "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
2038                [],
2039            )
2040            .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
2041        tracing::info!(target: "enrich", count, "retrying failed items");
2042    }
2043
2044    if !args.resume && !args.retry_failed {
2045        queue_conn
2046            .execute("DELETE FROM queue", [])
2047            .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
2048    }
2049
2050    // Populate queue
2051    for (idx, key) in scan_result.iter().enumerate() {
2052        let item_type = match args.operation {
2053            EnrichOperation::EntityDescriptions => "entity",
2054            _ => "memory",
2055        };
2056        if let Err(e) = queue_conn.execute(
2057            "INSERT OR IGNORE INTO queue (item_key, item_type, status) VALUES (?1, ?2, 'pending')",
2058            rusqlite::params![key, item_type],
2059        ) {
2060            tracing::warn!(target: "enrich", error = %e, "queue insert failed");
2061        }
2062        let _ = idx; // suppress unused warning
2063    }
2064
2065    // G19: parallel LLM processing via std::thread::scope when parallelism > 1.
2066    // Clamp enforces the range even if the caller bypasses clap validation.
2067    let parallelism = args.llm_parallelism.clamp(1, 32) as usize;
2068    if parallelism > 1 {
2069        tracing::info!(
2070            target: "enrich",
2071            llm_parallelism = parallelism,
2072            "parallel LLM processing with bounded thread pool"
2073        );
2074    }
2075    // G28-D (v1.0.68) + G34 (v1.0.69): warn above the recommended parallelism
2076    // ceiling. The threshold and message depend on the LLM mode because
2077    // Claude Code spawns MCP children (G28-A) while Codex does not.
2078    if parallelism > 4 {
2079        match args.mode {
2080            EnrichMode::ClaudeCode => {
2081                tracing::warn!(
2082                    target: "enrich",
2083                    llm_parallelism = parallelism,
2084                    recommended_max = 4,
2085                    mode = "claude-code",
2086                    "llm_parallelism above 4 multiplies Claude Code subprocess fan-out; \
2087                     consider combining with SQLITE_GRAPHRAG_CLAUDE_EMPTY_CONFIG_DIR \
2088                     to cut MCP children (G28-A)"
2089                );
2090            }
2091            EnrichMode::Codex if parallelism > 16 => {
2092                tracing::warn!(
2093                    target: "enrich",
2094                    llm_parallelism = parallelism,
2095                    recommended_max = 16,
2096                    mode = "codex",
2097                    "llm_parallelism above 16 risks OAuth rate-limit on Codex; \
2098                     consider --llm-parallelism 8 for safer concurrency"
2099                );
2100            }
2101            EnrichMode::Codex => {
2102                // No warning: codex does not spawn MCP children and was
2103                // validated at parallelism 8 in production (1161 items,
2104                // 0 failures) per the 2026-06-04 session audit.
2105            }
2106            EnrichMode::Opencode if parallelism > 16 => {
2107                tracing::warn!(
2108                    target: "enrich",
2109                    llm_parallelism = parallelism,
2110                    recommended_max = 16,
2111                    mode = "opencode",
2112                    "llm_parallelism above 16 risks OAuth rate-limit on OpenCode; \
2113                     consider --llm-parallelism 8 for safer concurrency"
2114                );
2115            }
2116            EnrichMode::Opencode => {
2117                // No warning: opencode does not spawn MCP children.
2118            }
2119            EnrichMode::OpenRouter => {
2120                // No warning: OpenRouter is a bounded HTTP fan-out (no
2121                // subprocess); --llm-parallelism is respected as-is.
2122            }
2123        }
2124    }
2125
2126    let mut completed = 0usize;
2127    let mut failed = 0usize;
2128    let mut skipped = 0usize;
2129    let mut cost_total = 0.0f64;
2130    let mut oauth_detected = false;
2131    let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
2132    let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
2133    let enrich_started = std::time::Instant::now();
2134
2135    let provider_timeout = match args.mode {
2136        EnrichMode::ClaudeCode => args.claude_timeout,
2137        EnrichMode::Codex => args.codex_timeout,
2138        EnrichMode::Opencode => args.opencode_timeout,
2139        EnrichMode::OpenRouter => args.openrouter_timeout,
2140    };
2141
2142    let provider_model: Option<&str> = match args.mode {
2143        EnrichMode::ClaudeCode => args.claude_model.as_deref(),
2144        EnrichMode::Codex => args.codex_model.as_deref(),
2145        EnrichMode::Opencode => args.opencode_model.as_deref(),
2146        EnrichMode::OpenRouter => args.openrouter_model.as_deref(),
2147    };
2148
2149    // G19: when parallelism > 1, spawn bounded worker threads.
2150    // Each worker opens its own DB connections (WAL supports concurrent readers + serialized writers).
2151    // The queue DB claim is atomic via UPDATE...RETURNING — no external lock needed.
2152    if parallelism > 1 {
2153        let stdout_mu = parking_lot::Mutex::new(());
2154        let budget = args.max_cost_usd;
2155        let operation = args.operation.clone();
2156        let mode = args.mode.clone();
2157        let min_oc = args.min_output_chars;
2158        let max_oc = args.max_output_chars;
2159        let prompt_tpl = args.prompt_template.as_deref().map(|p| p.to_path_buf());
2160
2161        struct WorkerResult {
2162            completed: usize,
2163            failed: usize,
2164            skipped: usize,
2165            cost: f64,
2166            oauth: bool,
2167        }
2168
2169        let results: Vec<WorkerResult> = std::thread::scope(|s| {
2170            let handles: Vec<_> = (0..parallelism)
2171                .map(|worker_id| {
2172                    let stdout_mu = &stdout_mu;
2173                    let paths = &paths;
2174                    let namespace = &namespace;
2175                    let provider_binary = provider_binary.as_deref();
2176                    let operation = &operation;
2177                    let mode = &mode;
2178                    let prompt_tpl = prompt_tpl.as_deref();
2179                    s.spawn(move || {
2180                        let w_conn = match open_rw(&paths.db) {
2181                            Ok(c) => c,
2182                            Err(e) => {
2183                                tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open DB");
2184                                return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
2185                            }
2186                        };
2187                        let w_queue = match open_queue_db(DEFAULT_QUEUE_DB) {
2188                            Ok(c) => c,
2189                            Err(e) => {
2190                                tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open queue DB");
2191                                return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
2192                            }
2193                        };
2194                        let mut w_completed = 0usize;
2195                        let mut w_failed = 0usize;
2196                        let mut w_skipped = 0usize;
2197                        let mut w_cost = 0.0f64;
2198                        let mut w_oauth = false;
2199                        let mut w_backoff = DEFAULT_RATE_LIMIT_WAIT;
2200                        let w_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
2201                        // G28-D: per-worker circuit breaker that aborts the
2202                        // loop after `circuit_breaker_threshold` consecutive
2203                        // HardFailure outcomes (transient/rate-limited errors
2204                        // do NOT count, so a recovering provider is not
2205                        // penalised).
2206                        let mut w_breaker = crate::retry::CircuitBreaker::new(
2207                            args.circuit_breaker_threshold.max(1),
2208                            std::time::Duration::from_secs(60),
2209                        );
2210
2211                        loop {
2212                            if crate::shutdown_requested() {
2213                                tracing::info!(target: "enrich", "shutdown requested, worker stopping");
2214                                break;
2215                            }
2216                            if let Some(b) = budget {
2217                                if !w_oauth && w_cost >= b {
2218                                    break;
2219                                }
2220                            }
2221                            let pending: Option<(i64, String, String)> = w_queue
2222                                .query_row(
2223                                    "UPDATE queue SET status='processing', attempt=attempt+1 \
2224                                     WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
2225                                     RETURNING id, item_key, item_type",
2226                                    [],
2227                                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2228                                )
2229                                .ok();
2230                            let (queue_id, item_key, _item_type) = match pending {
2231                                Some(p) => p,
2232                                None => break,
2233                            };
2234                            let item_started = Instant::now();
2235                            let current_index = w_completed + w_failed + w_skipped;
2236
2237                            let call_result = match operation {
2238                                EnrichOperation::MemoryBindings => call_memory_bindings(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2239                                EnrichOperation::EntityDescriptions => call_entity_description(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2240                                EnrichOperation::BodyEnrich => call_body_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode, min_oc, max_oc, prompt_tpl, args.preserve_threshold, paths, llm_backend, embedding_backend),
2241                                EnrichOperation::ReEmbed => call_reembed(&w_conn, namespace, &item_key, paths, llm_backend, embedding_backend),
2242                                EnrichOperation::WeightCalibrate => call_weight_calibrate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2243                                EnrichOperation::RelationReclassify => call_relation_reclassify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2244                                EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => call_entity_connect(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2245                                EnrichOperation::EntityTypeValidate => call_entity_type_validate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2246                                EnrichOperation::DescriptionEnrich => call_description_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2247                                EnrichOperation::DomainClassify => call_domain_classify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2248                                EnrichOperation::GraphAudit => call_graph_audit(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2249                                EnrichOperation::DeepResearchSynth => call_deep_research_synth(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2250                                EnrichOperation::BodyExtract => call_body_extract(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2251                            };
2252
2253                            match call_result {
2254                                Ok(EnrichItemResult::Done { cost, is_oauth, memory_id, entity_id, entities, rels, chars_before, chars_after }) => {
2255                                    if is_oauth { w_oauth = true; }
2256                                    w_backoff = DEFAULT_RATE_LIMIT_WAIT;
2257                                    let _ = w_queue.execute(
2258                                        "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2259                                        rusqlite::params![memory_id, entity_id, entities as i64, rels as i64, cost, item_started.elapsed().as_millis() as i64, queue_id],
2260                                    );
2261                                    w_completed += 1;
2262                                    if !is_oauth { w_cost += cost; }
2263                                    // G28-D: count success; resets breaker.
2264                                    let _ = w_breaker
2265                                        .record(crate::retry::AttemptOutcome::Success);
2266                                    let _guard = stdout_mu.lock();
2267                                    emit_json(&ItemEvent { item: &item_key, status: "done", memory_id, entity_id, entities: Some(entities), rels: Some(rels), chars_before, chars_after, cost_usd: if is_oauth { None } else { Some(cost) }, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
2268                                }
2269                                Ok(EnrichItemResult::Skipped { reason }) => {
2270                                    w_skipped += 1;
2271                                    let _ = w_queue.execute("UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![reason, queue_id]);
2272                                    let _guard = stdout_mu.lock();
2273                                    emit_json(&ItemEvent { item: &item_key, status: "skipped", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
2274                                }
2275                                Ok(EnrichItemResult::PreservationFailed { score, threshold, chars_before, chars_after }) => {
2276                                    // G29 Passo 4: worker mirror of the
2277                                    // serial path. Counted as a soft
2278                                    // skip so the queue surface shows
2279                                    // a quality issue rather than a
2280                                    // transport failure.
2281                                    w_skipped += 1;
2282                                    let reason = format!(
2283                                        "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2284                                    );
2285                                    let _ = w_queue.execute(
2286                                        "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2287                                        rusqlite::params![reason, queue_id],
2288                                    );
2289                                    let _guard = stdout_mu.lock();
2290                                    emit_json(&ItemEvent {
2291                                        item: &item_key,
2292                                        status: "preservation_failed",
2293                                        memory_id: None,
2294                                        entity_id: None,
2295                                        entities: None,
2296                                        rels: None,
2297                                        chars_before: Some(chars_before),
2298                                        chars_after: Some(chars_after),
2299                                        cost_usd: None,
2300                                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2301                                        error: Some(reason),
2302                                        index: current_index,
2303                                        total,
2304                                    });
2305                                }
2306                                Err(e) => {
2307                                    let err_str = format!("{e}");
2308                                    if matches!(e, AppError::RateLimited { .. }) {
2309                                        if crate::retry::is_kill_switch_active() {
2310                                            tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2311                                        } else if std::time::Instant::now() >= w_deadline {
2312                                            tracing::error!(target: "enrich", "rate-limit retry deadline (1h) exhausted in worker");
2313                                        } else {
2314                                            let half = w_backoff / 2;
2315                                            let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2316                                            let actual_wait = half + jitter;
2317                                            tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited in worker, backing off");
2318                                            let _ = w_queue.execute("UPDATE queue SET status='pending' WHERE id=?1", rusqlite::params![queue_id]);
2319                                            std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2320                                            w_backoff = (w_backoff * 2).min(900);
2321                                            continue;
2322                                        }
2323                                    }
2324                                    w_failed += 1;
2325                                    let _ = w_queue.execute("UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![err_str, queue_id]);
2326                                    let _guard = stdout_mu.lock();
2327                                    emit_json(&ItemEvent { item: &item_key, status: "failed", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: Some(err_str), index: current_index, total });
2328                                    // G28-D: count hard failure against breaker.
2329                                    let breaker_opened = w_breaker
2330                                        .record(crate::retry::AttemptOutcome::HardFailure);
2331                                    if breaker_opened {
2332                                        tracing::error!(target: "enrich",
2333                                            consecutive_failures = w_breaker.consecutive_failures(),
2334                                            "circuit breaker opened — aborting worker"
2335                                        );
2336                                        break;
2337                                    }
2338                                }
2339                            }
2340                        }
2341                        WorkerResult { completed: w_completed, failed: w_failed, skipped: w_skipped, cost: w_cost, oauth: w_oauth }
2342                    })
2343                })
2344                .collect();
2345            handles
2346                .into_iter()
2347                .map(|h| {
2348                    h.join().unwrap_or(WorkerResult {
2349                        completed: 0,
2350                        failed: 0,
2351                        skipped: 0,
2352                        cost: 0.0,
2353                        oauth: false,
2354                    })
2355                })
2356                .collect()
2357        });
2358
2359        for r in &results {
2360            completed += r.completed;
2361            failed += r.failed;
2362            skipped += r.skipped;
2363            cost_total += r.cost;
2364            if r.oauth && !oauth_detected {
2365                oauth_detected = true;
2366            }
2367        }
2368    } else {
2369        // Serial path (parallelism == 1) — original loop
2370        loop {
2371            if crate::shutdown_requested() {
2372                tracing::info!(target: "enrich", "shutdown requested, stopping enrichment");
2373                break;
2374            }
2375
2376            // Budget check
2377            if let Some(budget) = args.max_cost_usd {
2378                if !oauth_detected && cost_total >= budget {
2379                    tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
2380                    break;
2381                }
2382            }
2383
2384            // Dequeue next pending item
2385            let pending: Option<(i64, String, String)> = queue_conn
2386                .query_row(
2387                    "UPDATE queue SET status='processing', attempt=attempt+1 \
2388                 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
2389                 RETURNING id, item_key, item_type",
2390                    [],
2391                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2392                )
2393                .ok();
2394
2395            let (queue_id, item_key, item_type) = match pending {
2396                Some(p) => p,
2397                None => break,
2398            };
2399
2400            let item_started = Instant::now();
2401            let current_index = completed + failed + skipped;
2402
2403            let call_result = match args.operation {
2404                EnrichOperation::MemoryBindings => call_memory_bindings(
2405                    &conn,
2406                    &namespace,
2407                    &item_key,
2408                    provider_binary
2409                        .as_deref()
2410                        .expect("provider binary required"),
2411                    provider_model,
2412                    provider_timeout,
2413                    &args.mode,
2414                ),
2415                EnrichOperation::EntityDescriptions => call_entity_description(
2416                    &conn,
2417                    &namespace,
2418                    &item_key,
2419                    provider_binary
2420                        .as_deref()
2421                        .expect("provider binary required"),
2422                    provider_model,
2423                    provider_timeout,
2424                    &args.mode,
2425                ),
2426                EnrichOperation::BodyEnrich => call_body_enrich(
2427                    &conn,
2428                    &namespace,
2429                    &item_key,
2430                    provider_binary
2431                        .as_deref()
2432                        .expect("provider binary required"),
2433                    provider_model,
2434                    provider_timeout,
2435                    &args.mode,
2436                    args.min_output_chars,
2437                    args.max_output_chars,
2438                    args.prompt_template.as_deref(),
2439                    args.preserve_threshold,
2440                    &paths,
2441                    llm_backend,
2442                    embedding_backend,
2443                ),
2444                EnrichOperation::ReEmbed => call_reembed(
2445                    &conn,
2446                    &namespace,
2447                    &item_key,
2448                    &paths,
2449                    llm_backend,
2450                    embedding_backend,
2451                ),
2452                EnrichOperation::WeightCalibrate => call_weight_calibrate(
2453                    &conn,
2454                    &namespace,
2455                    &item_key,
2456                    provider_binary
2457                        .as_deref()
2458                        .expect("provider binary required"),
2459                    provider_model,
2460                    provider_timeout,
2461                    &args.mode,
2462                ),
2463                EnrichOperation::RelationReclassify => call_relation_reclassify(
2464                    &conn,
2465                    &namespace,
2466                    &item_key,
2467                    provider_binary
2468                        .as_deref()
2469                        .expect("provider binary required"),
2470                    provider_model,
2471                    provider_timeout,
2472                    &args.mode,
2473                ),
2474                EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2475                    call_entity_connect(
2476                        &conn,
2477                        &namespace,
2478                        &item_key,
2479                        provider_binary
2480                            .as_deref()
2481                            .expect("provider binary required"),
2482                        provider_model,
2483                        provider_timeout,
2484                        &args.mode,
2485                    )
2486                }
2487                EnrichOperation::EntityTypeValidate => call_entity_type_validate(
2488                    &conn,
2489                    &namespace,
2490                    &item_key,
2491                    provider_binary
2492                        .as_deref()
2493                        .expect("provider binary required"),
2494                    provider_model,
2495                    provider_timeout,
2496                    &args.mode,
2497                ),
2498                EnrichOperation::DescriptionEnrich => call_description_enrich(
2499                    &conn,
2500                    &namespace,
2501                    &item_key,
2502                    provider_binary
2503                        .as_deref()
2504                        .expect("provider binary required"),
2505                    provider_model,
2506                    provider_timeout,
2507                    &args.mode,
2508                ),
2509                EnrichOperation::DomainClassify => call_domain_classify(
2510                    &conn,
2511                    &namespace,
2512                    &item_key,
2513                    provider_binary
2514                        .as_deref()
2515                        .expect("provider binary required"),
2516                    provider_model,
2517                    provider_timeout,
2518                    &args.mode,
2519                ),
2520                EnrichOperation::GraphAudit => call_graph_audit(
2521                    &conn,
2522                    &namespace,
2523                    &item_key,
2524                    provider_binary
2525                        .as_deref()
2526                        .expect("provider binary required"),
2527                    provider_model,
2528                    provider_timeout,
2529                    &args.mode,
2530                ),
2531                EnrichOperation::DeepResearchSynth => call_deep_research_synth(
2532                    &conn,
2533                    &namespace,
2534                    &item_key,
2535                    provider_binary
2536                        .as_deref()
2537                        .expect("provider binary required"),
2538                    provider_model,
2539                    provider_timeout,
2540                    &args.mode,
2541                ),
2542                EnrichOperation::BodyExtract => call_body_extract(
2543                    &conn,
2544                    &namespace,
2545                    &item_key,
2546                    provider_binary
2547                        .as_deref()
2548                        .expect("provider binary required"),
2549                    provider_model,
2550                    provider_timeout,
2551                    &args.mode,
2552                ),
2553            };
2554
2555            match call_result {
2556                Ok(EnrichItemResult::Done {
2557                    memory_id,
2558                    entity_id,
2559                    entities,
2560                    rels,
2561                    chars_before,
2562                    chars_after,
2563                    cost,
2564                    is_oauth,
2565                }) => {
2566                    if is_oauth && !oauth_detected {
2567                        oauth_detected = true;
2568                        tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
2569                    }
2570                    backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
2571
2572                    // Persist depends on the operation
2573                    let persist_err: Option<String> = match args.operation {
2574                        EnrichOperation::MemoryBindings => {
2575                            // Bindings already persisted inside call_memory_bindings
2576                            None
2577                        }
2578                        EnrichOperation::EntityDescriptions => {
2579                            // Description already persisted inside call_entity_description
2580                            None
2581                        }
2582                        EnrichOperation::BodyEnrich => {
2583                            // Body already persisted inside call_body_enrich
2584                            None
2585                        }
2586                        _ => {
2587                            // All G27 operations persist inside their call_* function
2588                            None
2589                        }
2590                    };
2591
2592                    if let Err(e) = queue_conn.execute(
2593                    "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2594                    rusqlite::params![
2595                        memory_id,
2596                        entity_id,
2597                        entities as i64,
2598                        rels as i64,
2599                        cost,
2600                        item_started.elapsed().as_millis() as i64,
2601                        queue_id
2602                    ],
2603                ) {
2604                        tracing::warn!(target: "enrich", error = %e, "queue done update failed");
2605                    }
2606
2607                    if persist_err.is_none() {
2608                        completed += 1;
2609                        if !is_oauth {
2610                            cost_total += cost;
2611                        }
2612                        emit_json(&ItemEvent {
2613                            item: &item_key,
2614                            status: "done",
2615                            memory_id,
2616                            entity_id,
2617                            entities: Some(entities),
2618                            rels: Some(rels),
2619                            chars_before,
2620                            chars_after,
2621                            cost_usd: if is_oauth { None } else { Some(cost) },
2622                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2623                            error: None,
2624                            index: current_index,
2625                            total,
2626                        });
2627                    } else {
2628                        failed += 1;
2629                        emit_json(&ItemEvent {
2630                            item: &item_key,
2631                            status: "failed",
2632                            memory_id: None,
2633                            entity_id: None,
2634                            entities: None,
2635                            rels: None,
2636                            chars_before: None,
2637                            chars_after: None,
2638                            cost_usd: None,
2639                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2640                            error: persist_err,
2641                            index: current_index,
2642                            total,
2643                        });
2644                    }
2645                }
2646                Ok(EnrichItemResult::Skipped { reason }) => {
2647                    skipped += 1;
2648                    if let Err(e) = queue_conn.execute(
2649                    "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2650                    rusqlite::params![reason, queue_id],
2651                ) {
2652                        tracing::warn!(target: "enrich", error = %e, "queue skipped update failed");
2653                    }
2654                    emit_json(&ItemEvent {
2655                        item: &item_key,
2656                        status: "skipped",
2657                        memory_id: None,
2658                        entity_id: None,
2659                        entities: None,
2660                        rels: None,
2661                        chars_before: None,
2662                        chars_after: None,
2663                        cost_usd: None,
2664                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2665                        error: None,
2666                        index: current_index,
2667                        total,
2668                    });
2669                }
2670                Ok(EnrichItemResult::PreservationFailed {
2671                    score,
2672                    threshold,
2673                    chars_before,
2674                    chars_after,
2675                }) => {
2676                    // G29 Passo 4: the LLM rewrite diverged too far from
2677                    // the original body. Count as a soft failure (not
2678                    // `failed`) so the queue surfaces it as a quality
2679                    // issue, not a transport error. The reason is
2680                    // structured so the operator can audit why a body
2681                    // was rejected.
2682                    skipped += 1;
2683                    let reason = format!(
2684                        "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2685                    );
2686                    if let Err(qe) = queue_conn.execute(
2687                        "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2688                        rusqlite::params![reason, queue_id],
2689                    ) {
2690                        tracing::warn!(target: "enrich", error = %qe, "queue preservation_failed update failed");
2691                    }
2692                    emit_json(&ItemEvent {
2693                        item: &item_key,
2694                        status: "preservation_failed",
2695                        memory_id: None,
2696                        entity_id: None,
2697                        entities: None,
2698                        rels: None,
2699                        chars_before: Some(chars_before),
2700                        chars_after: Some(chars_after),
2701                        cost_usd: None,
2702                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2703                        error: Some(reason),
2704                        index: current_index,
2705                        total,
2706                    });
2707                }
2708                Err(e) => {
2709                    let err_str = format!("{e}");
2710                    if matches!(e, AppError::RateLimited { .. }) {
2711                        if crate::retry::is_kill_switch_active() {
2712                            tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2713                        } else if std::time::Instant::now() >= rate_limit_deadline {
2714                            tracing::error!(target: "enrich", total_elapsed_secs = enrich_started.elapsed().as_secs(), "rate-limit retry deadline (1h) exhausted");
2715                        } else {
2716                            let half = backoff_secs / 2;
2717                            let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2718                            let actual_wait = half + jitter;
2719                            tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
2720                            if let Err(qe) = queue_conn.execute(
2721                                "UPDATE queue SET status='pending' WHERE id=?1",
2722                                rusqlite::params![queue_id],
2723                            ) {
2724                                tracing::warn!(target: "enrich", error = %qe, "queue pending update failed");
2725                            }
2726                            std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2727                            backoff_secs = (backoff_secs * 2).min(900);
2728                            continue;
2729                        }
2730                    }
2731
2732                    failed += 1;
2733                    if let Err(qe) = queue_conn.execute(
2734                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
2735                    rusqlite::params![err_str, queue_id],
2736                ) {
2737                        tracing::warn!(target: "enrich", error = %qe, "queue failed update failed");
2738                    }
2739                    emit_json(&ItemEvent {
2740                        item: &item_key,
2741                        status: "failed",
2742                        memory_id: None,
2743                        entity_id: None,
2744                        entities: None,
2745                        rels: None,
2746                        chars_before: None,
2747                        chars_after: None,
2748                        cost_usd: None,
2749                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2750                        error: Some(err_str),
2751                        index: current_index,
2752                        total,
2753                    });
2754                }
2755            }
2756
2757            let _ = item_type; // used via queue schema only
2758        }
2759    } // end else (serial path)
2760
2761    let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2762    let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2763
2764    emit_json(&EnrichSummary {
2765        summary: true,
2766        operation: format!("{:?}", args.operation),
2767        items_total: total,
2768        completed,
2769        failed,
2770        skipped,
2771        cost_usd: cost_total,
2772        elapsed_ms: started.elapsed().as_millis() as u64,
2773        backend_invoked: take_enrich_backend(),
2774    });
2775
2776    if failed == 0 {
2777        let _ = std::fs::remove_file(DEFAULT_QUEUE_DB);
2778    }
2779
2780    Ok(())
2781}
2782
2783// ---------------------------------------------------------------------------
2784// Internal result type for a single item call
2785// ---------------------------------------------------------------------------
2786
/// Outcome of processing a single queue item, returned by the per-operation
/// `call_*` helpers in this module.
enum EnrichItemResult {
    /// The item was processed and its changes were persisted.
    Done {
        /// Memory row affected, for memory-oriented operations.
        memory_id: Option<i64>,
        /// Entity row affected, for entity-oriented operations.
        entity_id: Option<i64>,
        /// Number of entities written or touched.
        entities: usize,
        /// Number of relationships written or touched.
        rels: usize,
        /// Body length in characters before the operation, when meaningful.
        chars_before: Option<usize>,
        /// Body length in characters after the operation, when meaningful.
        chars_after: Option<usize>,
        /// Cost reported by the backend call (0.0 when none was made).
        cost: f64,
        /// OAuth flag reported by the backend call helper.
        is_oauth: bool,
    },
    /// The item was deliberately left unchanged; `reason` says why.
    Skipped { reason: String },
    /// G29 Passo 4 (v1.0.69): the LLM rewrite drifted too far from the
    /// original body (trigram-Jaccard `score` below `threshold`, as set by
    /// `--preserve-threshold`) and was discarded before persistence. The
    /// numbers are surfaced in the NDJSON stream so operators can audit it.
    PreservationFailed {
        score: f64,
        threshold: f64,
        chars_before: usize,
        chars_after: usize,
    },
}
2812
2813// ---------------------------------------------------------------------------
2814// Per-operation call helpers (SCAN + JUDGE + PERSIST in one unit)
2815// ---------------------------------------------------------------------------
2816
2817fn call_memory_bindings(
2818    conn: &Connection,
2819    namespace: &str,
2820    memory_name: &str,
2821    binary: &Path,
2822    model: Option<&str>,
2823    timeout: u64,
2824    mode: &EnrichMode,
2825) -> Result<EnrichItemResult, AppError> {
2826    // Look up the memory
2827    let (memory_id, body): (i64, String) = conn.query_row(
2828        "SELECT id, COALESCE(body,'') FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2829        rusqlite::params![namespace, memory_name],
2830        |r| Ok((r.get(0)?, r.get(1)?)),
2831    ).map_err(|e| match e {
2832        rusqlite::Error::QueryReturnedNoRows => AppError::NotFound(format!("memory '{memory_name}' not found")),
2833        other => AppError::Database(other),
2834    })?;
2835
2836    if body.trim().is_empty() {
2837        return Ok(EnrichItemResult::Skipped {
2838            reason: "body is empty".to_string(),
2839        });
2840    }
2841
2842    let (value, cost, is_oauth) = match mode {
2843        EnrichMode::ClaudeCode => call_claude(
2844            binary,
2845            BINDINGS_PROMPT,
2846            BINDINGS_SCHEMA,
2847            &body,
2848            model,
2849            timeout,
2850        )?,
2851        EnrichMode::Codex => call_codex(
2852            binary,
2853            BINDINGS_PROMPT,
2854            BINDINGS_SCHEMA,
2855            &body,
2856            model,
2857            timeout,
2858        )?,
2859        EnrichMode::Opencode => call_opencode(
2860            binary,
2861            BINDINGS_PROMPT,
2862            BINDINGS_SCHEMA,
2863            &body,
2864            model,
2865            timeout,
2866        )?,
2867        EnrichMode::OpenRouter => {
2868            call_openrouter(BINDINGS_PROMPT, BINDINGS_SCHEMA, &body, model, timeout)?
2869        }
2870    };
2871
2872    let empty_arr = serde_json::Value::Array(vec![]);
2873    let entities_val = value.get("entities").unwrap_or(&empty_arr);
2874    let rels_val = value.get("relationships").unwrap_or(&empty_arr);
2875
2876    let (ent_count, rel_count) =
2877        persist_memory_bindings(conn, namespace, memory_id, entities_val, rels_val)?;
2878
2879    Ok(EnrichItemResult::Done {
2880        memory_id: Some(memory_id),
2881        entity_id: None,
2882        entities: ent_count,
2883        rels: rel_count,
2884        chars_before: None,
2885        chars_after: None,
2886        cost,
2887        is_oauth,
2888    })
2889}
2890
2891fn call_entity_description(
2892    conn: &Connection,
2893    namespace: &str,
2894    entity_name: &str,
2895    binary: &Path,
2896    model: Option<&str>,
2897    timeout: u64,
2898    mode: &EnrichMode,
2899) -> Result<EnrichItemResult, AppError> {
2900    let (entity_id, entity_type): (i64, String) = conn
2901        .query_row(
2902            "SELECT id, type FROM entities WHERE namespace=?1 AND name=?2",
2903            rusqlite::params![namespace, entity_name],
2904            |r| Ok((r.get(0)?, r.get(1)?)),
2905        )
2906        .map_err(|e| match e {
2907            rusqlite::Error::QueryReturnedNoRows => {
2908                AppError::NotFound(format!("entity '{entity_name}' not found"))
2909            }
2910            other => AppError::Database(other),
2911        })?;
2912
2913    let prompt = format!(
2914        "{ENTITY_DESCRIPTION_PROMPT_PREFIX}{entity_name}\nEntity type: {entity_type}\n\nGenerate a description:"
2915    );
2916
2917    let (value, cost, is_oauth) = match mode {
2918        EnrichMode::ClaudeCode => call_claude(
2919            binary,
2920            &prompt,
2921            ENTITY_DESCRIPTION_SCHEMA,
2922            "",
2923            model,
2924            timeout,
2925        )?,
2926        EnrichMode::Codex => call_codex(
2927            binary,
2928            &prompt,
2929            ENTITY_DESCRIPTION_SCHEMA,
2930            "",
2931            model,
2932            timeout,
2933        )?,
2934        EnrichMode::Opencode => call_opencode(
2935            binary,
2936            &prompt,
2937            ENTITY_DESCRIPTION_SCHEMA,
2938            "",
2939            model,
2940            timeout,
2941        )?,
2942        EnrichMode::OpenRouter => {
2943            call_openrouter(&prompt, ENTITY_DESCRIPTION_SCHEMA, "", model, timeout)?
2944        }
2945    };
2946
2947    let description = value
2948        .get("description")
2949        .and_then(|v| v.as_str())
2950        .ok_or_else(|| AppError::Validation("LLM result missing 'description' field".into()))?;
2951
2952    persist_entity_description(conn, entity_id, description)?;
2953
2954    Ok(EnrichItemResult::Done {
2955        memory_id: None,
2956        entity_id: Some(entity_id),
2957        entities: 0,
2958        rels: 0,
2959        chars_before: None,
2960        chars_after: None,
2961        cost,
2962        is_oauth,
2963    })
2964}
2965
/// Expands one memory's body with the LLM backend selected by `mode` (GAP-18).
///
/// Flow:
/// 1. Load the live memory's body, description and type (`type` defaults to `note`).
/// 2. Collect up to 10 linked entity names as graph context (G26).
/// 3. Build the prompt from `prompt_template` (if given) or
///    `BODY_ENRICH_PROMPT_PREFIX`, plus the context section and length targets.
/// 4. Call the backend with `BODY_ENRICH_SCHEMA`, sending the current body as input.
/// 5. Return `PreservationFailed` if the trigram-Jaccard check rejects the
///    rewrite (G29 Passo 4).
/// 6. Return `Skipped` if the rewrite is byte-identical (blake3) or not longer
///    than the original.
/// 7. Otherwise persist it through `persist_enriched_body` and return `Done`.
///
/// # Errors
/// - `AppError::NotFound` if the memory does not exist.
/// - `AppError::Io` / `AppError::LimitExceeded` for prompt-template problems.
/// - `AppError::Validation` if the LLM output lacks `enriched_body`.
/// - Any error from the backend call or persistence.
#[allow(clippy::too_many_arguments)]
fn call_body_enrich(
    conn: &Connection,
    namespace: &str,
    memory_name: &str,
    binary: &Path,
    model: Option<&str>,
    timeout: u64,
    mode: &EnrichMode,
    min_output_chars: usize,
    max_output_chars: usize,
    prompt_template: Option<&Path>,
    preserve_threshold: f64,
    paths: &crate::paths::AppPaths,
    llm_backend: crate::cli::LlmBackendChoice,
    embedding_backend: crate::cli::EmbeddingBackendChoice,
) -> Result<EnrichItemResult, AppError> {
    let (memory_id, body, description, memory_type): (i64, String, String, String) = conn
        .query_row(
            "SELECT id, COALESCE(body,''), COALESCE(description,''), COALESCE(type,'note') \
         FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
            rusqlite::params![namespace, memory_name],
            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
        )
        .map_err(|e| match e {
            rusqlite::Error::QueryReturnedNoRows => {
                AppError::NotFound(format!("memory '{memory_name}' not found"))
            }
            other => AppError::Database(other),
        })?;

    // Character (not byte) count; reported back and compared against the
    // rewrite's length further down.
    let chars_before = body.chars().count();

    // G26: gather graph context for contextualized enrichment.
    // Rows that fail to decode are silently dropped by `filter_map(|r| r.ok())`.
    let linked_entities: Vec<String> = {
        let mut stmt = conn.prepare_cached(
            "SELECT e.name FROM memory_entities me \
             JOIN entities e ON e.id = me.entity_id \
             WHERE me.memory_id = ?1 LIMIT 10",
        )?;
        let result: Vec<String> = stmt
            .query_map(rusqlite::params![memory_id], |r| r.get::<_, String>(0))?
            .filter_map(|r| r.ok())
            .collect();
        drop(stmt);
        result
    };

    // Load custom prompt template if provided. Its on-disk size (bytes, from
    // metadata) is capped at MAX_MEMORY_BODY_LEN before the file is read.
    let prompt_prefix = if let Some(tmpl_path) = prompt_template {
        let file_size = std::fs::metadata(tmpl_path)
            .map_err(|e| {
                AppError::Io(std::io::Error::new(
                    e.kind(),
                    format!("failed to stat prompt template: {e}"),
                ))
            })?
            .len();
        if file_size > MAX_MEMORY_BODY_LEN as u64 {
            return Err(AppError::LimitExceeded(
                crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
            ));
        }
        std::fs::read_to_string(tmpl_path).map_err(|e| {
            AppError::Io(std::io::Error::new(
                e.kind(),
                format!("failed to read prompt template: {e}"),
            ))
        })?
    } else {
        BODY_ENRICH_PROMPT_PREFIX.to_string()
    };

    // G26: build contextualized prompt with graph data. The whole section is
    // omitted when there are neither linked entities nor a description.
    let context_section = if !linked_entities.is_empty() || !description.is_empty() {
        let mut ctx = String::new();
        ctx.push_str(&format!(
            "\nContext:\n- Memory name: {memory_name}\n- Type: {memory_type}\n"
        ));
        if !description.is_empty() {
            ctx.push_str(&format!("- Description: {description}\n"));
        }
        ctx.push_str(&format!("- Domain: {namespace}\n"));
        if !linked_entities.is_empty() {
            ctx.push_str(&format!(
                "- Linked entities: {}\n",
                linked_entities.join(", ")
            ));
        }
        ctx
    } else {
        String::new()
    };

    // NOTE: the min/max output lengths are only communicated to the LLM here.
    // This function does not enforce either bound on the returned body; the
    // only length gate is the "must be longer than the original" check below.
    let prompt = format!(
        "{prompt_prefix}{context_section}\nTarget minimum length: {min_output_chars} characters. Maximum: {max_output_chars} characters."
    );

    // The body schema uses a free-form enriched_body field; the current body
    // is passed as the input text.
    let (value, cost, is_oauth) = match mode {
        EnrichMode::ClaudeCode => {
            call_claude(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
        }
        EnrichMode::Codex => {
            call_codex(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
        }
        EnrichMode::Opencode => {
            call_opencode(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
        }
        EnrichMode::OpenRouter => {
            call_openrouter(&prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
        }
    };

    let enriched_body = value
        .get("enriched_body")
        .and_then(|v| v.as_str())
        .ok_or_else(|| AppError::Validation("LLM result missing 'enriched_body' field".into()))?;

    let chars_after = enriched_body.chars().count();

    // G29 Passo 4 (v1.0.69): preservation check. Before persisting, run a
    // trigram-Jaccard similarity between the original body and the
    // LLM-rewritten body. When the verdict is not accepted for the
    // `preserve_threshold` parameter (default 0.7 per the G29 gap), reject
    // the rewrite as a likely hallucination. The result is recorded in the
    // NDJSON stream so operators can audit what the LLM tried to do.
    // An `Unchanged` verdict carries no score here and is reported as 1.0.
    let threshold = preserve_threshold;
    let verdict =
        crate::preservation::PreservationVerdict::evaluate(&body, enriched_body, threshold);
    if !verdict.is_accepted() {
        return Ok(EnrichItemResult::PreservationFailed {
            score: match verdict {
                crate::preservation::PreservationVerdict::Preserved { score, .. } => score,
                crate::preservation::PreservationVerdict::Rejected { score, .. } => score,
                crate::preservation::PreservationVerdict::Unchanged { .. } => 1.0,
            },
            threshold,
            chars_before,
            chars_after,
        });
    }

    // G29 Passo 5 (v1.0.69): idempotency via blake3 hash. Before persisting,
    // compare the hash of the original body against the hash of the enriched
    // body. Identical hashes mean the LLM produced a byte-for-byte identical
    // body (rare but possible) — treat as `Skipped` so re-running the batch
    // is safe and the queue does not get re-persisted entries.
    let old_hash = blake3::hash(body.as_bytes()).to_hex().to_string();
    let new_hash = blake3::hash(enriched_body.as_bytes()).to_hex().to_string();
    if old_hash == new_hash {
        return Ok(EnrichItemResult::Skipped {
            reason: format!(
                "enriched body hash matches original (blake3:{old_hash}); idempotency skip"
            ),
        });
    }

    // Only persist if the enriched body is genuinely longer (in characters).
    if chars_after <= chars_before {
        return Ok(EnrichItemResult::Skipped {
            reason: format!(
                "enriched body ({chars_after} chars) not longer than original ({chars_before} chars)"
            ),
        });
    }

    persist_enriched_body(
        conn,
        namespace,
        memory_id,
        memory_name,
        enriched_body,
        paths,
        llm_backend,
        embedding_backend,
    )?;

    Ok(EnrichItemResult::Done {
        memory_id: Some(memory_id),
        entity_id: None,
        entities: 0,
        rels: 0,
        chars_before: Some(chars_before),
        chars_after: Some(chars_after),
        cost,
        is_oauth,
    })
}
3155
3156fn call_reembed(
3157    conn: &Connection,
3158    namespace: &str,
3159    memory_name: &str,
3160    paths: &crate::paths::AppPaths,
3161    llm_backend: crate::cli::LlmBackendChoice,
3162    embedding_backend: crate::cli::EmbeddingBackendChoice,
3163) -> Result<EnrichItemResult, AppError> {
3164    let (memory_id, body, memory_type): (i64, String, String) = conn
3165        .query_row(
3166            "SELECT id, COALESCE(body,''), COALESCE(type,'note')
3167             FROM memories
3168             WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
3169            rusqlite::params![namespace, memory_name],
3170            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3171        )
3172        .map_err(|e| match e {
3173            rusqlite::Error::QueryReturnedNoRows => {
3174                AppError::NotFound(format!("memory '{memory_name}' not found"))
3175            }
3176            other => AppError::Database(other),
3177        })?;
3178
3179    if body.trim().is_empty() {
3180        return Ok(EnrichItemResult::Skipped {
3181            reason: "body is empty".to_string(),
3182        });
3183    }
3184
3185    reembed_memory_vector(
3186        conn,
3187        namespace,
3188        memory_id,
3189        memory_name,
3190        &memory_type,
3191        &body,
3192        paths,
3193        llm_backend,
3194        embedding_backend,
3195    )?;
3196
3197    Ok(EnrichItemResult::Done {
3198        memory_id: Some(memory_id),
3199        entity_id: None,
3200        entities: 0,
3201        rels: 0,
3202        chars_before: Some(body.chars().count()),
3203        chars_after: Some(body.chars().count()),
3204        cost: 0.0,
3205        is_oauth: true,
3206    })
3207}
3208
3209// ---------------------------------------------------------------------------
3210// Scan dispatcher — maps operation to scan query result (item keys)
3211// ---------------------------------------------------------------------------
3212
3213fn scan_operation(
3214    conn: &Connection,
3215    namespace: &str,
3216    args: &EnrichArgs,
3217) -> Result<Vec<String>, AppError> {
3218    // G37: resolve --names + --names-file once and apply to every scan path.
3219    let name_filter = resolve_name_filter(args)?;
3220    match args.operation {
3221        EnrichOperation::MemoryBindings => {
3222            let rows = scan_unbound_memories(conn, namespace, args.limit, &name_filter)?;
3223            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3224        }
3225        EnrichOperation::EntityDescriptions => {
3226            let rows =
3227                scan_entities_without_description(conn, namespace, args.limit, &name_filter)?;
3228            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3229        }
3230        EnrichOperation::BodyEnrich => {
3231            let rows = scan_short_body_memories(
3232                conn,
3233                namespace,
3234                args.min_output_chars,
3235                args.limit,
3236                &name_filter,
3237            )?;
3238            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3239        }
3240        EnrichOperation::ReEmbed => {
3241            let rows = scan_memories_without_embeddings(conn, namespace, args.limit, &name_filter)?;
3242            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3243        }
3244        EnrichOperation::WeightCalibrate => {
3245            let rows = scan_weight_candidates(conn, namespace, args.limit)?;
3246            Ok(rows
3247                .into_iter()
3248                .map(|(id, _, _, _, _)| id.to_string())
3249                .collect())
3250        }
3251        EnrichOperation::RelationReclassify => {
3252            let rows = scan_generic_relations(conn, namespace, args.limit)?;
3253            Ok(rows
3254                .into_iter()
3255                .map(|(id, _, _, _)| id.to_string())
3256                .collect())
3257        }
3258        EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
3259            let pairs = scan_isolated_entity_pairs(conn, namespace, args.limit)?;
3260            Ok(pairs.into_iter().map(|(_, name, _, _)| name).collect())
3261        }
3262        EnrichOperation::EntityTypeValidate => {
3263            let rows = scan_entities_for_type_validation(conn, namespace, args.limit)?;
3264            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3265        }
3266        EnrichOperation::DescriptionEnrich => {
3267            let rows = scan_generic_descriptions(conn, namespace, args.limit)?;
3268            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3269        }
3270        EnrichOperation::DomainClassify
3271        | EnrichOperation::GraphAudit
3272        | EnrichOperation::DeepResearchSynth
3273        | EnrichOperation::BodyExtract => {
3274            let limit_clause = args.limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3275            let sql = format!(
3276                "SELECT name FROM memories WHERE namespace=?1 AND deleted_at IS NULL ORDER BY id {limit_clause}"
3277            );
3278            let mut stmt = conn.prepare(&sql)?;
3279            let names = stmt
3280                .query_map(rusqlite::params![namespace], |r| r.get::<_, String>(0))?
3281                .collect::<Result<Vec<_>, _>>()?;
3282            Ok(names)
3283        }
3284    }
3285}
3286
3287// ---------------------------------------------------------------------------
// Codex provider: CLI binary discovery
3289// ---------------------------------------------------------------------------
3290
3291/// Locates the Codex CLI binary.
3292fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
3293    if let Some(p) = explicit {
3294        if p.exists() {
3295            return Ok(p.to_path_buf());
3296        }
3297        return Err(AppError::Validation(format!(
3298            "Codex binary not found at explicit path: {}",
3299            p.display()
3300        )));
3301    }
3302
3303    if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
3304        let p = PathBuf::from(&env_path);
3305        if p.exists() {
3306            return Ok(p);
3307        }
3308    }
3309
3310    let name = if cfg!(windows) { "codex.exe" } else { "codex" };
3311    if let Some(path_var) = std::env::var_os("PATH") {
3312        for dir in std::env::split_paths(&path_var) {
3313            let candidate = dir.join(name);
3314            if candidate.exists() {
3315                return Ok(crate::extract::llm_embedding::resolve_real_binary(
3316                    &candidate,
3317                ));
3318            }
3319        }
3320    }
3321
3322    Err(AppError::Validation(
3323        "Codex CLI binary not found in PATH. Install it or specify --codex-binary".to_string(),
3324    ))
3325}
3326
3327/// G27: Calibrate weight of a single relationship via LLM.
3328fn call_weight_calibrate(
3329    conn: &Connection,
3330    _namespace: &str,
3331    item_key: &str,
3332    binary: &Path,
3333    model: Option<&str>,
3334    timeout: u64,
3335    mode: &EnrichMode,
3336) -> Result<EnrichItemResult, AppError> {
3337    let rel_id: i64 = item_key
3338        .parse()
3339        .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
3340    let (source_name, target_name, relation, current_weight): (String, String, String, f64) = conn
3341        .query_row(
3342            "SELECT e1.name, e2.name, r.relation, r.weight \
3343             FROM relationships r \
3344             JOIN entities e1 ON e1.id = r.source_id \
3345             JOIN entities e2 ON e2.id = r.target_id \
3346             WHERE r.id = ?1",
3347            rusqlite::params![rel_id],
3348            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
3349        )
3350        .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
3351
3352    let input_text = format!(
3353        "Source: {source_name}\nTarget: {target_name}\nRelation: {relation}\nCurrent weight: {current_weight}"
3354    );
3355    let (value, cost, is_oauth) = match mode {
3356        EnrichMode::ClaudeCode => call_claude(
3357            binary,
3358            WEIGHT_CALIBRATE_PROMPT,
3359            WEIGHT_CALIBRATE_SCHEMA,
3360            &input_text,
3361            model,
3362            timeout,
3363        )?,
3364        EnrichMode::Codex => call_codex(
3365            binary,
3366            WEIGHT_CALIBRATE_PROMPT,
3367            WEIGHT_CALIBRATE_SCHEMA,
3368            &input_text,
3369            model,
3370            timeout,
3371        )?,
3372        EnrichMode::Opencode => call_opencode(
3373            binary,
3374            WEIGHT_CALIBRATE_PROMPT,
3375            WEIGHT_CALIBRATE_SCHEMA,
3376            &input_text,
3377            model,
3378            timeout,
3379        )?,
3380        EnrichMode::OpenRouter => call_openrouter(
3381            WEIGHT_CALIBRATE_PROMPT,
3382            WEIGHT_CALIBRATE_SCHEMA,
3383            &input_text,
3384            model,
3385            timeout,
3386        )?,
3387    };
3388
3389    let calibrated = value
3390        .get("calibrated_weight")
3391        .and_then(|v| v.as_f64())
3392        .ok_or_else(|| AppError::Validation("LLM result missing 'calibrated_weight'".into()))?;
3393
3394    conn.execute(
3395        "UPDATE relationships SET weight = ?1 WHERE id = ?2",
3396        rusqlite::params![calibrated, rel_id],
3397    )?;
3398
3399    Ok(EnrichItemResult::Done {
3400        memory_id: None,
3401        entity_id: None,
3402        entities: 0,
3403        rels: 1,
3404        chars_before: None,
3405        chars_after: None,
3406        cost,
3407        is_oauth,
3408    })
3409}
3410
3411/// G27: Reclassify a generic relationship type via LLM.
3412fn call_relation_reclassify(
3413    conn: &Connection,
3414    _namespace: &str,
3415    item_key: &str,
3416    binary: &Path,
3417    model: Option<&str>,
3418    timeout: u64,
3419    mode: &EnrichMode,
3420) -> Result<EnrichItemResult, AppError> {
3421    let rel_id: i64 = item_key
3422        .parse()
3423        .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
3424    let (source_name, target_name, current_relation): (String, String, String) = conn
3425        .query_row(
3426            "SELECT e1.name, e2.name, r.relation \
3427             FROM relationships r \
3428             JOIN entities e1 ON e1.id = r.source_id \
3429             JOIN entities e2 ON e2.id = r.target_id \
3430             WHERE r.id = ?1",
3431            rusqlite::params![rel_id],
3432            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3433        )
3434        .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
3435
3436    let input_text = format!(
3437        "Source entity: {source_name}\nTarget entity: {target_name}\nCurrent relation: {current_relation}"
3438    );
3439    let (value, cost, is_oauth) = match mode {
3440        EnrichMode::ClaudeCode => call_claude(
3441            binary,
3442            RELATION_RECLASSIFY_PROMPT,
3443            RELATION_RECLASSIFY_SCHEMA,
3444            &input_text,
3445            model,
3446            timeout,
3447        )?,
3448        EnrichMode::Codex => call_codex(
3449            binary,
3450            RELATION_RECLASSIFY_PROMPT,
3451            RELATION_RECLASSIFY_SCHEMA,
3452            &input_text,
3453            model,
3454            timeout,
3455        )?,
3456        EnrichMode::Opencode => call_opencode(
3457            binary,
3458            RELATION_RECLASSIFY_PROMPT,
3459            RELATION_RECLASSIFY_SCHEMA,
3460            &input_text,
3461            model,
3462            timeout,
3463        )?,
3464        EnrichMode::OpenRouter => call_openrouter(
3465            RELATION_RECLASSIFY_PROMPT,
3466            RELATION_RECLASSIFY_SCHEMA,
3467            &input_text,
3468            model,
3469            timeout,
3470        )?,
3471    };
3472
3473    let new_relation = value
3474        .get("relation")
3475        .and_then(|v| v.as_str())
3476        .ok_or_else(|| AppError::Validation("LLM result missing 'relation'".into()))?;
3477    let new_strength = value
3478        .get("strength")
3479        .and_then(|v| v.as_f64())
3480        .unwrap_or(0.5);
3481
3482    conn.execute(
3483        "UPDATE relationships SET relation = ?1, weight = ?2 WHERE id = ?3",
3484        rusqlite::params![new_relation, new_strength, rel_id],
3485    )?;
3486
3487    Ok(EnrichItemResult::Done {
3488        memory_id: None,
3489        entity_id: None,
3490        entities: 0,
3491        rels: 1,
3492        chars_before: None,
3493        chars_after: None,
3494        cost,
3495        is_oauth,
3496    })
3497}
3498
3499/// G27 P2: Connect isolated entities via LLM-suggested relationship.
3500fn call_entity_connect(
3501    conn: &Connection,
3502    namespace: &str,
3503    item_key: &str,
3504    binary: &Path,
3505    model: Option<&str>,
3506    timeout: u64,
3507    mode: &EnrichMode,
3508) -> Result<EnrichItemResult, AppError> {
3509    let pairs = scan_isolated_entity_pairs(conn, namespace, Some(1))?;
3510    let (e1_id, e1_name, e2_id, e2_name) =
3511        match pairs.into_iter().find(|(_, n, _, _)| n == item_key) {
3512            Some(p) => p,
3513            None => {
3514                return Ok(EnrichItemResult::Skipped {
3515                    reason: "pair no longer isolated".into(),
3516                })
3517            }
3518        };
3519    let input_text = format!("Entity A: {e1_name}\nEntity B: {e2_name}");
3520    let (value, cost, is_oauth) = match mode {
3521        EnrichMode::ClaudeCode => call_claude(
3522            binary,
3523            ENTITY_CONNECT_PROMPT,
3524            ENTITY_CONNECT_SCHEMA,
3525            &input_text,
3526            model,
3527            timeout,
3528        )?,
3529        EnrichMode::Codex => call_codex(
3530            binary,
3531            ENTITY_CONNECT_PROMPT,
3532            ENTITY_CONNECT_SCHEMA,
3533            &input_text,
3534            model,
3535            timeout,
3536        )?,
3537        EnrichMode::Opencode => call_opencode(
3538            binary,
3539            ENTITY_CONNECT_PROMPT,
3540            ENTITY_CONNECT_SCHEMA,
3541            &input_text,
3542            model,
3543            timeout,
3544        )?,
3545        EnrichMode::OpenRouter => call_openrouter(
3546            ENTITY_CONNECT_PROMPT,
3547            ENTITY_CONNECT_SCHEMA,
3548            &input_text,
3549            model,
3550            timeout,
3551        )?,
3552    };
3553    let relation = value
3554        .get("relation")
3555        .and_then(|v| v.as_str())
3556        .unwrap_or("none");
3557    if relation == "none" {
3558        return Ok(EnrichItemResult::Skipped {
3559            reason: "LLM determined no relationship".into(),
3560        });
3561    }
3562    let strength = value
3563        .get("strength")
3564        .and_then(|v| v.as_f64())
3565        .unwrap_or(0.5);
3566    conn.execute(
3567        "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
3568        rusqlite::params![namespace, e1_id, e2_id, relation, strength],
3569    )?;
3570    Ok(EnrichItemResult::Done {
3571        memory_id: None,
3572        entity_id: None,
3573        entities: 0,
3574        rels: 1,
3575        chars_before: None,
3576        chars_after: None,
3577        cost,
3578        is_oauth,
3579    })
3580}
3581
3582/// G27 P2: Validate entity type assignment via LLM.
3583fn call_entity_type_validate(
3584    conn: &Connection,
3585    _namespace: &str,
3586    item_key: &str,
3587    binary: &Path,
3588    model: Option<&str>,
3589    timeout: u64,
3590    mode: &EnrichMode,
3591) -> Result<EnrichItemResult, AppError> {
3592    let (ent_id, ent_name, ent_type): (i64, String, String) = conn
3593        .query_row(
3594            "SELECT id, name, type FROM entities WHERE name = ?1",
3595            rusqlite::params![item_key],
3596            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3597        )
3598        .map_err(|_| AppError::NotFound(format!("entity '{item_key}' not found")))?;
3599    let input_text = format!("Entity: {ent_name}\nCurrent type: {ent_type}");
3600    let (value, cost, is_oauth) = match mode {
3601        EnrichMode::ClaudeCode => call_claude(
3602            binary,
3603            ENTITY_TYPE_VALIDATE_PROMPT,
3604            ENTITY_TYPE_VALIDATE_SCHEMA,
3605            &input_text,
3606            model,
3607            timeout,
3608        )?,
3609        EnrichMode::Codex => call_codex(
3610            binary,
3611            ENTITY_TYPE_VALIDATE_PROMPT,
3612            ENTITY_TYPE_VALIDATE_SCHEMA,
3613            &input_text,
3614            model,
3615            timeout,
3616        )?,
3617        EnrichMode::Opencode => call_opencode(
3618            binary,
3619            ENTITY_TYPE_VALIDATE_PROMPT,
3620            ENTITY_TYPE_VALIDATE_SCHEMA,
3621            &input_text,
3622            model,
3623            timeout,
3624        )?,
3625        EnrichMode::OpenRouter => call_openrouter(
3626            ENTITY_TYPE_VALIDATE_PROMPT,
3627            ENTITY_TYPE_VALIDATE_SCHEMA,
3628            &input_text,
3629            model,
3630            timeout,
3631        )?,
3632    };
3633    let validated_type = value
3634        .get("validated_type")
3635        .and_then(|v| v.as_str())
3636        .unwrap_or(&ent_type);
3637    let was_correct = value
3638        .get("was_correct")
3639        .and_then(|v| v.as_bool())
3640        .unwrap_or(true);
3641    if !was_correct {
3642        conn.execute(
3643            "UPDATE entities SET type = ?1 WHERE id = ?2",
3644            rusqlite::params![validated_type, ent_id],
3645        )?;
3646    }
3647    Ok(EnrichItemResult::Done {
3648        memory_id: None,
3649        entity_id: Some(ent_id),
3650        entities: 1,
3651        rels: 0,
3652        chars_before: None,
3653        chars_after: None,
3654        cost,
3655        is_oauth,
3656    })
3657}
3658
3659/// G27 P2: Enrich generic memory description via LLM.
3660fn call_description_enrich(
3661    conn: &Connection,
3662    _namespace: &str,
3663    item_key: &str,
3664    binary: &Path,
3665    model: Option<&str>,
3666    timeout: u64,
3667    mode: &EnrichMode,
3668) -> Result<EnrichItemResult, AppError> {
3669    let (mem_id, body, old_desc): (i64, String, String) = conn
3670        .query_row(
3671            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3672            rusqlite::params![item_key],
3673            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3674        )
3675        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3676    let snippet: String = body.chars().take(500).collect();
3677    let input_text = format!(
3678        "Memory name: {item_key}\nCurrent description: {old_desc}\nBody preview: {snippet}"
3679    );
3680    let (value, cost, is_oauth) = match mode {
3681        EnrichMode::ClaudeCode => call_claude(
3682            binary,
3683            DESCRIPTION_ENRICH_PROMPT,
3684            DESCRIPTION_ENRICH_SCHEMA,
3685            &input_text,
3686            model,
3687            timeout,
3688        )?,
3689        EnrichMode::Codex => call_codex(
3690            binary,
3691            DESCRIPTION_ENRICH_PROMPT,
3692            DESCRIPTION_ENRICH_SCHEMA,
3693            &input_text,
3694            model,
3695            timeout,
3696        )?,
3697        EnrichMode::Opencode => call_opencode(
3698            binary,
3699            DESCRIPTION_ENRICH_PROMPT,
3700            DESCRIPTION_ENRICH_SCHEMA,
3701            &input_text,
3702            model,
3703            timeout,
3704        )?,
3705        EnrichMode::OpenRouter => call_openrouter(
3706            DESCRIPTION_ENRICH_PROMPT,
3707            DESCRIPTION_ENRICH_SCHEMA,
3708            &input_text,
3709            model,
3710            timeout,
3711        )?,
3712    };
3713    let new_desc = value
3714        .get("description")
3715        .and_then(|v| v.as_str())
3716        .unwrap_or(&old_desc);
3717    let old_name: String = conn.query_row(
3718        "SELECT name FROM memories WHERE id = ?1",
3719        rusqlite::params![mem_id],
3720        |r| r.get(0),
3721    )?;
3722    conn.execute(
3723        "UPDATE memories SET description = ?1 WHERE id = ?2",
3724        rusqlite::params![new_desc, mem_id],
3725    )?;
3726    memories::sync_fts_after_update(
3727        conn, mem_id, &old_name, &old_desc, &body, &old_name, new_desc, &body,
3728    )?;
3729    Ok(EnrichItemResult::Done {
3730        memory_id: Some(mem_id),
3731        entity_id: None,
3732        entities: 0,
3733        rels: 0,
3734        chars_before: Some(old_desc.len()),
3735        chars_after: Some(new_desc.len()),
3736        cost,
3737        is_oauth,
3738    })
3739}
3740
3741/// G27 P2: Classify memory into domain category via LLM.
3742fn call_domain_classify(
3743    conn: &Connection,
3744    _namespace: &str,
3745    item_key: &str,
3746    binary: &Path,
3747    model: Option<&str>,
3748    timeout: u64,
3749    mode: &EnrichMode,
3750) -> Result<EnrichItemResult, AppError> {
3751    let (mem_id, body, desc): (i64, String, String) = conn
3752        .query_row(
3753            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3754            rusqlite::params![item_key],
3755            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3756        )
3757        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3758    let snippet: String = body.chars().take(500).collect();
3759    let input_text = format!("Memory: {item_key}\nDescription: {desc}\nBody preview: {snippet}");
3760    let (value, cost, is_oauth) = match mode {
3761        EnrichMode::ClaudeCode => call_claude(
3762            binary,
3763            DOMAIN_CLASSIFY_PROMPT,
3764            DOMAIN_CLASSIFY_SCHEMA,
3765            &input_text,
3766            model,
3767            timeout,
3768        )?,
3769        EnrichMode::Codex => call_codex(
3770            binary,
3771            DOMAIN_CLASSIFY_PROMPT,
3772            DOMAIN_CLASSIFY_SCHEMA,
3773            &input_text,
3774            model,
3775            timeout,
3776        )?,
3777        EnrichMode::Opencode => call_opencode(
3778            binary,
3779            DOMAIN_CLASSIFY_PROMPT,
3780            DOMAIN_CLASSIFY_SCHEMA,
3781            &input_text,
3782            model,
3783            timeout,
3784        )?,
3785        EnrichMode::OpenRouter => call_openrouter(
3786            DOMAIN_CLASSIFY_PROMPT,
3787            DOMAIN_CLASSIFY_SCHEMA,
3788            &input_text,
3789            model,
3790            timeout,
3791        )?,
3792    };
3793    let domain = value
3794        .get("domain")
3795        .and_then(|v| v.as_str())
3796        .unwrap_or("uncategorized");
3797    let metadata = format!(r#"{{"domain":"{}"}}"#, domain.replace('"', "\\\""));
3798    conn.execute(
3799        "UPDATE memories SET metadata = ?1 WHERE id = ?2",
3800        rusqlite::params![metadata, mem_id],
3801    )?;
3802    Ok(EnrichItemResult::Done {
3803        memory_id: Some(mem_id),
3804        entity_id: None,
3805        entities: 0,
3806        rels: 0,
3807        chars_before: None,
3808        chars_after: None,
3809        cost,
3810        is_oauth,
3811    })
3812}
3813
3814/// G27 P2: Audit memory graph quality via LLM.
3815fn call_graph_audit(
3816    conn: &Connection,
3817    _namespace: &str,
3818    item_key: &str,
3819    binary: &Path,
3820    model: Option<&str>,
3821    timeout: u64,
3822    mode: &EnrichMode,
3823) -> Result<EnrichItemResult, AppError> {
3824    let (mem_id, body, desc): (i64, String, String) = conn
3825        .query_row(
3826            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3827            rusqlite::params![item_key],
3828            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3829        )
3830        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3831    let snippet: String = body.chars().take(500).collect();
3832    let ent_count: i64 = conn
3833        .query_row(
3834            "SELECT COUNT(*) FROM memory_entities WHERE memory_id = ?1",
3835            rusqlite::params![mem_id],
3836            |r| r.get(0),
3837        )
3838        .unwrap_or(0);
3839    let input_text = format!("Memory: {item_key}\nDescription: {desc}\nEntity bindings: {ent_count}\nBody preview: {snippet}");
3840    let (value, cost, is_oauth) = match mode {
3841        EnrichMode::ClaudeCode => call_claude(
3842            binary,
3843            GRAPH_AUDIT_PROMPT,
3844            GRAPH_AUDIT_SCHEMA,
3845            &input_text,
3846            model,
3847            timeout,
3848        )?,
3849        EnrichMode::Codex => call_codex(
3850            binary,
3851            GRAPH_AUDIT_PROMPT,
3852            GRAPH_AUDIT_SCHEMA,
3853            &input_text,
3854            model,
3855            timeout,
3856        )?,
3857        EnrichMode::Opencode => call_opencode(
3858            binary,
3859            GRAPH_AUDIT_PROMPT,
3860            GRAPH_AUDIT_SCHEMA,
3861            &input_text,
3862            model,
3863            timeout,
3864        )?,
3865        EnrichMode::OpenRouter => call_openrouter(
3866            GRAPH_AUDIT_PROMPT,
3867            GRAPH_AUDIT_SCHEMA,
3868            &input_text,
3869            model,
3870            timeout,
3871        )?,
3872    };
3873    let issues = value
3874        .get("issues")
3875        .and_then(|v| v.as_array())
3876        .map(|a| a.len())
3877        .unwrap_or(0);
3878    Ok(EnrichItemResult::Done {
3879        memory_id: Some(mem_id),
3880        entity_id: None,
3881        entities: 0,
3882        rels: issues,
3883        chars_before: None,
3884        chars_after: None,
3885        cost,
3886        is_oauth,
3887    })
3888}
3889
3890/// G27 P2: Synthesize research findings into graph entities/relationships via LLM.
3891fn call_deep_research_synth(
3892    conn: &Connection,
3893    namespace: &str,
3894    item_key: &str,
3895    binary: &Path,
3896    model: Option<&str>,
3897    timeout: u64,
3898    mode: &EnrichMode,
3899) -> Result<EnrichItemResult, AppError> {
3900    let (mem_id, body): (i64, String) = conn
3901        .query_row(
3902            "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3903            rusqlite::params![item_key],
3904            |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
3905        )
3906        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3907    let snippet: String = body.chars().take(2000).collect();
3908    let input_text = format!("Memory: {item_key}\nBody:\n{snippet}");
3909    let (value, cost, is_oauth) = match mode {
3910        EnrichMode::ClaudeCode => call_claude(
3911            binary,
3912            DEEP_RESEARCH_SYNTH_PROMPT,
3913            DEEP_RESEARCH_SYNTH_SCHEMA,
3914            &input_text,
3915            model,
3916            timeout,
3917        )?,
3918        EnrichMode::Codex => call_codex(
3919            binary,
3920            DEEP_RESEARCH_SYNTH_PROMPT,
3921            DEEP_RESEARCH_SYNTH_SCHEMA,
3922            &input_text,
3923            model,
3924            timeout,
3925        )?,
3926        EnrichMode::Opencode => call_opencode(
3927            binary,
3928            DEEP_RESEARCH_SYNTH_PROMPT,
3929            DEEP_RESEARCH_SYNTH_SCHEMA,
3930            &input_text,
3931            model,
3932            timeout,
3933        )?,
3934        EnrichMode::OpenRouter => call_openrouter(
3935            DEEP_RESEARCH_SYNTH_PROMPT,
3936            DEEP_RESEARCH_SYNTH_SCHEMA,
3937            &input_text,
3938            model,
3939            timeout,
3940        )?,
3941    };
3942    let mut ent_count = 0usize;
3943    let mut rel_count = 0usize;
3944    if let Some(ents) = value.get("entities").and_then(|v| v.as_array()) {
3945        for e in ents {
3946            let name = e.get("name").and_then(|v| v.as_str()).unwrap_or_default();
3947            let etype_str = e
3948                .get("entity_type")
3949                .and_then(|v| v.as_str())
3950                .unwrap_or("concept");
3951            let etype: EntityType = etype_str.parse().unwrap_or(EntityType::Concept);
3952            if name.len() >= 2 {
3953                let ne = NewEntity {
3954                    name: name.to_string(),
3955                    entity_type: etype,
3956                    description: None,
3957                };
3958                let _ = entities::upsert_entity(conn, namespace, &ne);
3959                ent_count += 1;
3960            }
3961        }
3962    }
3963    if let Some(rels) = value.get("relationships").and_then(|v| v.as_array()) {
3964        for r in rels {
3965            let src = r.get("source").and_then(|v| v.as_str()).unwrap_or_default();
3966            let tgt = r.get("target").and_then(|v| v.as_str()).unwrap_or_default();
3967            if src.is_empty() || tgt.is_empty() {
3968                continue;
3969            }
3970            let rel = r
3971                .get("relation")
3972                .and_then(|v| v.as_str())
3973                .unwrap_or("related");
3974            let str_ = r.get("strength").and_then(|v| v.as_f64()).unwrap_or(0.5);
3975            if let (Some(sid), Some(tid)) = (
3976                entities::find_entity_id(conn, namespace, src)?,
3977                entities::find_entity_id(conn, namespace, tgt)?,
3978            ) {
3979                let _ = entities::create_or_fetch_relationship(
3980                    conn, namespace, sid, tid, rel, str_, None,
3981                );
3982                rel_count += 1;
3983            }
3984        }
3985    }
3986    Ok(EnrichItemResult::Done {
3987        memory_id: Some(mem_id),
3988        entity_id: None,
3989        entities: ent_count,
3990        rels: rel_count,
3991        chars_before: None,
3992        chars_after: None,
3993        cost,
3994        is_oauth,
3995    })
3996}
3997
3998/// G27 P2: Extract structured body from unstructured text via LLM.
3999fn call_body_extract(
4000    conn: &Connection,
4001    _namespace: &str,
4002    item_key: &str,
4003    binary: &Path,
4004    model: Option<&str>,
4005    timeout: u64,
4006    mode: &EnrichMode,
4007) -> Result<EnrichItemResult, AppError> {
4008    let (mem_id, body, old_desc): (i64, String, String) = conn
4009        .query_row(
4010            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
4011            rusqlite::params![item_key],
4012            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
4013        )
4014        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
4015    let old_name: String = conn.query_row(
4016        "SELECT name FROM memories WHERE id = ?1",
4017        rusqlite::params![mem_id],
4018        |r| r.get(0),
4019    )?;
4020    let input_text = format!("Memory: {item_key}\nBody:\n{body}");
4021    let (value, cost, is_oauth) = match mode {
4022        EnrichMode::ClaudeCode => call_claude(
4023            binary,
4024            BODY_EXTRACT_PROMPT,
4025            BODY_EXTRACT_SCHEMA,
4026            &input_text,
4027            model,
4028            timeout,
4029        )?,
4030        EnrichMode::Codex => call_codex(
4031            binary,
4032            BODY_EXTRACT_PROMPT,
4033            BODY_EXTRACT_SCHEMA,
4034            &input_text,
4035            model,
4036            timeout,
4037        )?,
4038        EnrichMode::Opencode => call_opencode(
4039            binary,
4040            BODY_EXTRACT_PROMPT,
4041            BODY_EXTRACT_SCHEMA,
4042            &input_text,
4043            model,
4044            timeout,
4045        )?,
4046        EnrichMode::OpenRouter => call_openrouter(
4047            BODY_EXTRACT_PROMPT,
4048            BODY_EXTRACT_SCHEMA,
4049            &input_text,
4050            model,
4051            timeout,
4052        )?,
4053    };
4054    let restructured = value
4055        .get("restructured_body")
4056        .and_then(|v| v.as_str())
4057        .unwrap_or(&body);
4058    let chars_before = body.len();
4059    let chars_after = restructured.len();
4060    let new_hash = blake3::hash(restructured.as_bytes()).to_hex().to_string();
4061    conn.execute(
4062        "UPDATE memories SET body = ?1, body_hash = ?2, updated_at = unixepoch() WHERE id = ?3",
4063        rusqlite::params![restructured, new_hash, mem_id],
4064    )?;
4065    memories::sync_fts_after_update(
4066        conn,
4067        mem_id,
4068        &old_name,
4069        &old_desc,
4070        &body,
4071        &old_name,
4072        &old_desc,
4073        restructured,
4074    )?;
4075    Ok(EnrichItemResult::Done {
4076        memory_id: Some(mem_id),
4077        entity_id: None,
4078        entities: 0,
4079        rels: 0,
4080        chars_before: Some(chars_before),
4081        chars_after: Some(chars_after),
4082        cost,
4083        is_oauth,
4084    })
4085}
4086
4087/// Scan for pairs of entities that share no direct relationship.
4088#[allow(clippy::type_complexity)]
4089fn scan_isolated_entity_pairs(
4090    conn: &Connection,
4091    namespace: &str,
4092    limit: Option<usize>,
4093) -> Result<Vec<(i64, String, i64, String)>, AppError> {
4094    let limit_val = limit.unwrap_or(50) as i64;
4095    let mut stmt = conn.prepare_cached(
4096        "SELECT e1.id, e1.name, e2.id, e2.name FROM entities e1, entities e2 \
4097         WHERE e1.namespace = ?1 AND e2.namespace = ?1 AND e1.id < e2.id \
4098         AND NOT EXISTS (SELECT 1 FROM relationships r WHERE \
4099           (r.source_id = e1.id AND r.target_id = e2.id) OR \
4100           (r.source_id = e2.id AND r.target_id = e1.id)) \
4101         LIMIT ?2",
4102    )?;
4103    let rows = stmt
4104        .query_map(rusqlite::params![namespace, limit_val], |r| {
4105            Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?))
4106        })?
4107        .collect::<Result<Vec<_>, _>>()?;
4108    Ok(rows)
4109}
4110
4111/// Scan for entities with non-validated types (all entities for type audit).
4112fn scan_entities_for_type_validation(
4113    conn: &Connection,
4114    namespace: &str,
4115    limit: Option<usize>,
4116) -> Result<Vec<(i64, String, String)>, AppError> {
4117    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
4118    let sql = format!(
4119        "SELECT id, name, type FROM entities WHERE namespace = ?1 ORDER BY id {limit_clause}"
4120    );
4121    let mut stmt = conn.prepare(&sql)?;
4122    let rows = stmt
4123        .query_map(rusqlite::params![namespace], |r| {
4124            Ok((r.get(0)?, r.get(1)?, r.get(2)?))
4125        })?
4126        .collect::<Result<Vec<_>, _>>()?;
4127    Ok(rows)
4128}
4129
4130/// Scan for memories with generic descriptions (ingested, imported, etc).
4131fn scan_generic_descriptions(
4132    conn: &Connection,
4133    namespace: &str,
4134    limit: Option<usize>,
4135) -> Result<Vec<(i64, String, String)>, AppError> {
4136    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
4137    let sql = format!(
4138        "SELECT id, name, description FROM memories WHERE namespace = ?1 AND deleted_at IS NULL \
4139         AND (description LIKE '%ingested%' OR description LIKE '%imported%' OR description LIKE '%added%' OR length(description) < 30) \
4140         ORDER BY id {limit_clause}"
4141    );
4142    let mut stmt = conn.prepare(&sql)?;
4143    let rows = stmt
4144        .query_map(rusqlite::params![namespace], |r| {
4145            Ok((r.get(0)?, r.get(1)?, r.get(2)?))
4146        })?
4147        .collect::<Result<Vec<_>, _>>()?;
4148    Ok(rows)
4149}
4150
4151/// Calls the Codex CLI for a single enrichment item.
4152///
4153/// Follows the same contract as `call_claude`: returns `(value, cost_usd, is_oauth=false)`.
4154fn call_codex(
4155    binary: &Path,
4156    prompt: &str,
4157    json_schema: &str,
4158    input_text: &str,
4159    model: Option<&str>,
4160    timeout_secs: u64,
4161) -> Result<(serde_json::Value, f64, bool), AppError> {
4162    use wait_timeout::ChildExt;
4163
4164    // G31+G32+G33 (v1.0.69): validate the model BEFORE spawn, write the
4165    // schema to a trusted cache path (not /tmp), and reuse the
4166    // consolidated JSONL parser. See `codex_spawn.rs` for the canonical
4167    // hardening rationale.
4168    super::codex_spawn::validate_codex_model(model)?;
4169    let schema_file = super::codex_spawn::trusted_schema_path()?;
4170
4171    let args = super::codex_spawn::CodexSpawnArgs {
4172        binary,
4173        prompt,
4174        json_schema,
4175        input_text,
4176        model,
4177        timeout_secs,
4178        schema_path: schema_file.clone(),
4179    };
4180    let mut cmd = super::codex_spawn::build_codex_command(&args)?;
4181
4182    let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
4183        AppError::Io(std::io::Error::new(
4184            e.kind(),
4185            format!("failed to spawn codex: {e}"),
4186        ))
4187    })?;
4188
4189    let full_prompt = format!("{prompt}\n\n{input_text}");
4190    let stdin_bytes = full_prompt.into_bytes();
4191    let mut child_stdin = child
4192        .stdin
4193        .take()
4194        .ok_or_else(|| AppError::Validation("failed to open codex stdin".into()))?;
4195    let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
4196        child_stdin.write_all(&stdin_bytes)?;
4197        drop(child_stdin);
4198        Ok(())
4199    });
4200
4201    let start = std::time::Instant::now();
4202    let timeout = std::time::Duration::from_secs(timeout_secs);
4203    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
4204    let _ = std::fs::remove_file(&schema_file);
4205
4206    match status {
4207        Some(exit_status) => {
4208            stdin_thread
4209                .join()
4210                .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
4211                .map_err(AppError::Io)?;
4212
4213            tracing::debug!(
4214                target: "process",
4215                exit_code = ?exit_status.code(),
4216                elapsed_ms = start.elapsed().as_millis() as u64,
4217                "external process completed"
4218            );
4219
4220            let mut stdout_buf = Vec::new();
4221            if let Some(mut out) = child.stdout.take() {
4222                std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
4223            }
4224            if !exit_status.success() {
4225                let mut stderr_buf = Vec::new();
4226                if let Some(mut err) = child.stderr.take() {
4227                    std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
4228                }
4229                let stderr_str = String::from_utf8_lossy(&stderr_buf);
4230                tracing::warn!(
4231                    target: "enrich",
4232                    exit_code = ?exit_status.code(),
4233                    stderr = %stderr_str.trim(),
4234                    "codex process failed"
4235                );
4236                return Err(AppError::Validation(format!(
4237                    "codex exited with code {:?}: {}",
4238                    exit_status.code(),
4239                    stderr_str.trim()
4240                )));
4241            }
4242            let stdout_str = String::from_utf8(stdout_buf)
4243                .map_err(|_| AppError::Validation("codex stdout is not valid UTF-8".into()))?;
4244            // G32: use the JSONL parser, NOT serde_json::from_str on the
4245            // entire stdout (codex emits one event per line).
4246            let result = super::codex_spawn::parse_codex_jsonl(&stdout_str)?;
4247            // Return the raw agent_message text parsed as JSON. Different
4248            // operations (memory-bindings, body-enrich) use different
4249            // output schemas, so we let the caller pick which fields to
4250            // extract. The previous implementation hardcoded
4251            // `{entities, urls}` which broke body-enrich.
4252            let value: serde_json::Value =
4253                serde_json::from_str(&result.last_agent_text).map_err(|e| {
4254                    AppError::Validation(format!(
4255                        "codex agent_message is not valid JSON: {e}; raw={}",
4256                        result.last_agent_text
4257                    ))
4258                })?;
4259            Ok((value, 0.0, false))
4260        }
4261        None => {
4262            let _ = child.kill();
4263            let _ = child.wait();
4264            let _ = stdin_thread.join();
4265            Err(AppError::Validation(format!(
4266                "codex timed out after {timeout_secs} seconds"
4267            )))
4268        }
4269    }
4270}
4271
4272fn call_opencode(
4273    binary: &Path,
4274    prompt: &str,
4275    json_schema: &str,
4276    input_text: &str,
4277    model: Option<&str>,
4278    timeout_secs: u64,
4279) -> Result<(serde_json::Value, f64, bool), AppError> {
4280    use wait_timeout::ChildExt;
4281
4282    let resolved_model = super::opencode_runner::resolve_opencode_model(model);
4283
4284    let augmented_prompt = if json_schema.is_empty() {
4285        prompt.to_string()
4286    } else {
4287        format!(
4288            "{prompt}\n\nIMPORTANT: You MUST respond with ONLY valid JSON (no markdown, no explanation, no code fences). \
4289             The JSON MUST match this schema:\n{json_schema}"
4290        )
4291    };
4292
4293    let mut cmd = super::opencode_runner::build_opencode_command_sync(
4294        binary,
4295        &resolved_model,
4296        &augmented_prompt,
4297        input_text,
4298    )?;
4299
4300    let mut child = super::opencode_runner::spawn_opencode(&mut cmd).map_err(|e| {
4301        AppError::Io(std::io::Error::new(
4302            e.kind(),
4303            format!("failed to spawn opencode: {e}"),
4304        ))
4305    })?;
4306
4307    let start = std::time::Instant::now();
4308    let timeout = std::time::Duration::from_secs(timeout_secs);
4309    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
4310
4311    match status {
4312        Some(exit_status) => {
4313            tracing::debug!(
4314                target: "process",
4315                exit_code = ?exit_status.code(),
4316                elapsed_ms = start.elapsed().as_millis() as u64,
4317                "opencode process completed"
4318            );
4319
4320            let mut stdout_buf = Vec::new();
4321            if let Some(mut out) = child.stdout.take() {
4322                std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
4323            }
4324            if !exit_status.success() {
4325                let mut stderr_buf = Vec::new();
4326                if let Some(mut err) = child.stderr.take() {
4327                    std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
4328                }
4329                let stderr_str = String::from_utf8_lossy(&stderr_buf);
4330                tracing::warn!(
4331                    target: "enrich",
4332                    exit_code = ?exit_status.code(),
4333                    stderr = %stderr_str.trim(),
4334                    "opencode process failed"
4335                );
4336                return Err(AppError::Validation(format!(
4337                    "opencode exited with code {:?}: {}",
4338                    exit_status.code(),
4339                    stderr_str.trim()
4340                )));
4341            }
4342            let stdout_str = String::from_utf8(stdout_buf)
4343                .map_err(|_| AppError::Validation("opencode stdout is not valid UTF-8".into()))?;
4344            let (text, cost, _tokens) = super::opencode_runner::parse_opencode_output(&stdout_str)?;
4345            let value: serde_json::Value =
4346                super::opencode_runner::parse_json_from_opencode_text(&text).map_err(|e| {
4347                    AppError::Validation(format!("opencode response is not valid JSON: {e}"))
4348                })?;
4349            Ok((value, cost, false))
4350        }
4351        None => {
4352            let _ = child.kill();
4353            let _ = child.wait();
4354            Err(AppError::Validation(format!(
4355                "opencode timed out after {timeout_secs} seconds"
4356            )))
4357        }
4358    }
4359}
4360
4361// ---------------------------------------------------------------------------
4362// Tests
4363// ---------------------------------------------------------------------------
4364
4365#[cfg(test)]
4366mod tests {
4367    use super::*;
4368    use rusqlite::Connection;
4369    #[cfg(unix)]
4370    use std::os::unix::fs::PermissionsExt;
4371
4372    /// Opens an in-memory SQLite database with a minimal schema for unit tests.
4373    fn open_test_db() -> Connection {
4374        let conn = Connection::open_in_memory().expect("in-memory db");
4375        conn.execute_batch(
4376            "CREATE TABLE memories (
4377                id          INTEGER PRIMARY KEY AUTOINCREMENT,
4378                namespace   TEXT NOT NULL DEFAULT 'global',
4379                name        TEXT NOT NULL,
4380                type        TEXT NOT NULL DEFAULT 'note',
4381                description TEXT NOT NULL DEFAULT '',
4382                body        TEXT NOT NULL DEFAULT '',
4383                body_hash   TEXT NOT NULL DEFAULT '',
4384                session_id  TEXT,
4385                source      TEXT NOT NULL DEFAULT 'agent',
4386                metadata    TEXT NOT NULL DEFAULT '{}',
4387                created_at  INTEGER NOT NULL DEFAULT (unixepoch()),
4388                updated_at  INTEGER NOT NULL DEFAULT (unixepoch()),
4389                deleted_at  INTEGER,
4390                UNIQUE(namespace, name)
4391            );
4392            CREATE TABLE entities (
4393                id          INTEGER PRIMARY KEY AUTOINCREMENT,
4394                namespace   TEXT NOT NULL DEFAULT 'global',
4395                name        TEXT NOT NULL,
4396                type        TEXT NOT NULL DEFAULT 'concept',
4397                description TEXT,
4398                degree      INTEGER NOT NULL DEFAULT 0,
4399                created_at  INTEGER NOT NULL DEFAULT (unixepoch()),
4400                updated_at  INTEGER NOT NULL DEFAULT (unixepoch()),
4401                UNIQUE(namespace, name)
4402            );
4403            CREATE TABLE memory_entities (
4404                memory_id  INTEGER NOT NULL,
4405                entity_id  INTEGER NOT NULL,
4406                PRIMARY KEY (memory_id, entity_id)
4407            );
4408            CREATE TABLE relationships (
4409                id         INTEGER PRIMARY KEY AUTOINCREMENT,
4410                namespace  TEXT NOT NULL DEFAULT 'global',
4411                source_id  INTEGER NOT NULL,
4412                target_id  INTEGER NOT NULL,
4413                relation   TEXT NOT NULL,
4414                weight     REAL NOT NULL DEFAULT 0.5,
4415                description TEXT,
4416                UNIQUE(source_id, target_id, relation)
4417            );
4418            CREATE TABLE memory_embeddings (
4419                memory_id   INTEGER PRIMARY KEY,
4420                namespace   TEXT NOT NULL,
4421                embedding   BLOB NOT NULL,
4422                source      TEXT NOT NULL,
4423                model       TEXT NOT NULL DEFAULT '',
4424                dim         INTEGER NOT NULL DEFAULT 384,
4425                created_at  INTEGER NOT NULL DEFAULT (unixepoch())
4426            );",
4427        )
4428        .expect("schema creation must succeed");
4429        conn
4430    }
4431
4432    #[test]
4433    fn scan_unbound_memories_finds_memories_without_bindings() {
4434        let conn = open_test_db();
4435        conn.execute(
4436            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'test-mem', 'some body content')",
4437            [],
4438        )
4439        .unwrap();
4440
4441        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
4442        assert_eq!(results.len(), 1);
4443        assert_eq!(results[0].1, "test-mem");
4444    }
4445
4446    #[test]
4447    fn scan_unbound_memories_excludes_bound_memories() {
4448        let conn = open_test_db();
4449        conn.execute(
4450            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'bound-mem', 'body')",
4451            [],
4452        )
4453        .unwrap();
4454        let mem_id: i64 = conn
4455            .query_row("SELECT id FROM memories WHERE name='bound-mem'", [], |r| {
4456                r.get(0)
4457            })
4458            .unwrap();
4459        conn.execute(
4460            "INSERT INTO entities (namespace, name) VALUES ('global', 'some-entity')",
4461            [],
4462        )
4463        .unwrap();
4464        let ent_id: i64 = conn
4465            .query_row(
4466                "SELECT id FROM entities WHERE name='some-entity'",
4467                [],
4468                |r| r.get(0),
4469            )
4470            .unwrap();
4471        conn.execute(
4472            "INSERT INTO memory_entities (memory_id, entity_id) VALUES (?1, ?2)",
4473            rusqlite::params![mem_id, ent_id],
4474        )
4475        .unwrap();
4476
4477        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
4478        assert!(results.is_empty(), "bound memory must not appear in scan");
4479    }
4480
4481    #[test]
4482    fn scan_entities_without_description_finds_null_description() {
4483        let conn = open_test_db();
4484        conn.execute(
4485            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'my-tool', 'tool', NULL)",
4486            [],
4487        )
4488        .unwrap();
4489
4490        let results = scan_entities_without_description(&conn, "global", None, &[]).unwrap();
4491        assert_eq!(results.len(), 1);
4492        assert_eq!(results[0].1, "my-tool");
4493    }
4494
4495    #[test]
4496    fn scan_entities_without_description_excludes_entities_with_description() {
4497        let conn = open_test_db();
4498        conn.execute(
4499            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'described-tool', 'tool', 'Has a description already')",
4500            [],
4501        )
4502        .unwrap();
4503
4504        let results = scan_entities_without_description(&conn, "global", None, &[]).unwrap();
4505        assert!(
4506            results.is_empty(),
4507            "entity with description must not appear"
4508        );
4509    }
4510
4511    #[test]
4512    fn scan_short_body_memories_finds_short_bodies() {
4513        let conn = open_test_db();
4514        conn.execute(
4515            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'short-mem', 'hi')",
4516            [],
4517        )
4518        .unwrap();
4519
4520        let results = scan_short_body_memories(&conn, "global", 100, None, &[]).unwrap();
4521        assert_eq!(results.len(), 1);
4522        assert_eq!(results[0].1, "short-mem");
4523    }
4524
4525    #[test]
4526    fn scan_short_body_memories_excludes_long_bodies() {
4527        let conn = open_test_db();
4528        let long_body = "a".repeat(1000);
4529        conn.execute(
4530            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'long-mem', ?1)",
4531            rusqlite::params![long_body],
4532        )
4533        .unwrap();
4534
4535        let results = scan_short_body_memories(&conn, "global", 100, None, &[]).unwrap();
4536        assert!(results.is_empty(), "long memory must not appear in scan");
4537    }
4538
4539    #[test]
4540    fn scan_respects_limit() {
4541        let conn = open_test_db();
4542        for i in 0..5 {
4543            conn.execute(
4544                &format!("INSERT INTO memories (namespace, name, body) VALUES ('global', 'mem-{i}', 'short')"),
4545                [],
4546            )
4547            .unwrap();
4548        }
4549
4550        let results = scan_short_body_memories(&conn, "global", 1000, Some(3), &[]).unwrap();
4551        assert_eq!(results.len(), 3, "limit must be respected");
4552    }
4553
4554    #[test]
4555    fn scan_memories_without_embeddings_finds_only_missing_rows() {
4556        let conn = open_test_db();
4557        conn.execute(
4558            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'missing-vec', 'body one')",
4559            [],
4560        )
4561        .unwrap();
4562        conn.execute(
4563            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'has-vec', 'body two')",
4564            [],
4565        )
4566        .unwrap();
4567        let memory_id: i64 = conn
4568            .query_row(
4569                "SELECT id FROM memories WHERE namespace='global' AND name='has-vec'",
4570                [],
4571                |r| r.get(0),
4572            )
4573            .unwrap();
4574        let embedding = vec![0.0_f32; crate::constants::embedding_dim()];
4575        memories::upsert_vec(
4576            &conn, memory_id, "global", "note", &embedding, "has-vec", "body two",
4577        )
4578        .unwrap();
4579
4580        let results = scan_memories_without_embeddings(&conn, "global", None, &[]).unwrap();
4581        assert_eq!(results.len(), 1);
4582        assert_eq!(results[0].1, "missing-vec");
4583    }
4584
4585    #[test]
4586    fn scan_memories_without_embeddings_respects_name_filter() {
4587        let conn = open_test_db();
4588        conn.execute(
4589            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'match-me', 'body one')",
4590            [],
4591        )
4592        .unwrap();
4593        conn.execute(
4594            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'skip-me', 'body two')",
4595            [],
4596        )
4597        .unwrap();
4598
4599        let results =
4600            scan_memories_without_embeddings(&conn, "global", None, &["match-me".to_string()])
4601                .unwrap();
4602        assert_eq!(results.len(), 1);
4603        assert_eq!(results[0].1, "match-me");
4604    }
4605
4606    #[test]
4607    fn queue_db_schema_creates_correctly() {
4608        let tmp_path = format!("/tmp/test-enrich-queue-{}.sqlite", std::process::id());
4609        let conn = open_queue_db(&tmp_path).expect("queue db must open");
4610        let count: i64 = conn
4611            .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
4612            .unwrap();
4613        assert_eq!(count, 0);
4614        let _ = std::fs::remove_file(&tmp_path);
4615    }
4616
4617    #[test]
4618    fn parse_claude_output_valid_bindings() {
4619        let output = r#"[
4620            {"type":"system","subtype":"init"},
4621            {"type":"result","is_error":false,"total_cost_usd":0.01,
4622             "structured_output":{"entities":[{"name":"rust-lang","entity_type":"tool"}],"relationships":[]}}
4623        ]"#;
4624        let result = crate::commands::claude_runner::parse_claude_output(output)
4625            .expect("must parse successfully");
4626        assert!(result.value.get("entities").is_some());
4627        assert!((result.cost_usd - 0.01).abs() < f64::EPSILON);
4628        assert!(!result.is_oauth);
4629    }
4630
4631    #[test]
4632    fn parse_claude_output_detects_oauth() {
4633        let output = r#"[
4634            {"type":"system","subtype":"init","apiKeySource":"none"},
4635            {"type":"result","is_error":false,"total_cost_usd":0.0,
4636             "structured_output":{"entities":[],"relationships":[]}}
4637        ]"#;
4638        let result = crate::commands::claude_runner::parse_claude_output(output).unwrap();
4639        assert!(result.is_oauth);
4640    }
4641
4642    #[test]
4643    fn parse_claude_output_rate_limit_returns_error() {
4644        let output = r#"[
4645            {"type":"system","subtype":"init"},
4646            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
4647        ]"#;
4648        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
4649        assert!(matches!(err, AppError::RateLimited { .. }));
4650    }
4651
4652    #[test]
4653    fn parse_claude_output_auth_error() {
4654        let output = r#"[
4655            {"type":"system","subtype":"init"},
4656            {"type":"result","is_error":true,"error":"authentication failed"}
4657        ]"#;
4658        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
4659        assert!(format!("{err}").contains("authentication failed"));
4660    }
4661
4662    #[cfg(unix)]
4663    #[test]
4664    fn call_codex_returns_raw_json_for_body_enrich_schema() {
4665        let tmp = tempfile::tempdir().expect("tempdir");
4666        let binary = tmp.path().join("codex-mock");
4667        std::fs::write(
4668            &binary,
4669            r#"#!/usr/bin/env bash
4670set -euo pipefail
4671cat <<'JSONL'
4672{"type":"thread.started","thread_id":"mock-thread-0"}
4673{"type":"item.completed","item":{"type":"agent_message","text":"{\"enriched_body\":\"expanded body\"}"}}
4674{"type":"turn.completed","usage":{"input_tokens":1,"output_tokens":1}}
4675JSONL
4676"#,
4677        )
4678        .expect("mock codex write");
4679        let mut perms = std::fs::metadata(&binary).expect("metadata").permissions();
4680        perms.set_mode(0o755);
4681        std::fs::set_permissions(&binary, perms).expect("chmod");
4682
4683        let (value, cost, is_oauth) =
4684            call_codex(&binary, "prompt", BODY_ENRICH_SCHEMA, "body", None, 5)
4685                .expect("call_codex must accept body-enrich payload");
4686
4687        assert_eq!(value["enriched_body"], "expanded body");
4688        assert_eq!(cost, 0.0);
4689        assert!(!is_oauth);
4690    }
4691
4692    #[test]
4693    fn dry_run_emits_preview_without_calling_llm() {
4694        // This test validates the dry-run NDJSON contract without spawning any process.
4695        // The scan_operation function requires a DB; we build one in-memory but cannot
4696        // call run() directly because it needs AppPaths (disk). Instead we test the
4697        // lower-level helpers that the dry-run path relies on.
4698        let conn = open_test_db();
4699        conn.execute(
4700            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'dry-mem', 'tiny')",
4701            [],
4702        )
4703        .unwrap();
4704
4705        let results = scan_short_body_memories(&conn, "global", 1000, None, &[]).unwrap();
4706        assert_eq!(results.len(), 1);
4707        assert_eq!(results[0].1, "dry-mem");
4708        // If scan finds the item and dry_run is set, no LLM would be called.
4709        // The NDJSON emission is tested via integration tests with a fake binary.
4710    }
4711
4712    #[test]
4713    fn persist_entity_description_updates_db() {
4714        let conn = open_test_db();
4715        conn.execute(
4716            "INSERT INTO entities (namespace, name, type) VALUES ('global', 'tokio-runtime', 'tool')",
4717            [],
4718        )
4719        .unwrap();
4720        let eid: i64 = conn
4721            .query_row(
4722                "SELECT id FROM entities WHERE name='tokio-runtime'",
4723                [],
4724                |r| r.get(0),
4725            )
4726            .unwrap();
4727
4728        persist_entity_description(&conn, eid, "Async runtime for Rust applications").unwrap();
4729
4730        let desc: String = conn
4731            .query_row(
4732                "SELECT description FROM entities WHERE id=?1",
4733                rusqlite::params![eid],
4734                |r| r.get(0),
4735            )
4736            .unwrap();
4737        assert_eq!(desc, "Async runtime for Rust applications");
4738    }
4739
4740    #[test]
4741    fn bindings_schema_is_valid_json() {
4742        let _: serde_json::Value =
4743            serde_json::from_str(BINDINGS_SCHEMA).expect("BINDINGS_SCHEMA must be valid JSON");
4744    }
4745
4746    #[test]
4747    fn entity_description_schema_is_valid_json() {
4748        let _: serde_json::Value = serde_json::from_str(ENTITY_DESCRIPTION_SCHEMA)
4749            .expect("ENTITY_DESCRIPTION_SCHEMA must be valid JSON");
4750    }
4751
4752    #[test]
4753    fn body_enrich_schema_is_valid_json() {
4754        let _: serde_json::Value = serde_json::from_str(BODY_ENRICH_SCHEMA)
4755            .expect("BODY_ENRICH_SCHEMA must be valid JSON");
4756    }
4757}