Skip to main content

sqlite_graphrag/commands/
enrich.rs

//! Handler for the `enrich` CLI subcommand (GAP-14 + GAP-18).
//!
//! Enriches the knowledge graph by running LLM-powered analysis over memories
//! and entities that are missing key structural data. The main operations are:
//!
//! - `memory-bindings`: memories without `memory_entities` rows get entity extraction
//! - `entity-descriptions`: entities with NULL/empty descriptions get LLM descriptions
//! - `body-enrich`: memories with short bodies get expanded by the LLM (GAP-18)
//! - `re-embed`: memories without a vector row get re-embedded without rewriting body
//!
//! The remaining [`EnrichOperation`] variants (`weight-calibrate`,
//! `relation-reclassify`, `entity-connect`, …) are documented as scan-only.
//!
//! Architecture mirrors `ingest_claude.rs`: SCAN → JUDGE (LLM) → PERSIST, with a
//! SQLite queue DB (`.enrich-queue.sqlite`) for resume/retry support.
//!
//! Workload: subprocess I/O-bound (claude/codex CLI calls that wait on the network).
//!
//! # DRY opportunity
//!
//! In this file, Claude calls already go through the shared `claude_runner`
//! module, and NDJSON output goes through `crate::output::emit_json_line`. The
//! `open_queue_db` queue schema, however, is still a near-verbatim copy of the
//! private one in `ingest_claude.rs`. `ingest_claude.rs` may also still carry
//! private helpers (`extract_with_claude`, `parse_claude_output`) that duplicate
//! the shared runner. A future refactoring could move the remaining shared pieces
//! into a `src/commands/llm_runner.rs` module (or `src/llm_runner.rs`) without
//! changing any public APIs. That extraction requires editing `ingest_claude.rs`,
//! which is outside this stream's boundary — flagged here for the Integration
//! stream to evaluate.
23
24use crate::commands::ingest_claude::find_claude_binary;
25use crate::constants::MAX_MEMORY_BODY_LEN;
26use crate::entity_type::EntityType;
27use crate::errors::AppError;
28use crate::paths::AppPaths;
29use crate::storage::connection::{ensure_db_ready, open_rw};
30use crate::storage::entities::{self, NewEntity, NewRelationship};
31use crate::storage::memories;
32
33use rusqlite::Connection;
34use serde::{Deserialize, Serialize};
35use std::io::Write;
36use std::path::{Path, PathBuf};
37use std::time::Instant;
38
// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------

/// File name of the resume/retry queue database opened by `open_queue_db`.
const DEFAULT_QUEUE_DB: &str = ".enrich-queue.sqlite";

/// Default rate-limit wait. NOTE(review): no use site is visible in this
/// chunk; presumably seconds — confirm at the caller.
const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;

/// Default for `--min-output-chars` (body-enrich).
const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;

/// Default for `--max-output-chars` (body-enrich).
const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2_000;
47
// ---------------------------------------------------------------------------
// JSON schema used for memory-bindings and body-enrich extraction
// ---------------------------------------------------------------------------

/// Structured-output schema for `memory-bindings`.
///
/// Every object level is closed (`additionalProperties: false`). Entity types
/// and relation names are restricted to the canonical vocabularies below.
/// `strength` must lie in `[0, 1]`.
const BINDINGS_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["entities","relationships"],
  "additionalProperties": false
}"#;

/// Structured-output schema for `entity-descriptions`: a single required
/// `description` string.
const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" }
  },
  "required": ["description"],
  "additionalProperties": false
}"#;

/// Structured-output schema for `body-enrich`: a single required
/// `enriched_body` string.
const BODY_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "enriched_body": { "type": "string" }
  },
  "required": ["enriched_body"],
  "additionalProperties": false
}"#;
109
// G27 P1: weight-calibrate

/// System prompt for `weight-calibrate`: grades an existing relationship weight
/// against the fixed 0.9 / 0.7 / 0.5 / 0.3 scale.
const WEIGHT_CALIBRATE_PROMPT: &str = "You are a knowledge graph quality auditor. \
    Evaluate whether this relationship weight is correctly calibrated.\n\n\
    Scale:\n\
    - 0.9 = vital hard dependency (A cannot function without B)\n\
    - 0.7 = important design relationship (A strongly supports/enables B)\n\
    - 0.5 = useful contextual link (A and B share relevant context)\n\
    - 0.3 = weak reference (A mentions B without strong coupling)\n\n\
    Respond with the calibrated weight and brief reasoning.";

/// Structured-output schema for `weight-calibrate`: a weight in `[0, 1]` plus
/// free-text reasoning.
const WEIGHT_CALIBRATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "calibrated_weight": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["calibrated_weight", "reasoning"],
  "additionalProperties": false
}"#;
128
// G27 P1: relation-reclassify

/// System prompt for `relation-reclassify`: asks the LLM to replace a generic
/// relation with exactly one canonical relation from the list below.
const RELATION_RECLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. \
    The relationship between these entities uses a generic type. \
    Determine the REAL semantic relationship.\n\n\
    Valid canonical relations (pick exactly one):\n\
    - depends-on: A cannot function without B\n\
    - uses: A utilizes B but could substitute it\n\
    - supports: A reinforces or enables B\n\
    - causes: A triggers or produces B\n\
    - fixes: A resolves a problem in B\n\
    - contradicts: A conflicts with or invalidates B\n\
    - applies-to: A is relevant to or scoped within B\n\
    - follows: A comes after B in sequence\n\
    - replaces: A substitutes B\n\
    - tracked-in: A is monitored in B\n\
    - related: A and B share context (use sparingly)\n\n\
    Respond with the correct relation, strength, and reasoning.";

/// Structured-output schema for `relation-reclassify`.
///
/// `relation` is constrained to the same canonical vocabulary used by
/// `BINDINGS_SCHEMA`, so the provider cannot return an off-list relation
/// (e.g. `mentions`) that the prompt forbids.
const RELATION_RECLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": {
      "type": "string",
      "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
    },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
155
// G27 P2: entity-connect — suggest relationships between isolated entities

/// System prompt for `entity-connect`: proposes a canonical relation between
/// two unconnected entities, or the sentinel `"none"`.
const ENTITY_CONNECT_PROMPT: &str = "You are a knowledge graph quality auditor. \
    Two entities exist in the same graph but have no relationship between them. \
    Determine if a meaningful relationship exists.\n\n\
    Valid canonical relations: depends-on, uses, supports, causes, fixes, contradicts, \
    applies-to, follows, replaces, tracked-in, related.\n\n\
    If NO meaningful relationship exists, set relation to \"none\".\n\
    Respond with the relation (or \"none\"), strength, and reasoning.";

/// Structured-output schema for `entity-connect`.
///
/// `relation` is restricted to the canonical vocabulary plus the `"none"`
/// sentinel the prompt asks for, so free-form relation names cannot slip in.
const ENTITY_CONNECT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": {
      "type": "string",
      "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in","none"]
    },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
172
// G27 P2: entity-type-validate — verify entity type assignments

/// System prompt for `entity-type-validate`. It asks for all three fields the
/// schema requires: the validated type, whether the current type was correct,
/// and the reasoning.
const ENTITY_TYPE_VALIDATE_PROMPT: &str = "You are a knowledge graph quality auditor. \
    Verify whether this entity's type is correct.\n\n\
    Valid entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\n\
    If the current type is correct, keep it. If wrong, suggest the correct type.\n\
    Respond with the validated type, whether the current type was correct, and reasoning.";

/// Structured-output schema for `entity-type-validate`.
///
/// `validated_type` is restricted to the same entity-type vocabulary as
/// `BINDINGS_SCHEMA` and the prompt above.
const ENTITY_TYPE_VALIDATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "validated_type": {
      "type": "string",
      "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
    },
    "was_correct": { "type": "boolean" },
    "reasoning": { "type": "string" }
  },
  "required": ["validated_type", "was_correct", "reasoning"],
  "additionalProperties": false
}"#;
189
// G27 P2: description-enrich — improve generic memory descriptions

/// System prompt for `description-enrich`. It asks for a 10–20 word semantic
/// description and illustrates the target with a BAD/GOOD example pair.
const DESCRIPTION_ENRICH_PROMPT: &str = "You are a knowledge graph quality auditor. \
    This memory has a generic or auto-generated description. \
    Write a concise, semantic description (10-20 words) that captures WHAT this memory is about \
    and WHY it matters.\n\n\
    BAD: 'ingested from docs/auth.md'\n\
    GOOD: 'JWT token rotation strategy with 15-min expiry and refresh flow'\n\n\
    Respond with the improved description and reasoning.";

/// Structured-output schema for `description-enrich`: the new description plus
/// reasoning, both required.
const DESCRIPTION_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" },
    "reasoning": { "type": "string" }
  },
  "required": ["description", "reasoning"],
  "additionalProperties": false
}"#;
205
// G27 P2: domain-classify — classify memory into domain category

/// System prompt for `domain-classify`.
///
/// It asks for every field `DOMAIN_CLASSIFY_SCHEMA` requires, including the
/// `confidence` score, which the previous wording never mentioned.
const DOMAIN_CLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. \
    Classify this memory into its primary domain category.\n\n\
    Respond with the domain name (kebab-case, 2-4 words), a confidence score between 0 and 1, \
    and reasoning.";

/// Structured-output schema for `domain-classify`: domain label, confidence in
/// `[0, 1]`, and reasoning.
const DOMAIN_CLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "domain": { "type": "string" },
    "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["domain", "confidence", "reasoning"],
  "additionalProperties": false
}"#;
220
// G27 P2: graph-audit — audit graph for quality issues

/// System prompt for `graph-audit`. It asks for every field
/// `GRAPH_AUDIT_SCHEMA` requires: the issues list, the quality score and the
/// reasoning.
const GRAPH_AUDIT_PROMPT: &str = "You are a knowledge graph quality auditor. \
    Analyze this memory and its entity bindings for quality issues.\n\n\
    Check for: missing entities, wrong entity types, redundant relationships, orphaned entities, \
    generic descriptions, low-signal relationships.\n\n\
    Respond with a list of issues found (or an empty list if none), an overall quality score \
    between 0 and 1, and reasoning.";

/// Structured-output schema for `graph-audit`.
///
/// The per-issue object is closed (`additionalProperties: false`) like every
/// other object in this file's schemas. Strict structured-output modes reject
/// schemas that leave a nested object open.
const GRAPH_AUDIT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "quality_score": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "issues": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "kind": { "type": "string" },
          "detail": { "type": "string" }
        },
        "required": ["kind", "detail"],
        "additionalProperties": false
      }
    },
    "reasoning": { "type": "string" }
  },
  "required": ["quality_score", "issues", "reasoning"],
  "additionalProperties": false
}"#;
236
// G27 P2: deep-research-synth — synthesize research findings into graph

/// System prompt for `deep-research-synth`. It lists the same entity-type and
/// relation vocabularies that `DEEP_RESEARCH_SYNTH_SCHEMA` enforces.
const DEEP_RESEARCH_SYNTH_PROMPT: &str = "You are a knowledge graph synthesizer. \
    Given this memory body, extract key findings and synthesize them into structured entities \
    and relationships.\n\n\
    Entity names: lowercase kebab-case, domain-specific.\n\
    Entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\
    Relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, related, replaces, tracked-in.\n\n\
    Respond with extracted entities, relationships, and a synthesis summary.";

/// Structured-output schema for `deep-research-synth`.
///
/// The entity and relationship item shapes match `BINDINGS_SCHEMA`:
/// - nested objects are closed (`additionalProperties: false`);
/// - `entity_type` and `relation` are limited to the canonical vocabularies;
/// - `strength` is bounded to `[0, 1]`.
const DEEP_RESEARCH_SYNTH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source", "target", "relation", "strength"],
        "additionalProperties": false
      }
    },
    "summary": { "type": "string" }
  },
  "required": ["entities", "relationships", "summary"],
  "additionalProperties": false
}"#;
253
// G27 P2: body-extract — extract structured content from unstructured text

/// System prompt for `body-extract`: restructures a raw or unstructured body
/// into clean markdown while preserving every fact.
const BODY_EXTRACT_PROMPT: &str = "You are a structured data extractor. \
    Given this memory body (which may be unstructured text, raw notes, or a transcript), \
    extract and restructure the content into a clean, well-organized markdown body.\n\n\
    Preserve all factual content. Remove noise, fix formatting, \
    add section headers where appropriate.\n\
    Respond with the restructured body and a brief summary of changes.";

/// Structured-output schema for `body-extract`: the rewritten body and a short
/// summary of what changed, both required.
const BODY_EXTRACT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "restructured_body": { "type": "string" },
    "changes_summary": { "type": "string" }
  },
  "required": ["restructured_body", "changes_summary"],
  "additionalProperties": false
}"#;
268
// ---------------------------------------------------------------------------
// Prompts
// ---------------------------------------------------------------------------

/// System prompt for `memory-bindings` entity/relationship extraction.
///
/// The relation vocabulary and strength scale it spells out match
/// `BINDINGS_SCHEMA`.
const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. \
    Given a memory body, extract:\n\
    1. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
    2. Typed relationships between entities with strength scores\n\n\
    Rules:\n\
    - Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
    - NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
    - Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, \
    contradicts, supports, follows, related, replaces, tracked-in\n\
    - NEVER use 'mentions' as relationship type\n\
    - Strength: 0.9 for hard dependencies, 0.7 for design relationships, \
    0.5 for contextual links, 0.3 for weak references\n\
    - Prefer fewer high-quality entities over many low-quality ones";

/// Leading part of the `entity-descriptions` prompt. It ends with
/// `"Entity name: "` (trailing space included) so the caller can append the
/// entity's name directly.
const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. \
    Given an entity name and type, write a concise one-sentence description (10-20 words) \
    that explains what this entity IS and WHY it matters in the context of software/system design.\n\n\
    Entity name: ";

/// Leading part of the `body-enrich` prompt. It ends with a blank line after
/// `"Memory body to enrich:"` so the body text can be appended directly.
const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. \
    Given a short or sparse memory body, expand it into a richer, more complete and useful description. \
    Preserve all existing facts. \
    Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\n\
    Constraints:\n\
    - Output only the enriched body text (no metadata, no headers)\n\
    - Preserve the original meaning exactly\n\
    - Target length is provided in the system context\n\n\
    Memory body to enrich:\n\n";
287
288// ---------------------------------------------------------------------------
289// CLI args
290// ---------------------------------------------------------------------------
291
292/// Operation to perform in the `enrich` command.
293#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
294#[serde(rename_all = "kebab-case")]
295pub enum EnrichOperation {
296    /// Add missing entity/relationship bindings to memories (fully implemented).
297    MemoryBindings,
298    /// Fill NULL/empty entity descriptions with LLM-generated summaries (fully implemented).
299    EntityDescriptions,
300    /// Expand short memory bodies into richer content (fully implemented, GAP-18).
301    BodyEnrich,
302    /// Rebuild missing memory embeddings without rewriting the memory body.
303    ReEmbed,
304    /// Calibrate relationship weights using LLM analysis (scan only).
305    WeightCalibrate,
306    /// Reclassify relationship types using LLM judgment (scan only).
307    RelationReclassify,
308    /// Connect isolated entities by suggesting new relationships (scan only).
309    EntityConnect,
310    /// Validate entity type assignments using LLM judgment (scan only).
311    EntityTypeValidate,
312    /// Enrich memory descriptions that are generic/auto-generated (scan only).
313    DescriptionEnrich,
314    /// Identify cross-domain bridges between disconnected subgraphs (scan only).
315    CrossDomainBridges,
316    /// Classify memories into domain categories (scan only).
317    DomainClassify,
318    /// Audit the graph for quality issues (scan only).
319    GraphAudit,
320    /// Synthesize deep-research findings into graph memories (scan only).
321    DeepResearchSynth,
322    /// Extract structured body from unstructured text (scan only).
323    BodyExtract,
324}
325
326/// LLM provider for enrichment.
327#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
328pub enum EnrichMode {
329    /// Use locally installed Claude Code CLI (OAuth-first).
330    ClaudeCode,
331    /// Use locally installed OpenAI Codex CLI.
332    Codex,
333}
334
335impl std::fmt::Display for EnrichMode {
336    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
337        match self {
338            EnrichMode::ClaudeCode => write!(f, "claude-code"),
339            EnrichMode::Codex => write!(f, "codex"),
340        }
341    }
342}
343
344/// Arguments for the `enrich` subcommand.
345#[derive(clap::Args)]
346#[command(
347    about = "Enrich graph memories and entities using an LLM provider",
348    after_long_help = "EXAMPLES:\n  \
349    # Add missing entity bindings to all unbound memories\n  \
350    sqlite-graphrag enrich --operation memory-bindings --mode claude-code\n\n  \
351    # Fill entity descriptions (dry-run preview, no tokens spent)\n  \
352    sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n  \
353    # Expand short memory bodies (GAP-18)\n  \
354    sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n  \
355    # Rebuild only missing memory embeddings without rewriting bodies\n  \
356    sqlite-graphrag enrich --operation re-embed --limit 100\n\n  \
357    # Resume an interrupted body-enrich run\n  \
358    sqlite-graphrag enrich --operation body-enrich --resume --json\n\n  \
359    # Retry only failed items from a previous run\n  \
360    sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n\
361    EXIT CODES:\n  \
362    0  success\n  \
363    1  validation error (bad args, binary not found)\n  \
364    14 I/O error"
365)]
366pub struct EnrichArgs {
367    /// Enrichment operation to run.
368    #[arg(long, short = 'o', value_enum, value_name = "OPERATION")]
369    pub operation: EnrichOperation,
370
371    /// LLM provider to use. Default: claude-code (OAuth-first).
372    #[arg(long, value_enum, default_value = "claude-code")]
373    pub mode: EnrichMode,
374
375    /// Maximum number of items to process in this run. Omit for all.
376    #[arg(long, value_name = "N")]
377    pub limit: Option<usize>,
378
379    /// Preview items without calling the LLM (zero tokens consumed).
380    #[arg(long)]
381    pub dry_run: bool,
382
383    /// Namespace to operate on. Default: global.
384    #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
385    pub namespace: Option<String>,
386
387    // -- Provider flags (Claude) --
388    /// Path to the Claude Code binary. Default: auto-detect from PATH.
389    #[arg(long, value_name = "PATH")]
390    pub claude_binary: Option<PathBuf>,
391
392    /// Claude model to use (e.g. claude-sonnet-4-6).
393    #[arg(long, value_name = "MODEL")]
394    pub claude_model: Option<String>,
395
396    /// Timeout per item in seconds when using Claude Code. Default: 300.
397    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
398    pub claude_timeout: u64,
399
400    // -- Provider flags (Codex) --
401    /// Path to the Codex CLI binary. Default: auto-detect from PATH.
402    #[arg(long, value_name = "PATH")]
403    pub codex_binary: Option<PathBuf>,
404
405    /// Codex model to use (e.g. o4-mini).
406    #[arg(long, value_name = "MODEL")]
407    pub codex_model: Option<String>,
408
409    /// Timeout per item in seconds when using Codex. Default: 300.
410    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
411    pub codex_timeout: u64,
412
413    // -- Cost controls --
414    /// Abort when cumulative cost exceeds this USD budget (API key only; ignored for OAuth).
415    #[arg(long, value_name = "USD")]
416    pub max_cost_usd: Option<f64>,
417
418    // -- Queue controls --
419    /// Resume a previously interrupted run (skip already-done items).
420    #[arg(long)]
421    pub resume: bool,
422
423    /// Retry only items that failed in a previous run.
424    #[arg(long)]
425    pub retry_failed: bool,
426
427    // -- body-enrich specific flags (GAP-18) --
428    /// Minimum output character count for body-enrich. Default: 500.
429    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
430    pub min_output_chars: usize,
431
432    /// Maximum output character count for body-enrich. Default: 2000.
433    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
434    pub max_output_chars: usize,
435
436    /// Check that enriched body preserves all facts from the original (LLM judge). Default: true.
437    #[arg(long, default_value_t = true)]
438    pub preserve_check: bool,
439
440    /// Path to a custom prompt template file for body-enrich.
441    #[arg(long, value_name = "PATH")]
442    pub prompt_template: Option<PathBuf>,
443
444    /// Number of parallel LLM workers (default 1 = serial).
445    /// Each worker claims items atomically from the queue DB via UPDATE...RETURNING.
446    /// Range: 1–32. For 2321 entities, --llm-parallelism 4 reduces wall time ~4×.
447    #[arg(long, default_value_t = 1, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=32))]
448    pub llm_parallelism: u32,
449
450    // -- Output / infra --
451    /// Emit NDJSON output. Always true; flag accepted for compatibility.
452    #[arg(long)]
453    pub json: bool,
454
455    /// Database path override.
456    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
457    pub db: Option<String>,
458
459    /// G30: poll for the job singleton every second for up to N seconds
460    /// when another invocation holds the lock. Default: 0 (fail fast).
461    #[arg(long, value_name = "SECONDS")]
462    pub wait_job_singleton: Option<u64>,
463
464    /// G30: force acquisition of the singleton lock by removing a stale
465    /// lock file from a previously crashed invocation. Use only when you
466    /// are certain no other `enrich`/`ingest` is running.
467    #[arg(long, default_value_t = false)]
468    pub force_job_singleton: bool,
469
470    /// G37: select a specific subset of memory names to enrich instead of
471    /// the full candidate set. Comma-separated, e.g. `--names a,b,c`.
472    /// Empty when omitted (processes all candidates).
473    #[arg(long, value_name = "NAMES", value_delimiter = ',')]
474    pub names: Vec<String>,
475
476    /// G37: read the subset of memory names from a file (one per line).
477    /// Lines starting with `#` and empty lines are ignored. Combined with
478    /// `--names` (union) when both are set.
479    #[arg(long, value_name = "PATH")]
480    pub names_file: Option<PathBuf>,
481
482    /// G35: probe the LLM provider with a 1-turn ping before processing
483    /// the batch. Aborts with a clear error if the rate-limit window is
484    /// closed (avoids burning N turns only to fail on item 1).
485    #[arg(long, default_value_t = false)]
486    pub preflight_check: bool,
487
488    /// G35: if a preflight probe or in-flight call hits the Claude rate
489    /// limit, fall back to `--fallback-mode` (typically `codex`) instead
490    /// of failing the batch. Ignored when `--mode` is already `codex`.
491    #[arg(long, value_enum)]
492    pub fallback_mode: Option<EnrichMode>,
493
494    /// G35: number of seconds before the OAuth rate-limit reset at which
495    /// the preflight probe should refuse to start. Default 300 (5 min).
496    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
497    pub rate_limit_buffer: u64,
498
499    /// G28-D: refuse to start when the 1-minute load average exceeds
500    /// `2 × ncpus` (or `SQLITE_GRAPHRAG_MAX_LOAD_PER_NCPU` if set).
501    /// Set to false to skip the check on contended CI runners.
502    #[arg(long, default_value_t = true)]
503    pub max_load_check: bool,
504
505    /// G28-D: when the system is saturated, abort the job after this
506    /// many consecutive HardFailure outcomes. Default 5.
507    #[arg(long, value_name = "N", default_value_t = 5)]
508    pub circuit_breaker_threshold: u32,
509
510    /// G29 Passo 4: minimum trigram-Jaccard similarity between the
511    /// original body and the LLM-rewritten body for the rewrite to be
512    /// accepted. Scores below the threshold are rejected and emitted as
513    /// `EnrichItemResult::PreservationFailed`. Default 0.7 (per the G29
514    /// gap specification). Ignored when `--operation` is not
515    /// `body-enrich`.
516    #[arg(long, value_name = "FLOAT", default_value_t = 0.7)]
517    pub preserve_threshold: f64,
518
519    /// G33 Passo 3: when set, validate `--codex-model` against the
520    /// ChatGPT Pro OAuth accepted-model list and abort with a
521    /// suggestion when the value is unknown. Default true (fail fast
522    /// to avoid burning OAuth turns). Set to false to opt out.
523    #[arg(long, default_value_t = true)]
524    pub codex_model_validate: bool,
525
526    /// G33 Passo 3: when set together with an invalid `--codex-model`,
527    /// automatically substitute the supplied default (e.g. `gpt-5.5`)
528    /// instead of aborting. The substitution is recorded in the NDJSON
529    /// stream as `provider_substituted: true` for traceability.
530    #[arg(long, value_name = "MODEL")]
531    pub codex_model_fallback: Option<String>,
532}
533
534// ---------------------------------------------------------------------------
535// Internal types — raw LLM output structs
536// ---------------------------------------------------------------------------
537
538// ---------------------------------------------------------------------------
539// NDJSON event types emitted to stdout
540// ---------------------------------------------------------------------------
541
542#[derive(Debug, Serialize)]
543struct PhaseEvent<'a> {
544    phase: &'a str,
545    #[serde(skip_serializing_if = "Option::is_none")]
546    binary_path: Option<&'a str>,
547    #[serde(skip_serializing_if = "Option::is_none")]
548    version: Option<&'a str>,
549    #[serde(skip_serializing_if = "Option::is_none")]
550    items_total: Option<usize>,
551    #[serde(skip_serializing_if = "Option::is_none")]
552    items_pending: Option<usize>,
553    /// Active parallel LLM worker count (1 = serial). Present only on the "scan" phase event.
554    #[serde(skip_serializing_if = "Option::is_none")]
555    llm_parallelism: Option<u32>,
556}
557
558#[derive(Debug, Serialize)]
559struct ItemEvent<'a> {
560    /// Item identifier (memory name or entity name).
561    item: &'a str,
562    status: &'a str,
563    #[serde(skip_serializing_if = "Option::is_none")]
564    memory_id: Option<i64>,
565    #[serde(skip_serializing_if = "Option::is_none")]
566    entity_id: Option<i64>,
567    #[serde(skip_serializing_if = "Option::is_none")]
568    entities: Option<usize>,
569    #[serde(skip_serializing_if = "Option::is_none")]
570    rels: Option<usize>,
571    #[serde(skip_serializing_if = "Option::is_none")]
572    chars_before: Option<usize>,
573    #[serde(skip_serializing_if = "Option::is_none")]
574    chars_after: Option<usize>,
575    #[serde(skip_serializing_if = "Option::is_none")]
576    cost_usd: Option<f64>,
577    #[serde(skip_serializing_if = "Option::is_none")]
578    elapsed_ms: Option<u64>,
579    #[serde(skip_serializing_if = "Option::is_none")]
580    error: Option<String>,
581    index: usize,
582    total: usize,
583}
584
585#[derive(Debug, Serialize)]
586struct EnrichSummary {
587    summary: bool,
588    operation: String,
589    items_total: usize,
590    completed: usize,
591    failed: usize,
592    skipped: usize,
593    cost_usd: f64,
594    elapsed_ms: u64,
595}
596
597use crate::output::emit_json_line as emit_json;
598
599// ---------------------------------------------------------------------------
600// Queue DB
601// ---------------------------------------------------------------------------
602
603/// Opens or creates the enrichment queue database.
604///
605/// The queue schema mirrors `ingest_claude` for resume/retry parity.
606/// Uses a different filename (`.enrich-queue.sqlite`) to avoid collision.
607///
608/// # DRY note
609///
610/// This is a near-verbatim copy of `open_queue_db` in `ingest_claude.rs`.
611/// Both should be unified in a shared `llm_runner.rs` module by the
612/// Integration stream.
613fn open_queue_db(path: &str) -> Result<Connection, AppError> {
614    let conn = Connection::open(path)?;
615    conn.pragma_update(None, "journal_mode", "wal")?;
616    conn.execute_batch(
617        "CREATE TABLE IF NOT EXISTS queue (
618            id          INTEGER PRIMARY KEY AUTOINCREMENT,
619            item_key    TEXT NOT NULL UNIQUE,
620            item_type   TEXT NOT NULL DEFAULT 'memory',
621            status      TEXT NOT NULL DEFAULT 'pending',
622            memory_id   INTEGER,
623            entity_id   INTEGER,
624            entities    INTEGER DEFAULT 0,
625            rels        INTEGER DEFAULT 0,
626            error       TEXT,
627            cost_usd    REAL DEFAULT 0.0,
628            attempt     INTEGER DEFAULT 0,
629            elapsed_ms  INTEGER,
630            created_at  TEXT DEFAULT (datetime('now')),
631            done_at     TEXT
632        );
633        CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
634    )?;
635    Ok(conn)
636}
637
638// ---------------------------------------------------------------------------
639// LLM invocation — Claude Code
640// ---------------------------------------------------------------------------
641
642/// Calls `claude -p` via the shared `claude_runner` module (G02).
643///
644/// Returns `(output_value, cost_usd, is_oauth)`.
645fn call_claude(
646    binary: &Path,
647    prompt: &str,
648    json_schema: &str,
649    input_text: &str,
650    model: Option<&str>,
651    timeout_secs: u64,
652) -> Result<(serde_json::Value, f64, bool), AppError> {
653    let result = crate::commands::claude_runner::run_claude(
654        binary,
655        prompt,
656        json_schema,
657        input_text,
658        model,
659        timeout_secs,
660        7,
661    )?;
662    Ok((result.value, result.cost_usd, result.is_oauth))
663}
664
665// ---------------------------------------------------------------------------
666// Preflight probe (G35) — single-turn ping to verify the LLM provider
667// ---------------------------------------------------------------------------
668
669/// Result of a single preflight ping (G35).
670enum PreflightOutcome {
671    /// The provider accepted the ping without rate-limit or other errors.
672    Healthy,
673    /// The provider rejected the ping due to OAuth rate limit. The
674    /// `suggestion` field is a human hint that callers can embed in the
675    /// user-facing error.
676    RateLimited {
677        reason: String,
678        suggestion: &'static str,
679    },
680    /// Any other provider error (binary missing, auth failure, etc.).
681    Error(AppError),
682}
683
684/// Probes the configured LLM provider with a 1-turn ping.
685///
686/// - Claude: `claude -p "ping" --max-turns 1 --strict-mcp-config --mcp-config '{}'`
687/// - Codex:  `codex exec -c mcp_servers='{}' "ping" --json`
688///
689/// The probe intentionally avoids spawning any MCP server children (G28-A)
690/// to keep its own process footprint at the minimum.
691fn run_preflight_probe(args: &EnrichArgs) -> PreflightOutcome {
692    let timeout = std::time::Duration::from_secs(args.rate_limit_buffer.max(60));
693
694    match args.mode {
695        EnrichMode::ClaudeCode => {
696            let bin = match find_claude_binary(args.claude_binary.as_deref()) {
697                Ok(b) => b,
698                Err(e) => return PreflightOutcome::Error(e),
699            };
700            let mut cmd = std::process::Command::new(&bin);
701            cmd.env_clear();
702            for var in &["PATH", "HOME", "USER"] {
703                if let Ok(val) = std::env::var(var) {
704                    cmd.env(var, val);
705                }
706            }
707            cmd.arg("-p")
708                .arg("ping")
709                .arg("--max-turns")
710                .arg("1")
711                .arg("--strict-mcp-config")
712                .arg("--mcp-config")
713                .arg("{}")
714                .arg("--dangerously-skip-permissions")
715                .arg("--settings")
716                .arg("{\"hooks\":{}}")
717                .arg("--output-format")
718                .arg("json")
719                .stdin(std::process::Stdio::null())
720                .stdout(std::process::Stdio::piped())
721                .stderr(std::process::Stdio::piped());
722
723            let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
724                Ok(c) => c,
725                Err(e) => {
726                    return PreflightOutcome::Error(AppError::Io(e));
727                }
728            };
729            let output = match wait_with_timeout(child, timeout) {
730                Ok(out) => out,
731                Err(e) => return PreflightOutcome::Error(e),
732            };
733            if !output.status.success() {
734                let stderr = String::from_utf8_lossy(&output.stderr);
735                if stderr.contains("hit your session limit")
736                    || stderr.contains("rate_limit")
737                    || stderr.contains("429")
738                {
739                    return PreflightOutcome::RateLimited {
740                        reason: stderr.trim().to_string(),
741                        suggestion:
742                            "wait for the OAuth window to reset or use --fallback-mode codex",
743                    };
744                }
745                return PreflightOutcome::Error(AppError::Validation(format!(
746                    "preflight probe failed: {stderr}",
747                    stderr = stderr.trim()
748                )));
749            }
750            PreflightOutcome::Healthy
751        }
752        EnrichMode::Codex => {
753            let bin = match find_codex_binary(args.codex_binary.as_deref()) {
754                Ok(b) => b,
755                Err(e) => return PreflightOutcome::Error(e),
756            };
757            super::codex_spawn::validate_codex_model(args.codex_model.as_deref())
758                .map_err(PreflightOutcome::Error)
759                .ok();
760            let schema = "{}";
761            let schema_path = match super::codex_spawn::trusted_schema_path() {
762                Ok(p) => p,
763                Err(e) => return PreflightOutcome::Error(e),
764            };
765            let spawn_args = super::codex_spawn::CodexSpawnArgs {
766                binary: &bin,
767                prompt: "ping",
768                json_schema: schema,
769                input_text: "",
770                model: args.codex_model.as_deref(),
771                timeout_secs: args.rate_limit_buffer.max(60),
772                schema_path: schema_path.clone(),
773            };
774            let mut cmd = super::codex_spawn::build_codex_command(&spawn_args);
775            let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
776                Ok(c) => c,
777                Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
778            };
779            let output = match wait_with_timeout(child, timeout) {
780                Ok(out) => out,
781                Err(e) => return PreflightOutcome::Error(e),
782            };
783            let _ = std::fs::remove_file(&schema_path);
784            if !output.status.success() {
785                let stderr = String::from_utf8_lossy(&output.stderr);
786                if stderr.contains("rate_limit")
787                    || stderr.contains("429")
788                    || stderr.contains("Too Many Requests")
789                {
790                    return PreflightOutcome::RateLimited {
791                        reason: stderr.trim().to_string(),
792                        suggestion: "wait for the rate-limit window to reset",
793                    };
794                }
795                return PreflightOutcome::Error(AppError::Validation(format!(
796                    "preflight probe failed: {stderr}",
797                    stderr = stderr.trim()
798                )));
799            }
800            PreflightOutcome::Healthy
801        }
802    }
803}
804
805/// Cross-platform wait with timeout (no extra crate dependency).
806fn wait_with_timeout(
807    mut child: std::process::Child,
808    timeout: std::time::Duration,
809) -> Result<std::process::Output, AppError> {
810    use wait_timeout::ChildExt;
811    let start = std::time::Instant::now();
812    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
813    if status.is_none() {
814        let _ = child.kill();
815        let _ = child.wait();
816        return Err(AppError::Validation(format!(
817            "preflight probe timed out after {}s",
818            start.elapsed().as_secs()
819        )));
820    }
821    let mut stdout = Vec::new();
822    if let Some(mut out) = child.stdout.take() {
823        std::io::Read::read_to_end(&mut out, &mut stdout).map_err(AppError::Io)?;
824    }
825    let mut stderr = Vec::new();
826    if let Some(mut err) = child.stderr.take() {
827        std::io::Read::read_to_end(&mut err, &mut stderr).map_err(AppError::Io)?;
828    }
829    let exit = status.unwrap();
830    Ok(std::process::Output {
831        status: exit,
832        stdout,
833        stderr,
834    })
835}
836
837// ---------------------------------------------------------------------------
838// SCAN helpers — SQL queries that find items needing enrichment
839// ---------------------------------------------------------------------------
840
841/// Returns memories without any `memory_entities` binding.
842///
843/// These are the targets for `memory-bindings` enrichment. When `name_filter`
844/// is non-empty, restricts the scan to the given names (G37); unknown names
845/// are silently skipped (the caller can detect them by comparing
846/// requested vs. returned).
847fn scan_unbound_memories(
848    conn: &Connection,
849    namespace: &str,
850    limit: Option<usize>,
851    name_filter: &[String],
852) -> Result<Vec<(i64, String, String)>, AppError> {
853    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
854
855    if name_filter.is_empty() {
856        let sql = format!(
857            "SELECT m.id, m.name, m.body
858             FROM memories m
859             WHERE m.namespace = ?1
860               AND m.deleted_at IS NULL
861               AND NOT EXISTS (
862                   SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
863               )
864             ORDER BY m.id
865             {limit_clause}"
866        );
867        let mut stmt = conn.prepare(&sql)?;
868        let rows = stmt
869            .query_map(rusqlite::params![namespace], |r| {
870                Ok((
871                    r.get::<_, i64>(0)?,
872                    r.get::<_, String>(1)?,
873                    r.get::<_, String>(2)?,
874                ))
875            })?
876            .collect::<Result<Vec<_>, _>>()?;
877        Ok(rows)
878    } else {
879        // Build a parameterised IN clause: ?2, ?3, ..., ?{1+n}
880        let placeholders: Vec<String> = (2..=name_filter.len() + 1)
881            .map(|i| format!("?{i}"))
882            .collect();
883        let in_clause = placeholders.join(", ");
884        let sql = format!(
885            "SELECT m.id, m.name, m.body
886             FROM memories m
887             WHERE m.namespace = ?1
888               AND m.deleted_at IS NULL
889               AND m.name IN ({in_clause})
890               AND NOT EXISTS (
891                   SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
892               )
893             ORDER BY m.id
894             {limit_clause}"
895        );
896        let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
897        params_vec.push(&namespace);
898        for n in name_filter {
899            params_vec.push(n);
900        }
901        let mut stmt = conn.prepare(&sql)?;
902        let rows = stmt
903            .query_map(
904                rusqlite::params_from_iter(params_vec.iter().copied()),
905                |r| {
906                    Ok((
907                        r.get::<_, i64>(0)?,
908                        r.get::<_, String>(1)?,
909                        r.get::<_, String>(2)?,
910                    ))
911                },
912            )?
913            .collect::<Result<Vec<_>, _>>()?;
914        Ok(rows)
915    }
916}
917
918/// Reads a list of memory names from a UTF-8 text file (G37).
919///
920/// Empty lines and lines beginning with `#` are skipped. Returns a
921/// de-duplicated, order-preserving list of trimmed names.
922fn read_names_file(path: &Path) -> Result<Vec<String>, AppError> {
923    let content = std::fs::read_to_string(path).map_err(|e| {
924        AppError::Validation(format!("failed to read names file {}: {e}", path.display()))
925    })?;
926    let mut seen = std::collections::HashSet::new();
927    let mut out = Vec::new();
928    for line in content.lines() {
929        let trimmed = line.trim();
930        if trimmed.is_empty() || trimmed.starts_with('#') {
931            continue;
932        }
933        if seen.insert(trimmed.to_string()) {
934            out.push(trimmed.to_string());
935        }
936    }
937    Ok(out)
938}
939
940/// Resolves the union of `--names` and `--names-file` (G37).
941fn resolve_name_filter(args: &EnrichArgs) -> Result<Vec<String>, AppError> {
942    let mut combined: Vec<String> = args.names.clone();
943    if let Some(p) = &args.names_file {
944        let from_file = read_names_file(p)?;
945        for n in from_file {
946            if !combined.contains(&n) {
947                combined.push(n);
948            }
949        }
950    }
951    Ok(combined)
952}
953
954/// Returns entities with NULL or empty description.
955///
956/// These are the targets for `entity-descriptions` enrichment.
957fn scan_entities_without_description(
958    conn: &Connection,
959    namespace: &str,
960    limit: Option<usize>,
961) -> Result<Vec<(i64, String, String)>, AppError> {
962    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
963    let sql = format!(
964        "SELECT id, name, type
965         FROM entities
966         WHERE namespace = ?1
967           AND (description IS NULL OR description = '')
968         ORDER BY id
969         {limit_clause}"
970    );
971    let mut stmt = conn.prepare(&sql)?;
972    let rows = stmt
973        .query_map(rusqlite::params![namespace], |r| {
974            Ok((
975                r.get::<_, i64>(0)?,
976                r.get::<_, String>(1)?,
977                r.get::<_, String>(2)?,
978            ))
979        })?
980        .collect::<Result<Vec<_>, _>>()?;
981    Ok(rows)
982}
983
984/// Returns memories whose body length is below the configured minimum.
985///
986/// These are the targets for `body-enrich` (GAP-18).
987fn scan_short_body_memories(
988    conn: &Connection,
989    namespace: &str,
990    min_chars: usize,
991    limit: Option<usize>,
992) -> Result<Vec<(i64, String, String)>, AppError> {
993    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
994    let sql = format!(
995        "SELECT m.id, m.name, m.body
996         FROM memories m
997         WHERE m.namespace = ?1
998           AND m.deleted_at IS NULL
999           AND LENGTH(COALESCE(m.body,'')) < ?2
1000         ORDER BY m.id
1001         {limit_clause}"
1002    );
1003    let mut stmt = conn.prepare(&sql)?;
1004    let rows = stmt
1005        .query_map(rusqlite::params![namespace, min_chars as i64], |r| {
1006            Ok((
1007                r.get::<_, i64>(0)?,
1008                r.get::<_, String>(1)?,
1009                r.get::<_, String>(2)?,
1010            ))
1011        })?
1012        .collect::<Result<Vec<_>, _>>()?;
1013    Ok(rows)
1014}
1015
1016/// Returns live memories that still have no row in `memory_embeddings`.
1017///
1018/// These are the targets for `re-embed`.
1019fn scan_memories_without_embeddings(
1020    conn: &Connection,
1021    namespace: &str,
1022    limit: Option<usize>,
1023    name_filter: &[String],
1024) -> Result<Vec<(i64, String, String)>, AppError> {
1025    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1026
1027    if name_filter.is_empty() {
1028        let sql = format!(
1029            "SELECT m.id, m.name, COALESCE(m.body,'')
1030             FROM memories m
1031             LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1032             WHERE m.namespace = ?1
1033               AND m.deleted_at IS NULL
1034               AND me.memory_id IS NULL
1035             ORDER BY m.id
1036             {limit_clause}"
1037        );
1038        let mut stmt = conn.prepare(&sql)?;
1039        let rows = stmt
1040            .query_map(rusqlite::params![namespace], |r| {
1041                Ok((
1042                    r.get::<_, i64>(0)?,
1043                    r.get::<_, String>(1)?,
1044                    r.get::<_, String>(2)?,
1045                ))
1046            })?
1047            .collect::<Result<Vec<_>, _>>()?;
1048        Ok(rows)
1049    } else {
1050        let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1051            .map(|i| format!("?{i}"))
1052            .collect();
1053        let in_clause = placeholders.join(", ");
1054        let sql = format!(
1055            "SELECT m.id, m.name, COALESCE(m.body,'')
1056             FROM memories m
1057             LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1058             WHERE m.namespace = ?1
1059               AND m.deleted_at IS NULL
1060               AND m.name IN ({in_clause})
1061               AND me.memory_id IS NULL
1062             ORDER BY m.id
1063             {limit_clause}"
1064        );
1065        let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1066        params_vec.push(&namespace);
1067        for n in name_filter {
1068            params_vec.push(n);
1069        }
1070        let mut stmt = conn.prepare(&sql)?;
1071        let rows = stmt
1072            .query_map(
1073                rusqlite::params_from_iter(params_vec.iter().copied()),
1074                |r| {
1075                    Ok((
1076                        r.get::<_, i64>(0)?,
1077                        r.get::<_, String>(1)?,
1078                        r.get::<_, String>(2)?,
1079                    ))
1080                },
1081            )?
1082            .collect::<Result<Vec<_>, _>>()?;
1083        Ok(rows)
1084    }
1085}
1086
1087/// G27: Returns relationships with weight >= 0.7 that may need recalibration.
1088#[allow(clippy::type_complexity)]
1089fn scan_weight_candidates(
1090    conn: &Connection,
1091    namespace: &str,
1092    limit: Option<usize>,
1093) -> Result<Vec<(i64, String, String, String, f64)>, AppError> {
1094    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1095    let sql = format!(
1096        "SELECT r.id, e1.name, e2.name, r.relation, r.weight \
1097         FROM relationships r \
1098         JOIN entities e1 ON e1.id = r.source_id \
1099         JOIN entities e2 ON e2.id = r.target_id \
1100         WHERE r.weight >= 0.7 AND e1.namespace = ?1 \
1101         ORDER BY r.weight DESC {limit_clause}"
1102    );
1103    let mut stmt = conn.prepare(&sql)?;
1104    let rows = stmt
1105        .query_map(rusqlite::params![namespace], |r| {
1106            Ok((
1107                r.get::<_, i64>(0)?,
1108                r.get::<_, String>(1)?,
1109                r.get::<_, String>(2)?,
1110                r.get::<_, String>(3)?,
1111                r.get::<_, f64>(4)?,
1112            ))
1113        })?
1114        .collect::<Result<Vec<_>, _>>()?;
1115    Ok(rows)
1116}
1117
1118/// G27: Returns relationships with generic relation types (applies_to).
1119fn scan_generic_relations(
1120    conn: &Connection,
1121    namespace: &str,
1122    limit: Option<usize>,
1123) -> Result<Vec<(i64, String, String, String)>, AppError> {
1124    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1125    let sql = format!(
1126        "SELECT r.id, e1.name, e2.name, r.relation \
1127         FROM relationships r \
1128         JOIN entities e1 ON e1.id = r.source_id \
1129         JOIN entities e2 ON e2.id = r.target_id \
1130         WHERE r.relation = 'applies_to' AND e1.namespace = ?1 \
1131         ORDER BY r.id {limit_clause}"
1132    );
1133    let mut stmt = conn.prepare(&sql)?;
1134    let rows = stmt
1135        .query_map(rusqlite::params![namespace], |r| {
1136            Ok((
1137                r.get::<_, i64>(0)?,
1138                r.get::<_, String>(1)?,
1139                r.get::<_, String>(2)?,
1140                r.get::<_, String>(3)?,
1141            ))
1142        })?
1143        .collect::<Result<Vec<_>, _>>()?;
1144    Ok(rows)
1145}
1146
1147// ---------------------------------------------------------------------------
1148// PERSIST helpers for fully-implemented operations
1149// ---------------------------------------------------------------------------
1150
1151/// Persists entity bindings extracted by the LLM for a memory.
1152///
1153/// Creates entities via `upsert_entity`, links them to the memory via
1154/// `link_memory_entity`, and upserts relationships found between entities.
1155fn persist_memory_bindings(
1156    conn: &Connection,
1157    namespace: &str,
1158    memory_id: i64,
1159    entities_json: &serde_json::Value,
1160    rels_json: &serde_json::Value,
1161) -> Result<(usize, usize), AppError> {
1162    #[derive(Deserialize)]
1163    struct EntityItem {
1164        name: String,
1165        entity_type: String,
1166    }
1167    #[derive(Deserialize)]
1168    struct RelItem {
1169        source: String,
1170        target: String,
1171        relation: String,
1172        strength: f64,
1173    }
1174
1175    let extracted_entities: Vec<EntityItem> = serde_json::from_value(entities_json.clone())
1176        .map_err(|e| AppError::Validation(format!("failed to parse entities array: {e}")))?;
1177
1178    let extracted_rels: Vec<RelItem> = serde_json::from_value(rels_json.clone())
1179        .map_err(|e| AppError::Validation(format!("failed to parse relationships array: {e}")))?;
1180
1181    let mut ent_count = 0usize;
1182    let mut rel_count = 0usize;
1183
1184    for item in &extracted_entities {
1185        let entity_type = match item.entity_type.parse::<EntityType>() {
1186            Ok(et) => et,
1187            Err(_) => {
1188                tracing::warn!(
1189                    target: "enrich",
1190                    entity = %item.name,
1191                    entity_type = %item.entity_type,
1192                    "entity type not recognized, skipping"
1193                );
1194                continue;
1195            }
1196        };
1197        match entities::upsert_entity(
1198            conn,
1199            namespace,
1200            &NewEntity {
1201                name: item.name.clone(),
1202                entity_type,
1203                description: None,
1204            },
1205        ) {
1206            Ok(eid) => {
1207                let _ = entities::link_memory_entity(conn, memory_id, eid);
1208                ent_count += 1;
1209            }
1210            Err(e) => {
1211                tracing::warn!(
1212                    target: "enrich",
1213                    entity = %item.name,
1214                    error = %e,
1215                    "entity upsert skipped"
1216                );
1217            }
1218        }
1219    }
1220
1221    for rel in &extracted_rels {
1222        let normalized = crate::parsers::normalize_relation(&rel.relation);
1223        crate::parsers::warn_if_non_canonical(&normalized);
1224
1225        // Normalize entity names before lookup: upsert_entity normalizes on write,
1226        // so the lookup must use the same normalized form to find the row.
1227        let src_name = crate::parsers::normalize_entity_name(&rel.source);
1228        let tgt_name = crate::parsers::normalize_entity_name(&rel.target);
1229        let src_id = entities::find_entity_id(conn, namespace, &src_name);
1230        let tgt_id = entities::find_entity_id(conn, namespace, &tgt_name);
1231        if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1232            let new_rel = NewRelationship {
1233                source: rel.source.clone(),
1234                target: rel.target.clone(),
1235                relation: normalized,
1236                strength: rel.strength,
1237                description: None,
1238            };
1239            if entities::upsert_relationship(conn, namespace, sid, tid, &new_rel).is_ok() {
1240                rel_count += 1;
1241            }
1242        }
1243    }
1244
1245    Ok((ent_count, rel_count))
1246}
1247
1248/// Updates an entity's description directly in the `entities` table.
1249fn persist_entity_description(
1250    conn: &Connection,
1251    entity_id: i64,
1252    description: &str,
1253) -> Result<(), AppError> {
1254    conn.execute(
1255        "UPDATE entities SET description = ?1, updated_at = unixepoch() WHERE id = ?2",
1256        rusqlite::params![description, entity_id],
1257    )?;
1258    Ok(())
1259}
1260
1261fn reembed_memory_vector(
1262    conn: &Connection,
1263    namespace: &str,
1264    memory_id: i64,
1265    memory_name: &str,
1266    memory_type: &str,
1267    body: &str,
1268    paths: &crate::paths::AppPaths,
1269) -> Result<(), AppError> {
1270    let snippet: String = body.chars().take(200).collect();
1271    let embedding = crate::embedder::embed_passage_local(&paths.models, body)?;
1272    memories::upsert_vec(
1273        conn,
1274        memory_id,
1275        namespace,
1276        memory_type,
1277        &embedding,
1278        memory_name,
1279        &snippet,
1280    )?;
1281    Ok(())
1282}
1283
1284/// Persists an enriched memory body (body-enrich, GAP-18).
1285///
1286/// Uses `memories::update` to set the new body and `sync_fts_after_update`
1287/// to keep FTS5 in sync. Also re-embeds the memory for recall accuracy.
1288fn persist_enriched_body(
1289    conn: &Connection,
1290    namespace: &str,
1291    memory_id: i64,
1292    memory_name: &str,
1293    new_body: &str,
1294    paths: &crate::paths::AppPaths,
1295) -> Result<(), AppError> {
1296    // Read current values for FTS sync
1297    let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
1298        "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
1299        rusqlite::params![memory_id],
1300        |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1301    )?;
1302
1303    let memory_type: String = conn.query_row(
1304        "SELECT type FROM memories WHERE id=?1",
1305        rusqlite::params![memory_id],
1306        |r| r.get(0),
1307    )?;
1308
1309    let description: String = conn.query_row(
1310        "SELECT COALESCE(description,'') FROM memories WHERE id=?1",
1311        rusqlite::params![memory_id],
1312        |r| r.get(0),
1313    )?;
1314
1315    let body_hash = blake3::hash(new_body.as_bytes()).to_hex().to_string();
1316
1317    let new_memory = memories::NewMemory {
1318        namespace: namespace.to_string(),
1319        name: memory_name.to_string(),
1320        memory_type: memory_type.clone(),
1321        description: description.clone(),
1322        body: new_body.to_string(),
1323        body_hash,
1324        session_id: None,
1325        source: "agent".to_string(),
1326        metadata: serde_json::json!({
1327            "operation": "body-enrich",
1328            "orig_chars": old_body.chars().count(),
1329            "new_chars": new_body.chars().count(),
1330        }),
1331    };
1332
1333    // G29 audit: insert a new immutable version BEFORE the update so the
1334    // enriched body is reachable through `history --name <X>` and
1335    // `restore --version N` can roll back to the pre-enrich state.
1336    let next_version = crate::storage::versions::next_version(conn, memory_id)?;
1337    let version_metadata = serde_json::json!({
1338        "operation": "body-enrich",
1339        "orig_chars": old_body.chars().count(),
1340        "new_chars": new_body.chars().count(),
1341    })
1342    .to_string();
1343    crate::storage::versions::insert_version(
1344        conn,
1345        memory_id,
1346        next_version,
1347        memory_name,
1348        &memory_type,
1349        &description,
1350        new_body,
1351        &version_metadata,
1352        Some("enrich"),
1353        "edit",
1354    )?;
1355
1356    memories::update(conn, memory_id, &new_memory, None)?;
1357    memories::sync_fts_after_update(
1358        conn,
1359        memory_id,
1360        &old_name,
1361        &old_desc,
1362        &old_body,
1363        &new_memory.name,
1364        &new_memory.description,
1365        &new_memory.body,
1366    )?;
1367
1368    // Re-embed for recall accuracy
1369    if let Err(e) = reembed_memory_vector(
1370        conn,
1371        namespace,
1372        memory_id,
1373        memory_name,
1374        &memory_type,
1375        new_body,
1376        paths,
1377    ) {
1378        tracing::warn!(target: "enrich", memory = %memory_name, error = %e, "vec upsert failed after body-enrich");
1379    }
1380
1381    Ok(())
1382}
1383
1384// ---------------------------------------------------------------------------
1385// Main entry point
1386// ---------------------------------------------------------------------------
1387
1388// ---------------------------------------------------------------------------
1389// G20: mode-conditional flag validation
1390// ---------------------------------------------------------------------------
1391
1392/// True when a scalar value matches its declared default. Used to
1393/// distinguish "operator passed an explicit override" from "clap filled
1394/// the default" for flags with default_value_t.
1395fn is_at_default<T: PartialEq>(value: T, default: T) -> bool {
1396    value == default
1397}
1398
1399/// G20: validate that flags for one LLM provider were not passed when
1400/// the operator selected a different provider. Flags silently discarded
1401/// by the wrong mode are surfaced as AppError::Validation BEFORE any
1402/// DB work, so the operator gets an actionable error instead of a
1403/// surprise at runtime.
1404///
1405/// Detection rules:
1406/// - For Option<PathBuf> / Option<String>: is_some() means explicit
1407/// - For scalar fields with default_value_t: value != default means explicit
1408/// - For boolean fields: true means explicit (default is false)
1409///
1410/// Mode-specific matrices:
1411/// - mode=claude-code rejects: codex_binary, codex_model, codex_timeout != 300
1412/// - mode=codex rejects: claude_binary, claude_model, claude_timeout != 300, max_cost_usd
1413fn validate_mode_conditional_flags_enrich(args: &EnrichArgs) -> Result<(), AppError> {
1414    const DEFAULT_TIMEOUT: u64 = 300;
1415
1416    let mut conflicts: Vec<String> = Vec::new();
1417
1418    match args.mode {
1419        EnrichMode::ClaudeCode => {
1420            if args.codex_binary.is_some() {
1421                conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1422            }
1423            if args.codex_model.is_some() {
1424                conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1425            }
1426            if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1427                conflicts.push(format!(
1428                    "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1429                    args.codex_timeout
1430                ));
1431            }
1432        }
1433        EnrichMode::Codex => {
1434            if args.claude_binary.is_some() {
1435                conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1436            }
1437            if args.claude_model.is_some() {
1438                conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1439            }
1440            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1441                conflicts.push(format!(
1442                    "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1443                    args.claude_timeout
1444                ));
1445            }
1446            if args.max_cost_usd.is_some() {
1447                conflicts.push(
1448                    "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription, not the call)"
1449                        .to_string(),
1450                );
1451            }
1452        }
1453    }
1454
1455    if !conflicts.is_empty() {
1456        return Err(AppError::Validation(format!(
1457            "G20: mode-conditional flag conflicts detected for --mode={}:\n  - {}",
1458            args.mode,
1459            conflicts.join("\n  - ")
1460        )));
1461    }
1462
1463    Ok(())
1464}
1465
1466// ---------------------------------------------------------------------------
1467
1468/// Main entry point for the `enrich` command.
1469pub fn run(args: &EnrichArgs) -> Result<(), AppError> {
1470    // G20: mode-conditional flag validation BEFORE any DB access.
1471    // Surfaces flags that the wrong mode would silently discard.
1472    validate_mode_conditional_flags_enrich(args)?;
1473    let started = Instant::now();
1474
1475    let paths = AppPaths::resolve(args.db.as_deref())?;
1476    ensure_db_ready(&paths)?;
1477    let conn = open_rw(&paths.db)?;
1478    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1479
1480    // G28-B (v1.0.68) + G30 (v1.0.69): enforce singleton per
1481    // (job_type, namespace, db_hash) so two parallel `enrich` invocations
1482    // on the same DB cannot co-exist, but concurrent enrich on different
1483    // databases works as expected. The force flag (--force) breaks a
1484    // stale lock from a previously crashed invocation.
1485    let wait_secs = args.wait_job_singleton;
1486    let force_flag = args.force_job_singleton;
1487    let _singleton = crate::lock::acquire_job_singleton(
1488        crate::lock::JobType::Enrich,
1489        &namespace,
1490        &paths.db,
1491        wait_secs,
1492        force_flag,
1493    )?;
1494
1495    // Validate provider binary upfront only for LLM-backed operations.
1496    let provider_binary = if matches!(args.operation, EnrichOperation::ReEmbed) {
1497        None
1498    } else {
1499        Some(match args.mode {
1500            EnrichMode::ClaudeCode => {
1501                let bin = find_claude_binary(args.claude_binary.as_deref())?;
1502                let version = super::claude_runner::validate_claude_version(&bin)?;
1503                tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
1504                emit_json(&PhaseEvent {
1505                    phase: "validate",
1506                    binary_path: bin.to_str(),
1507                    version: Some(&version),
1508                    items_total: None,
1509                    items_pending: None,
1510                    llm_parallelism: None,
1511                });
1512                bin
1513            }
1514            EnrichMode::Codex => {
1515                let bin = find_codex_binary(args.codex_binary.as_deref())?;
1516                emit_json(&PhaseEvent {
1517                    phase: "validate",
1518                    binary_path: bin.to_str(),
1519                    version: None,
1520                    items_total: None,
1521                    items_pending: None,
1522                    llm_parallelism: None,
1523                });
1524                bin
1525            }
1526        })
1527    };
1528
1529    // G28-D: refuse to start when the system is saturated. This check
1530    // is BEFORE preflight so we never spend an OAuth turn on a host
1531    // that is already at the limit.
1532    if args.max_load_check && !args.dry_run && crate::system_load::is_system_saturated() {
1533        let load = crate::system_load::load_average_one();
1534        let n = crate::system_load::ncpus();
1535        return Err(AppError::Validation(format!(
1536            "system load average {load:.2} exceeds 2x ncpus ({n}); \
1537             pass --no-max-load-check to override (not recommended)"
1538        )));
1539    }
1540
1541    // G35: preflight probe — issue a single ping turn to verify the
1542    // provider is healthy before scanning N candidates. If the probe
1543    // fails with a rate-limit error, optionally fall back to a
1544    // different mode (typically codex) instead of failing the entire
1545    // batch. The probe itself consumes 1 OAuth turn, so it stays
1546    // opt-in (default off) to keep --dry-run and CI flows zero-cost.
1547    if args.preflight_check && !args.dry_run && !matches!(args.operation, EnrichOperation::ReEmbed)
1548    {
1549        let preflight_result = run_preflight_probe(args);
1550        match preflight_result {
1551            PreflightOutcome::Healthy => {
1552                tracing::info!(target: "enrich", mode = ?args.mode, "preflight probe healthy");
1553            }
1554            PreflightOutcome::RateLimited { reason, suggestion } => {
1555                if let Some(fallback) = args.fallback_mode.clone() {
1556                    if fallback != args.mode {
1557                        // G35 (v1.0.69): the mid-batch mode switch is
1558                        // intentionally NOT applied because it would
1559                        // desynchronise the per-item rate-limit wait
1560                        // state (rate-limited items in the worker are
1561                        // timed against the original provider). Instead
1562                        // we abort cleanly so the operator can re-invoke
1563                        // with `--mode {fallback:?}`. This guarantees no
1564                        // OAuth window is wasted and no partial state
1565                        // is left in the queue.
1566                        return Err(AppError::Validation(format!(
1567                            "preflight detected rate limit on {mode:?}: {reason}; \
1568                             re-invoke with `--mode {fallback:?}` to use the fallback provider",
1569                            mode = args.mode
1570                        )));
1571                    }
1572                    return Err(AppError::Validation(format!(
1573                        "preflight detected rate limit on {mode:?}: {reason}; \
1574                         --fallback-mode matches --mode, no recovery possible",
1575                        mode = args.mode
1576                    )));
1577                }
1578                return Err(AppError::Validation(format!(
1579                    "preflight detected rate limit on {mode:?}: {reason}; \
1580                     {suggestion}; pass --fallback-mode codex to recover",
1581                    mode = args.mode
1582                )));
1583            }
1584            PreflightOutcome::Error(e) => {
1585                return Err(e);
1586            }
1587        }
1588    }
1589
1590    // SCAN phase
1591    let scan_result = scan_operation(&conn, &namespace, args)?;
1592    let total = scan_result.len();
1593
1594    emit_json(&PhaseEvent {
1595        phase: "scan",
1596        binary_path: None,
1597        version: None,
1598        items_total: Some(total),
1599        items_pending: Some(total),
1600        llm_parallelism: Some(args.llm_parallelism),
1601    });
1602
1603    // Dry-run: emit preview events and summary without calling LLM
1604    if args.dry_run {
1605        for (idx, key) in scan_result.iter().enumerate() {
1606            emit_json(&ItemEvent {
1607                item: key,
1608                status: "preview",
1609                memory_id: None,
1610                entity_id: None,
1611                entities: None,
1612                rels: None,
1613                chars_before: None,
1614                chars_after: None,
1615                cost_usd: None,
1616                elapsed_ms: None,
1617                error: None,
1618                index: idx,
1619                total,
1620            });
1621        }
1622        emit_json(&EnrichSummary {
1623            summary: true,
1624            operation: format!("{:?}", args.operation),
1625            items_total: total,
1626            completed: 0,
1627            failed: 0,
1628            skipped: 0,
1629            cost_usd: 0.0,
1630            elapsed_ms: started.elapsed().as_millis() as u64,
1631        });
1632        return Ok(());
1633    }
1634
1635    // All operations in this enum have an execution path.
1636
1637    // Queue setup for resume/retry
1638    let queue_conn = open_queue_db(DEFAULT_QUEUE_DB)?;
1639
1640    if args.resume {
1641        let reset = queue_conn
1642            .execute(
1643                "UPDATE queue SET status='pending' WHERE status='processing'",
1644                [],
1645            )
1646            .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
1647        if reset > 0 {
1648            tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
1649        }
1650    }
1651
1652    if args.retry_failed {
1653        let count = queue_conn
1654            .execute(
1655                "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
1656                [],
1657            )
1658            .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
1659        tracing::info!(target: "enrich", count, "retrying failed items");
1660    }
1661
1662    if !args.resume && !args.retry_failed {
1663        queue_conn
1664            .execute("DELETE FROM queue", [])
1665            .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
1666    }
1667
1668    // Populate queue
1669    for (idx, key) in scan_result.iter().enumerate() {
1670        let item_type = match args.operation {
1671            EnrichOperation::EntityDescriptions => "entity",
1672            _ => "memory",
1673        };
1674        if let Err(e) = queue_conn.execute(
1675            "INSERT OR IGNORE INTO queue (item_key, item_type, status) VALUES (?1, ?2, 'pending')",
1676            rusqlite::params![key, item_type],
1677        ) {
1678            tracing::warn!(target: "enrich", error = %e, "queue insert failed");
1679        }
1680        let _ = idx; // suppress unused warning
1681    }
1682
1683    // G19: parallel LLM processing via std::thread::scope when parallelism > 1.
1684    // Clamp enforces the range even if the caller bypasses clap validation.
1685    let parallelism = args.llm_parallelism.clamp(1, 32) as usize;
1686    if parallelism > 1 {
1687        tracing::info!(
1688            target: "enrich",
1689            llm_parallelism = parallelism,
1690            "parallel LLM processing with bounded thread pool"
1691        );
1692    }
1693    // G28-D (v1.0.68) + G34 (v1.0.69): warn above the recommended parallelism
1694    // ceiling. The threshold and message depend on the LLM mode because
1695    // Claude Code spawns MCP children (G28-A) while Codex does not.
1696    if parallelism > 4 {
1697        match args.mode {
1698            EnrichMode::ClaudeCode => {
1699                tracing::warn!(
1700                    target: "enrich",
1701                    llm_parallelism = parallelism,
1702                    recommended_max = 4,
1703                    mode = "claude-code",
1704                    "llm_parallelism above 4 multiplies Claude Code subprocess fan-out; \
1705                     consider combining with SQLITE_GRAPHRAG_CLAUDE_EMPTY_CONFIG_DIR \
1706                     to cut MCP children (G28-A)"
1707                );
1708            }
1709            EnrichMode::Codex if parallelism > 16 => {
1710                tracing::warn!(
1711                    target: "enrich",
1712                    llm_parallelism = parallelism,
1713                    recommended_max = 16,
1714                    mode = "codex",
1715                    "llm_parallelism above 16 risks OAuth rate-limit on Codex; \
1716                     consider --llm-parallelism 8 for safer concurrency"
1717                );
1718            }
1719            EnrichMode::Codex => {
1720                // No warning: codex does not spawn MCP children and was
1721                // validated at parallelism 8 in production (1161 items,
1722                // 0 failures) per the 2026-06-04 session audit.
1723            }
1724        }
1725    }
1726
1727    let mut completed = 0usize;
1728    let mut failed = 0usize;
1729    let mut skipped = 0usize;
1730    let mut cost_total = 0.0f64;
1731    let mut oauth_detected = false;
1732    let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1733    let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1734    let enrich_started = std::time::Instant::now();
1735
1736    let provider_timeout = match args.mode {
1737        EnrichMode::ClaudeCode => args.claude_timeout,
1738        EnrichMode::Codex => args.codex_timeout,
1739    };
1740
1741    let provider_model: Option<&str> = match args.mode {
1742        EnrichMode::ClaudeCode => args.claude_model.as_deref(),
1743        EnrichMode::Codex => args.codex_model.as_deref(),
1744    };
1745
1746    // G19: when parallelism > 1, spawn bounded worker threads.
1747    // Each worker opens its own DB connections (WAL supports concurrent readers + serialized writers).
1748    // The queue DB claim is atomic via UPDATE...RETURNING — no external lock needed.
1749    if parallelism > 1 {
1750        let stdout_mu = parking_lot::Mutex::new(());
1751        let budget = args.max_cost_usd;
1752        let operation = args.operation.clone();
1753        let mode = args.mode.clone();
1754        let min_oc = args.min_output_chars;
1755        let max_oc = args.max_output_chars;
1756        let prompt_tpl = args.prompt_template.as_deref().map(|p| p.to_path_buf());
1757
1758        struct WorkerResult {
1759            completed: usize,
1760            failed: usize,
1761            skipped: usize,
1762            cost: f64,
1763            oauth: bool,
1764        }
1765
1766        let results: Vec<WorkerResult> = std::thread::scope(|s| {
1767            let handles: Vec<_> = (0..parallelism)
1768                .map(|worker_id| {
1769                    let stdout_mu = &stdout_mu;
1770                    let paths = &paths;
1771                    let namespace = &namespace;
1772                    let provider_binary = provider_binary.as_deref();
1773                    let operation = &operation;
1774                    let mode = &mode;
1775                    let prompt_tpl = prompt_tpl.as_deref();
1776                    s.spawn(move || {
1777                        let w_conn = match open_rw(&paths.db) {
1778                            Ok(c) => c,
1779                            Err(e) => {
1780                                tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open DB");
1781                                return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1782                            }
1783                        };
1784                        let w_queue = match open_queue_db(DEFAULT_QUEUE_DB) {
1785                            Ok(c) => c,
1786                            Err(e) => {
1787                                tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open queue DB");
1788                                return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1789                            }
1790                        };
1791                        let mut w_completed = 0usize;
1792                        let mut w_failed = 0usize;
1793                        let mut w_skipped = 0usize;
1794                        let mut w_cost = 0.0f64;
1795                        let mut w_oauth = false;
1796                        let mut w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1797                        let w_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1798                        // G28-D: per-worker circuit breaker that aborts the
1799                        // loop after `circuit_breaker_threshold` consecutive
1800                        // HardFailure outcomes (transient/rate-limited errors
1801                        // do NOT count, so a recovering provider is not
1802                        // penalised).
1803                        let mut w_breaker = crate::retry::CircuitBreaker::new(
1804                            args.circuit_breaker_threshold.max(1),
1805                            std::time::Duration::from_secs(60),
1806                        );
1807
1808                        loop {
1809                            if crate::shutdown_requested() {
1810                                tracing::info!(target: "enrich", "shutdown requested, worker stopping");
1811                                break;
1812                            }
1813                            if let Some(b) = budget {
1814                                if !w_oauth && w_cost >= b {
1815                                    break;
1816                                }
1817                            }
1818                            let pending: Option<(i64, String, String)> = w_queue
1819                                .query_row(
1820                                    "UPDATE queue SET status='processing', attempt=attempt+1 \
1821                                     WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
1822                                     RETURNING id, item_key, item_type",
1823                                    [],
1824                                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1825                                )
1826                                .ok();
1827                            let (queue_id, item_key, _item_type) = match pending {
1828                                Some(p) => p,
1829                                None => break,
1830                            };
1831                            let item_started = Instant::now();
1832                            let current_index = w_completed + w_failed + w_skipped;
1833
1834                            let call_result = match operation {
1835                                EnrichOperation::MemoryBindings => call_memory_bindings(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1836                                EnrichOperation::EntityDescriptions => call_entity_description(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1837                                EnrichOperation::BodyEnrich => call_body_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode, min_oc, max_oc, prompt_tpl, args.preserve_threshold, paths),
1838                                EnrichOperation::ReEmbed => call_reembed(&w_conn, namespace, &item_key, paths),
1839                                EnrichOperation::WeightCalibrate => call_weight_calibrate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1840                                EnrichOperation::RelationReclassify => call_relation_reclassify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1841                                EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => call_entity_connect(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1842                                EnrichOperation::EntityTypeValidate => call_entity_type_validate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1843                                EnrichOperation::DescriptionEnrich => call_description_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1844                                EnrichOperation::DomainClassify => call_domain_classify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1845                                EnrichOperation::GraphAudit => call_graph_audit(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1846                                EnrichOperation::DeepResearchSynth => call_deep_research_synth(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1847                                EnrichOperation::BodyExtract => call_body_extract(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1848                            };
1849
1850                            match call_result {
1851                                Ok(EnrichItemResult::Done { cost, is_oauth, memory_id, entity_id, entities, rels, chars_before, chars_after }) => {
1852                                    if is_oauth { w_oauth = true; }
1853                                    w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1854                                    let _ = w_queue.execute(
1855                                        "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1856                                        rusqlite::params![memory_id, entity_id, entities as i64, rels as i64, cost, item_started.elapsed().as_millis() as i64, queue_id],
1857                                    );
1858                                    w_completed += 1;
1859                                    if !is_oauth { w_cost += cost; }
1860                                    // G28-D: count success; resets breaker.
1861                                    let _ = w_breaker
1862                                        .record(crate::retry::AttemptOutcome::Success);
1863                                    let _guard = stdout_mu.lock();
1864                                    emit_json(&ItemEvent { item: &item_key, status: "done", memory_id, entity_id, entities: Some(entities), rels: Some(rels), chars_before, chars_after, cost_usd: if is_oauth { None } else { Some(cost) }, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
1865                                }
1866                                Ok(EnrichItemResult::Skipped { reason }) => {
1867                                    w_skipped += 1;
1868                                    let _ = w_queue.execute("UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![reason, queue_id]);
1869                                    let _guard = stdout_mu.lock();
1870                                    emit_json(&ItemEvent { item: &item_key, status: "skipped", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
1871                                }
1872                                Ok(EnrichItemResult::PreservationFailed { score, threshold, chars_before, chars_after }) => {
1873                                    // G29 Passo 4: worker mirror of the
1874                                    // serial path. Counted as a soft
1875                                    // skip so the queue surface shows
1876                                    // a quality issue rather than a
1877                                    // transport failure.
1878                                    w_skipped += 1;
1879                                    let reason = format!(
1880                                        "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
1881                                    );
1882                                    let _ = w_queue.execute(
1883                                        "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
1884                                        rusqlite::params![reason, queue_id],
1885                                    );
1886                                    let _guard = stdout_mu.lock();
1887                                    emit_json(&ItemEvent {
1888                                        item: &item_key,
1889                                        status: "preservation_failed",
1890                                        memory_id: None,
1891                                        entity_id: None,
1892                                        entities: None,
1893                                        rels: None,
1894                                        chars_before: Some(chars_before),
1895                                        chars_after: Some(chars_after),
1896                                        cost_usd: None,
1897                                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1898                                        error: Some(reason),
1899                                        index: current_index,
1900                                        total,
1901                                    });
1902                                }
1903                                Err(e) => {
1904                                    let err_str = format!("{e}");
1905                                    if matches!(e, AppError::RateLimited { .. }) {
1906                                        if crate::retry::is_kill_switch_active() {
1907                                            tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
1908                                        } else if std::time::Instant::now() >= w_deadline {
1909                                            tracing::error!(target: "enrich", "rate-limit retry deadline (1h) exhausted in worker");
1910                                        } else {
1911                                            let half = w_backoff / 2;
1912                                            let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
1913                                            let actual_wait = half + jitter;
1914                                            tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited in worker, backing off");
1915                                            let _ = w_queue.execute("UPDATE queue SET status='pending' WHERE id=?1", rusqlite::params![queue_id]);
1916                                            std::thread::sleep(std::time::Duration::from_secs(actual_wait));
1917                                            w_backoff = (w_backoff * 2).min(900);
1918                                            continue;
1919                                        }
1920                                    }
1921                                    w_failed += 1;
1922                                    let _ = w_queue.execute("UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![err_str, queue_id]);
1923                                    let _guard = stdout_mu.lock();
1924                                    emit_json(&ItemEvent { item: &item_key, status: "failed", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: Some(err_str), index: current_index, total });
1925                                    // G28-D: count hard failure against breaker.
1926                                    let breaker_opened = w_breaker
1927                                        .record(crate::retry::AttemptOutcome::HardFailure);
1928                                    if breaker_opened {
1929                                        tracing::error!(target: "enrich",
1930                                            consecutive_failures = w_breaker.consecutive_failures(),
1931                                            "circuit breaker opened — aborting worker"
1932                                        );
1933                                        break;
1934                                    }
1935                                }
1936                            }
1937                        }
1938                        WorkerResult { completed: w_completed, failed: w_failed, skipped: w_skipped, cost: w_cost, oauth: w_oauth }
1939                    })
1940                })
1941                .collect();
1942            handles
1943                .into_iter()
1944                .map(|h| {
1945                    h.join().unwrap_or(WorkerResult {
1946                        completed: 0,
1947                        failed: 0,
1948                        skipped: 0,
1949                        cost: 0.0,
1950                        oauth: false,
1951                    })
1952                })
1953                .collect()
1954        });
1955
1956        for r in &results {
1957            completed += r.completed;
1958            failed += r.failed;
1959            skipped += r.skipped;
1960            cost_total += r.cost;
1961            if r.oauth && !oauth_detected {
1962                oauth_detected = true;
1963            }
1964        }
1965    } else {
1966        // Serial path (parallelism == 1) — original loop
1967        loop {
1968            if crate::shutdown_requested() {
1969                tracing::info!(target: "enrich", "shutdown requested, stopping enrichment");
1970                break;
1971            }
1972
1973            // Budget check
1974            if let Some(budget) = args.max_cost_usd {
1975                if !oauth_detected && cost_total >= budget {
1976                    tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
1977                    break;
1978                }
1979            }
1980
1981            // Dequeue next pending item
1982            let pending: Option<(i64, String, String)> = queue_conn
1983                .query_row(
1984                    "UPDATE queue SET status='processing', attempt=attempt+1 \
1985                 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
1986                 RETURNING id, item_key, item_type",
1987                    [],
1988                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1989                )
1990                .ok();
1991
1992            let (queue_id, item_key, item_type) = match pending {
1993                Some(p) => p,
1994                None => break,
1995            };
1996
1997            let item_started = Instant::now();
1998            let current_index = completed + failed + skipped;
1999
2000            let call_result = match args.operation {
2001                EnrichOperation::MemoryBindings => call_memory_bindings(
2002                    &conn,
2003                    &namespace,
2004                    &item_key,
2005                    provider_binary
2006                        .as_deref()
2007                        .expect("provider binary required"),
2008                    provider_model,
2009                    provider_timeout,
2010                    &args.mode,
2011                ),
2012                EnrichOperation::EntityDescriptions => call_entity_description(
2013                    &conn,
2014                    &namespace,
2015                    &item_key,
2016                    provider_binary
2017                        .as_deref()
2018                        .expect("provider binary required"),
2019                    provider_model,
2020                    provider_timeout,
2021                    &args.mode,
2022                ),
2023                EnrichOperation::BodyEnrich => call_body_enrich(
2024                    &conn,
2025                    &namespace,
2026                    &item_key,
2027                    provider_binary
2028                        .as_deref()
2029                        .expect("provider binary required"),
2030                    provider_model,
2031                    provider_timeout,
2032                    &args.mode,
2033                    args.min_output_chars,
2034                    args.max_output_chars,
2035                    args.prompt_template.as_deref(),
2036                    args.preserve_threshold,
2037                    &paths,
2038                ),
2039                EnrichOperation::ReEmbed => call_reembed(&conn, &namespace, &item_key, &paths),
2040                EnrichOperation::WeightCalibrate => call_weight_calibrate(
2041                    &conn,
2042                    &namespace,
2043                    &item_key,
2044                    provider_binary
2045                        .as_deref()
2046                        .expect("provider binary required"),
2047                    provider_model,
2048                    provider_timeout,
2049                    &args.mode,
2050                ),
2051                EnrichOperation::RelationReclassify => call_relation_reclassify(
2052                    &conn,
2053                    &namespace,
2054                    &item_key,
2055                    provider_binary
2056                        .as_deref()
2057                        .expect("provider binary required"),
2058                    provider_model,
2059                    provider_timeout,
2060                    &args.mode,
2061                ),
2062                EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2063                    call_entity_connect(
2064                        &conn,
2065                        &namespace,
2066                        &item_key,
2067                        provider_binary
2068                            .as_deref()
2069                            .expect("provider binary required"),
2070                        provider_model,
2071                        provider_timeout,
2072                        &args.mode,
2073                    )
2074                }
2075                EnrichOperation::EntityTypeValidate => call_entity_type_validate(
2076                    &conn,
2077                    &namespace,
2078                    &item_key,
2079                    provider_binary
2080                        .as_deref()
2081                        .expect("provider binary required"),
2082                    provider_model,
2083                    provider_timeout,
2084                    &args.mode,
2085                ),
2086                EnrichOperation::DescriptionEnrich => call_description_enrich(
2087                    &conn,
2088                    &namespace,
2089                    &item_key,
2090                    provider_binary
2091                        .as_deref()
2092                        .expect("provider binary required"),
2093                    provider_model,
2094                    provider_timeout,
2095                    &args.mode,
2096                ),
2097                EnrichOperation::DomainClassify => call_domain_classify(
2098                    &conn,
2099                    &namespace,
2100                    &item_key,
2101                    provider_binary
2102                        .as_deref()
2103                        .expect("provider binary required"),
2104                    provider_model,
2105                    provider_timeout,
2106                    &args.mode,
2107                ),
2108                EnrichOperation::GraphAudit => call_graph_audit(
2109                    &conn,
2110                    &namespace,
2111                    &item_key,
2112                    provider_binary
2113                        .as_deref()
2114                        .expect("provider binary required"),
2115                    provider_model,
2116                    provider_timeout,
2117                    &args.mode,
2118                ),
2119                EnrichOperation::DeepResearchSynth => call_deep_research_synth(
2120                    &conn,
2121                    &namespace,
2122                    &item_key,
2123                    provider_binary
2124                        .as_deref()
2125                        .expect("provider binary required"),
2126                    provider_model,
2127                    provider_timeout,
2128                    &args.mode,
2129                ),
2130                EnrichOperation::BodyExtract => call_body_extract(
2131                    &conn,
2132                    &namespace,
2133                    &item_key,
2134                    provider_binary
2135                        .as_deref()
2136                        .expect("provider binary required"),
2137                    provider_model,
2138                    provider_timeout,
2139                    &args.mode,
2140                ),
2141            };
2142
2143            match call_result {
2144                Ok(EnrichItemResult::Done {
2145                    memory_id,
2146                    entity_id,
2147                    entities,
2148                    rels,
2149                    chars_before,
2150                    chars_after,
2151                    cost,
2152                    is_oauth,
2153                }) => {
2154                    if is_oauth && !oauth_detected {
2155                        oauth_detected = true;
2156                        tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
2157                    }
2158                    backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
2159
2160                    // Persist depends on the operation
2161                    let persist_err: Option<String> = match args.operation {
2162                        EnrichOperation::MemoryBindings => {
2163                            // Bindings already persisted inside call_memory_bindings
2164                            None
2165                        }
2166                        EnrichOperation::EntityDescriptions => {
2167                            // Description already persisted inside call_entity_description
2168                            None
2169                        }
2170                        EnrichOperation::BodyEnrich => {
2171                            // Body already persisted inside call_body_enrich
2172                            None
2173                        }
2174                        _ => {
2175                            // All G27 operations persist inside their call_* function
2176                            None
2177                        }
2178                    };
2179
2180                    if let Err(e) = queue_conn.execute(
2181                    "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2182                    rusqlite::params![
2183                        memory_id,
2184                        entity_id,
2185                        entities as i64,
2186                        rels as i64,
2187                        cost,
2188                        item_started.elapsed().as_millis() as i64,
2189                        queue_id
2190                    ],
2191                ) {
2192                        tracing::warn!(target: "enrich", error = %e, "queue done update failed");
2193                    }
2194
2195                    if persist_err.is_none() {
2196                        completed += 1;
2197                        if !is_oauth {
2198                            cost_total += cost;
2199                        }
2200                        emit_json(&ItemEvent {
2201                            item: &item_key,
2202                            status: "done",
2203                            memory_id,
2204                            entity_id,
2205                            entities: Some(entities),
2206                            rels: Some(rels),
2207                            chars_before,
2208                            chars_after,
2209                            cost_usd: if is_oauth { None } else { Some(cost) },
2210                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2211                            error: None,
2212                            index: current_index,
2213                            total,
2214                        });
2215                    } else {
2216                        failed += 1;
2217                        emit_json(&ItemEvent {
2218                            item: &item_key,
2219                            status: "failed",
2220                            memory_id: None,
2221                            entity_id: None,
2222                            entities: None,
2223                            rels: None,
2224                            chars_before: None,
2225                            chars_after: None,
2226                            cost_usd: None,
2227                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2228                            error: persist_err,
2229                            index: current_index,
2230                            total,
2231                        });
2232                    }
2233                }
2234                Ok(EnrichItemResult::Skipped { reason }) => {
2235                    skipped += 1;
2236                    if let Err(e) = queue_conn.execute(
2237                    "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2238                    rusqlite::params![reason, queue_id],
2239                ) {
2240                        tracing::warn!(target: "enrich", error = %e, "queue skipped update failed");
2241                    }
2242                    emit_json(&ItemEvent {
2243                        item: &item_key,
2244                        status: "skipped",
2245                        memory_id: None,
2246                        entity_id: None,
2247                        entities: None,
2248                        rels: None,
2249                        chars_before: None,
2250                        chars_after: None,
2251                        cost_usd: None,
2252                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2253                        error: None,
2254                        index: current_index,
2255                        total,
2256                    });
2257                }
2258                Ok(EnrichItemResult::PreservationFailed {
2259                    score,
2260                    threshold,
2261                    chars_before,
2262                    chars_after,
2263                }) => {
2264                    // G29 Passo 4: the LLM rewrite diverged too far from
2265                    // the original body. Count as a soft failure (not
2266                    // `failed`) so the queue surfaces it as a quality
2267                    // issue, not a transport error. The reason is
2268                    // structured so the operator can audit why a body
2269                    // was rejected.
2270                    skipped += 1;
2271                    let reason = format!(
2272                        "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2273                    );
2274                    if let Err(qe) = queue_conn.execute(
2275                        "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2276                        rusqlite::params![reason, queue_id],
2277                    ) {
2278                        tracing::warn!(target: "enrich", error = %qe, "queue preservation_failed update failed");
2279                    }
2280                    emit_json(&ItemEvent {
2281                        item: &item_key,
2282                        status: "preservation_failed",
2283                        memory_id: None,
2284                        entity_id: None,
2285                        entities: None,
2286                        rels: None,
2287                        chars_before: Some(chars_before),
2288                        chars_after: Some(chars_after),
2289                        cost_usd: None,
2290                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2291                        error: Some(reason),
2292                        index: current_index,
2293                        total,
2294                    });
2295                }
2296                Err(e) => {
2297                    let err_str = format!("{e}");
2298                    if matches!(e, AppError::RateLimited { .. }) {
2299                        if crate::retry::is_kill_switch_active() {
2300                            tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2301                        } else if std::time::Instant::now() >= rate_limit_deadline {
2302                            tracing::error!(target: "enrich", total_elapsed_secs = enrich_started.elapsed().as_secs(), "rate-limit retry deadline (1h) exhausted");
2303                        } else {
2304                            let half = backoff_secs / 2;
2305                            let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2306                            let actual_wait = half + jitter;
2307                            tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
2308                            if let Err(qe) = queue_conn.execute(
2309                                "UPDATE queue SET status='pending' WHERE id=?1",
2310                                rusqlite::params![queue_id],
2311                            ) {
2312                                tracing::warn!(target: "enrich", error = %qe, "queue pending update failed");
2313                            }
2314                            std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2315                            backoff_secs = (backoff_secs * 2).min(900);
2316                            continue;
2317                        }
2318                    }
2319
2320                    failed += 1;
2321                    if let Err(qe) = queue_conn.execute(
2322                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
2323                    rusqlite::params![err_str, queue_id],
2324                ) {
2325                        tracing::warn!(target: "enrich", error = %qe, "queue failed update failed");
2326                    }
2327                    emit_json(&ItemEvent {
2328                        item: &item_key,
2329                        status: "failed",
2330                        memory_id: None,
2331                        entity_id: None,
2332                        entities: None,
2333                        rels: None,
2334                        chars_before: None,
2335                        chars_after: None,
2336                        cost_usd: None,
2337                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2338                        error: Some(err_str),
2339                        index: current_index,
2340                        total,
2341                    });
2342                }
2343            }
2344
2345            let _ = item_type; // used via queue schema only
2346        }
2347    } // end else (serial path)
2348
2349    let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2350    let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2351
2352    emit_json(&EnrichSummary {
2353        summary: true,
2354        operation: format!("{:?}", args.operation),
2355        items_total: total,
2356        completed,
2357        failed,
2358        skipped,
2359        cost_usd: cost_total,
2360        elapsed_ms: started.elapsed().as_millis() as u64,
2361    });
2362
2363    if failed == 0 {
2364        let _ = std::fs::remove_file(DEFAULT_QUEUE_DB);
2365    }
2366
2367    Ok(())
2368}
2369
2370// ---------------------------------------------------------------------------
2371// Internal result type for a single item call
2372// ---------------------------------------------------------------------------
2373
/// Outcome of processing one queue item inside a single `call_*` helper.
///
/// The serial run loop maps each variant to a queue status and an NDJSON
/// item event: `Done` → `done`, `Skipped` → `skipped`, and
/// `PreservationFailed` → `preservation_failed` (counted as skipped, not
/// failed). Transport/provider errors travel separately as `Err(AppError)`.
#[derive(Debug)]
enum EnrichItemResult {
    /// The item was processed and its result already persisted by the helper.
    Done {
        /// Memory the item operated on (memory-centric operations).
        memory_id: Option<i64>,
        /// Entity the item operated on (entity-centric operations).
        entity_id: Option<i64>,
        /// Entity count reported by the helper (0 when not applicable).
        entities: usize,
        /// Relationship count reported by the helper (0 when not applicable).
        rels: usize,
        /// Body length in chars before the operation, when meaningful.
        chars_before: Option<usize>,
        /// Body length in chars after the operation, when meaningful.
        chars_after: Option<usize>,
        /// Cost written to the queue's `cost_usd` column.
        cost: f64,
        /// When true the caller omits `cost_usd` from the item event and
        /// leaves `cost` out of the run total.
        is_oauth: bool,
    },
    /// Nothing was persisted; `reason` is stored in the queue `error` column.
    Skipped {
        reason: String,
    },
    /// G29 Passo 4 (v1.0.69): the LLM rewrite diverged from the original
    /// body beyond the configured `--preserve-threshold` and was rejected
    /// before persistence. The trigram-Jaccard score and threshold are
    /// emitted in the NDJSON stream for operator audit.
    PreservationFailed {
        /// Similarity score between the original and rewritten body.
        score: f64,
        /// Threshold the score had to reach.
        threshold: f64,
        /// Original body length in chars.
        chars_before: usize,
        /// Rejected rewrite length in chars.
        chars_after: usize,
    },
}
2399
2400// ---------------------------------------------------------------------------
2401// Per-operation call helpers (SCAN + JUDGE + PERSIST in one unit)
2402// ---------------------------------------------------------------------------
2403
2404fn call_memory_bindings(
2405    conn: &Connection,
2406    namespace: &str,
2407    memory_name: &str,
2408    binary: &Path,
2409    model: Option<&str>,
2410    timeout: u64,
2411    mode: &EnrichMode,
2412) -> Result<EnrichItemResult, AppError> {
2413    // Look up the memory
2414    let (memory_id, body): (i64, String) = conn.query_row(
2415        "SELECT id, COALESCE(body,'') FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2416        rusqlite::params![namespace, memory_name],
2417        |r| Ok((r.get(0)?, r.get(1)?)),
2418    ).map_err(|e| match e {
2419        rusqlite::Error::QueryReturnedNoRows => AppError::NotFound(format!("memory '{memory_name}' not found")),
2420        other => AppError::Database(other),
2421    })?;
2422
2423    if body.trim().is_empty() {
2424        return Ok(EnrichItemResult::Skipped {
2425            reason: "body is empty".to_string(),
2426        });
2427    }
2428
2429    let (value, cost, is_oauth) = match mode {
2430        EnrichMode::ClaudeCode => call_claude(
2431            binary,
2432            BINDINGS_PROMPT,
2433            BINDINGS_SCHEMA,
2434            &body,
2435            model,
2436            timeout,
2437        )?,
2438        EnrichMode::Codex => call_codex(
2439            binary,
2440            BINDINGS_PROMPT,
2441            BINDINGS_SCHEMA,
2442            &body,
2443            model,
2444            timeout,
2445        )?,
2446    };
2447
2448    let empty_arr = serde_json::Value::Array(vec![]);
2449    let entities_val = value.get("entities").unwrap_or(&empty_arr);
2450    let rels_val = value.get("relationships").unwrap_or(&empty_arr);
2451
2452    let (ent_count, rel_count) =
2453        persist_memory_bindings(conn, namespace, memory_id, entities_val, rels_val)?;
2454
2455    Ok(EnrichItemResult::Done {
2456        memory_id: Some(memory_id),
2457        entity_id: None,
2458        entities: ent_count,
2459        rels: rel_count,
2460        chars_before: None,
2461        chars_after: None,
2462        cost,
2463        is_oauth,
2464    })
2465}
2466
2467fn call_entity_description(
2468    conn: &Connection,
2469    namespace: &str,
2470    entity_name: &str,
2471    binary: &Path,
2472    model: Option<&str>,
2473    timeout: u64,
2474    mode: &EnrichMode,
2475) -> Result<EnrichItemResult, AppError> {
2476    let (entity_id, entity_type): (i64, String) = conn
2477        .query_row(
2478            "SELECT id, type FROM entities WHERE namespace=?1 AND name=?2",
2479            rusqlite::params![namespace, entity_name],
2480            |r| Ok((r.get(0)?, r.get(1)?)),
2481        )
2482        .map_err(|e| match e {
2483            rusqlite::Error::QueryReturnedNoRows => {
2484                AppError::NotFound(format!("entity '{entity_name}' not found"))
2485            }
2486            other => AppError::Database(other),
2487        })?;
2488
2489    let prompt = format!(
2490        "{ENTITY_DESCRIPTION_PROMPT_PREFIX}{entity_name}\nEntity type: {entity_type}\n\nGenerate a description:"
2491    );
2492
2493    let (value, cost, is_oauth) = match mode {
2494        EnrichMode::ClaudeCode => call_claude(
2495            binary,
2496            &prompt,
2497            ENTITY_DESCRIPTION_SCHEMA,
2498            "",
2499            model,
2500            timeout,
2501        )?,
2502        EnrichMode::Codex => call_codex(
2503            binary,
2504            &prompt,
2505            ENTITY_DESCRIPTION_SCHEMA,
2506            "",
2507            model,
2508            timeout,
2509        )?,
2510    };
2511
2512    let description = value
2513        .get("description")
2514        .and_then(|v| v.as_str())
2515        .ok_or_else(|| AppError::Validation("LLM result missing 'description' field".into()))?;
2516
2517    persist_entity_description(conn, entity_id, description)?;
2518
2519    Ok(EnrichItemResult::Done {
2520        memory_id: None,
2521        entity_id: Some(entity_id),
2522        entities: 0,
2523        rels: 0,
2524        chars_before: None,
2525        chars_after: None,
2526        cost,
2527        is_oauth,
2528    })
2529}
2530
2531#[allow(clippy::too_many_arguments)]
2532fn call_body_enrich(
2533    conn: &Connection,
2534    namespace: &str,
2535    memory_name: &str,
2536    binary: &Path,
2537    model: Option<&str>,
2538    timeout: u64,
2539    mode: &EnrichMode,
2540    min_output_chars: usize,
2541    max_output_chars: usize,
2542    prompt_template: Option<&Path>,
2543    preserve_threshold: f64,
2544    paths: &crate::paths::AppPaths,
2545) -> Result<EnrichItemResult, AppError> {
2546    let (memory_id, body, description, memory_type): (i64, String, String, String) = conn
2547        .query_row(
2548            "SELECT id, COALESCE(body,''), COALESCE(description,''), COALESCE(type,'note') \
2549         FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2550            rusqlite::params![namespace, memory_name],
2551            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2552        )
2553        .map_err(|e| match e {
2554            rusqlite::Error::QueryReturnedNoRows => {
2555                AppError::NotFound(format!("memory '{memory_name}' not found"))
2556            }
2557            other => AppError::Database(other),
2558        })?;
2559
2560    let chars_before = body.chars().count();
2561
2562    // G26: gather graph context for contextualized enrichment
2563    let linked_entities: Vec<String> = {
2564        let mut stmt = conn.prepare_cached(
2565            "SELECT e.name FROM memory_entities me \
2566             JOIN entities e ON e.id = me.entity_id \
2567             WHERE me.memory_id = ?1 LIMIT 10",
2568        )?;
2569        let result: Vec<String> = stmt
2570            .query_map(rusqlite::params![memory_id], |r| r.get::<_, String>(0))?
2571            .filter_map(|r| r.ok())
2572            .collect();
2573        drop(stmt);
2574        result
2575    };
2576
2577    // Load custom prompt template if provided
2578    let prompt_prefix = if let Some(tmpl_path) = prompt_template {
2579        let file_size = std::fs::metadata(tmpl_path)
2580            .map_err(|e| {
2581                AppError::Io(std::io::Error::new(
2582                    e.kind(),
2583                    format!("failed to stat prompt template: {e}"),
2584                ))
2585            })?
2586            .len();
2587        if file_size > MAX_MEMORY_BODY_LEN as u64 {
2588            return Err(AppError::LimitExceeded(
2589                crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
2590            ));
2591        }
2592        std::fs::read_to_string(tmpl_path).map_err(|e| {
2593            AppError::Io(std::io::Error::new(
2594                e.kind(),
2595                format!("failed to read prompt template: {e}"),
2596            ))
2597        })?
2598    } else {
2599        BODY_ENRICH_PROMPT_PREFIX.to_string()
2600    };
2601
2602    // G26: build contextualized prompt with graph data
2603    let context_section = if !linked_entities.is_empty() || !description.is_empty() {
2604        let mut ctx = String::new();
2605        ctx.push_str(&format!(
2606            "\nContext:\n- Memory name: {memory_name}\n- Type: {memory_type}\n"
2607        ));
2608        if !description.is_empty() {
2609            ctx.push_str(&format!("- Description: {description}\n"));
2610        }
2611        ctx.push_str(&format!("- Domain: {namespace}\n"));
2612        if !linked_entities.is_empty() {
2613            ctx.push_str(&format!(
2614                "- Linked entities: {}\n",
2615                linked_entities.join(", ")
2616            ));
2617        }
2618        ctx
2619    } else {
2620        String::new()
2621    };
2622
2623    let prompt = format!(
2624        "{prompt_prefix}{context_section}\nTarget minimum length: {min_output_chars} characters. Maximum: {max_output_chars} characters."
2625    );
2626
2627    // The body schema uses a free-form enriched_body field
2628    let (value, cost, is_oauth) = match mode {
2629        EnrichMode::ClaudeCode => {
2630            call_claude(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2631        }
2632        EnrichMode::Codex => {
2633            call_codex(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2634        }
2635    };
2636
2637    let enriched_body = value
2638        .get("enriched_body")
2639        .and_then(|v| v.as_str())
2640        .ok_or_else(|| AppError::Validation("LLM result missing 'enriched_body' field".into()))?;
2641
2642    let chars_after = enriched_body.chars().count();
2643
2644    // G29 Passo 4 (v1.0.69): preservation check. Before persisting, run
2645    // a trigram-Jaccard similarity between the original body and the
2646    // LLM-rewritten body. When the score falls below
2647    // `args.preserve_threshold` (default 0.7 per the G29 gap), reject the
2648    // rewrite as a likely hallucination. The result is recorded in the
2649    // NDJSON stream so operators can audit what the LLM tried to do.
2650    let threshold = preserve_threshold;
2651    let verdict =
2652        crate::preservation::PreservationVerdict::evaluate(&body, enriched_body, threshold);
2653    if !verdict.is_accepted() {
2654        return Ok(EnrichItemResult::PreservationFailed {
2655            score: match verdict {
2656                crate::preservation::PreservationVerdict::Preserved { score, .. } => score,
2657                crate::preservation::PreservationVerdict::Rejected { score, .. } => score,
2658                crate::preservation::PreservationVerdict::Unchanged { .. } => 1.0,
2659            },
2660            threshold,
2661            chars_before,
2662            chars_after,
2663        });
2664    }
2665
2666    // G29 Passo 5 (v1.0.69): idempotency via blake3 hash. Before persisting,
2667    // compare the hash of the original body against the hash of the enriched
2668    // body. Identical hashes mean the LLM produced a byte-for-byte identical
2669    // body (rare but possible) — treat as `Skipped` so re-running the batch
2670    // is safe and the queue does not get re-persisted entries.
2671    let old_hash = blake3::hash(body.as_bytes()).to_hex().to_string();
2672    let new_hash = blake3::hash(enriched_body.as_bytes()).to_hex().to_string();
2673    if old_hash == new_hash {
2674        return Ok(EnrichItemResult::Skipped {
2675            reason: format!(
2676                "enriched body hash matches original (blake3:{old_hash}); idempotency skip"
2677            ),
2678        });
2679    }
2680
2681    // Only persist if the enriched body is genuinely longer
2682    if chars_after <= chars_before {
2683        return Ok(EnrichItemResult::Skipped {
2684            reason: format!(
2685                "enriched body ({chars_after} chars) not longer than original ({chars_before} chars)"
2686            ),
2687        });
2688    }
2689
2690    persist_enriched_body(
2691        conn,
2692        namespace,
2693        memory_id,
2694        memory_name,
2695        enriched_body,
2696        paths,
2697    )?;
2698
2699    Ok(EnrichItemResult::Done {
2700        memory_id: Some(memory_id),
2701        entity_id: None,
2702        entities: 0,
2703        rels: 0,
2704        chars_before: Some(chars_before),
2705        chars_after: Some(chars_after),
2706        cost,
2707        is_oauth,
2708    })
2709}
2710
2711fn call_reembed(
2712    conn: &Connection,
2713    namespace: &str,
2714    memory_name: &str,
2715    paths: &crate::paths::AppPaths,
2716) -> Result<EnrichItemResult, AppError> {
2717    let (memory_id, body, memory_type): (i64, String, String) = conn
2718        .query_row(
2719            "SELECT id, COALESCE(body,''), COALESCE(type,'note')
2720             FROM memories
2721             WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2722            rusqlite::params![namespace, memory_name],
2723            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2724        )
2725        .map_err(|e| match e {
2726            rusqlite::Error::QueryReturnedNoRows => {
2727                AppError::NotFound(format!("memory '{memory_name}' not found"))
2728            }
2729            other => AppError::Database(other),
2730        })?;
2731
2732    if body.trim().is_empty() {
2733        return Ok(EnrichItemResult::Skipped {
2734            reason: "body is empty".to_string(),
2735        });
2736    }
2737
2738    reembed_memory_vector(
2739        conn,
2740        namespace,
2741        memory_id,
2742        memory_name,
2743        &memory_type,
2744        &body,
2745        paths,
2746    )?;
2747
2748    Ok(EnrichItemResult::Done {
2749        memory_id: Some(memory_id),
2750        entity_id: None,
2751        entities: 0,
2752        rels: 0,
2753        chars_before: Some(body.chars().count()),
2754        chars_after: Some(body.chars().count()),
2755        cost: 0.0,
2756        is_oauth: true,
2757    })
2758}
2759
2760// ---------------------------------------------------------------------------
2761// Scan dispatcher — maps operation to scan query result (item keys)
2762// ---------------------------------------------------------------------------
2763
2764fn scan_operation(
2765    conn: &Connection,
2766    namespace: &str,
2767    args: &EnrichArgs,
2768) -> Result<Vec<String>, AppError> {
2769    // G37: resolve --names + --names-file once and apply to every scan path.
2770    let name_filter = resolve_name_filter(args)?;
2771    match args.operation {
2772        EnrichOperation::MemoryBindings => {
2773            let rows = scan_unbound_memories(conn, namespace, args.limit, &name_filter)?;
2774            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2775        }
2776        EnrichOperation::EntityDescriptions => {
2777            let rows = scan_entities_without_description(conn, namespace, args.limit)?;
2778            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2779        }
2780        EnrichOperation::BodyEnrich => {
2781            let rows =
2782                scan_short_body_memories(conn, namespace, args.min_output_chars, args.limit)?;
2783            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2784        }
2785        EnrichOperation::ReEmbed => {
2786            let rows = scan_memories_without_embeddings(conn, namespace, args.limit, &name_filter)?;
2787            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2788        }
2789        EnrichOperation::WeightCalibrate => {
2790            let rows = scan_weight_candidates(conn, namespace, args.limit)?;
2791            Ok(rows
2792                .into_iter()
2793                .map(|(id, _, _, _, _)| id.to_string())
2794                .collect())
2795        }
2796        EnrichOperation::RelationReclassify => {
2797            let rows = scan_generic_relations(conn, namespace, args.limit)?;
2798            Ok(rows
2799                .into_iter()
2800                .map(|(id, _, _, _)| id.to_string())
2801                .collect())
2802        }
2803        EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2804            let pairs = scan_isolated_entity_pairs(conn, namespace, args.limit)?;
2805            Ok(pairs.into_iter().map(|(_, name, _, _)| name).collect())
2806        }
2807        EnrichOperation::EntityTypeValidate => {
2808            let rows = scan_entities_for_type_validation(conn, namespace, args.limit)?;
2809            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2810        }
2811        EnrichOperation::DescriptionEnrich => {
2812            let rows = scan_generic_descriptions(conn, namespace, args.limit)?;
2813            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2814        }
2815        EnrichOperation::DomainClassify
2816        | EnrichOperation::GraphAudit
2817        | EnrichOperation::DeepResearchSynth
2818        | EnrichOperation::BodyExtract => {
2819            let limit_clause = args.limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
2820            let sql = format!(
2821                "SELECT name FROM memories WHERE namespace=?1 AND deleted_at IS NULL ORDER BY id {limit_clause}"
2822            );
2823            let mut stmt = conn.prepare(&sql)?;
2824            let names = stmt
2825                .query_map(rusqlite::params![namespace], |r| r.get::<_, String>(0))?
2826                .collect::<Result<Vec<_>, _>>()?;
2827            Ok(names)
2828        }
2829    }
2830}
2831
2832// ---------------------------------------------------------------------------
2833// Codex stub provider
2834// ---------------------------------------------------------------------------
2835
2836/// Locates the Codex CLI binary.
2837fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
2838    if let Some(p) = explicit {
2839        if p.exists() {
2840            return Ok(p.to_path_buf());
2841        }
2842        return Err(AppError::Validation(format!(
2843            "Codex binary not found at explicit path: {}",
2844            p.display()
2845        )));
2846    }
2847
2848    if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
2849        let p = PathBuf::from(&env_path);
2850        if p.exists() {
2851            return Ok(p);
2852        }
2853    }
2854
2855    let name = if cfg!(windows) { "codex.exe" } else { "codex" };
2856    if let Some(path_var) = std::env::var_os("PATH") {
2857        for dir in std::env::split_paths(&path_var) {
2858            let candidate = dir.join(name);
2859            if candidate.exists() {
2860                return Ok(crate::extract::llm_embedding::resolve_real_binary(
2861                    &candidate,
2862                ));
2863            }
2864        }
2865    }
2866
2867    Err(AppError::Validation(
2868        "Codex CLI binary not found in PATH. Install it or specify --codex-binary".to_string(),
2869    ))
2870}
2871
2872/// G27: Calibrate weight of a single relationship via LLM.
2873fn call_weight_calibrate(
2874    conn: &Connection,
2875    _namespace: &str,
2876    item_key: &str,
2877    binary: &Path,
2878    model: Option<&str>,
2879    timeout: u64,
2880    mode: &EnrichMode,
2881) -> Result<EnrichItemResult, AppError> {
2882    let rel_id: i64 = item_key
2883        .parse()
2884        .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
2885    let (source_name, target_name, relation, current_weight): (String, String, String, f64) = conn
2886        .query_row(
2887            "SELECT e1.name, e2.name, r.relation, r.weight \
2888             FROM relationships r \
2889             JOIN entities e1 ON e1.id = r.source_id \
2890             JOIN entities e2 ON e2.id = r.target_id \
2891             WHERE r.id = ?1",
2892            rusqlite::params![rel_id],
2893            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2894        )
2895        .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
2896
2897    let input_text = format!(
2898        "Source: {source_name}\nTarget: {target_name}\nRelation: {relation}\nCurrent weight: {current_weight}"
2899    );
2900    let (value, cost, is_oauth) = match mode {
2901        EnrichMode::ClaudeCode => call_claude(
2902            binary,
2903            WEIGHT_CALIBRATE_PROMPT,
2904            WEIGHT_CALIBRATE_SCHEMA,
2905            &input_text,
2906            model,
2907            timeout,
2908        )?,
2909        EnrichMode::Codex => call_codex(
2910            binary,
2911            WEIGHT_CALIBRATE_PROMPT,
2912            WEIGHT_CALIBRATE_SCHEMA,
2913            &input_text,
2914            model,
2915            timeout,
2916        )?,
2917    };
2918
2919    let calibrated = value
2920        .get("calibrated_weight")
2921        .and_then(|v| v.as_f64())
2922        .ok_or_else(|| AppError::Validation("LLM result missing 'calibrated_weight'".into()))?;
2923
2924    conn.execute(
2925        "UPDATE relationships SET weight = ?1 WHERE id = ?2",
2926        rusqlite::params![calibrated, rel_id],
2927    )?;
2928
2929    Ok(EnrichItemResult::Done {
2930        memory_id: None,
2931        entity_id: None,
2932        entities: 0,
2933        rels: 1,
2934        chars_before: None,
2935        chars_after: None,
2936        cost,
2937        is_oauth,
2938    })
2939}
2940
2941/// G27: Reclassify a generic relationship type via LLM.
2942fn call_relation_reclassify(
2943    conn: &Connection,
2944    _namespace: &str,
2945    item_key: &str,
2946    binary: &Path,
2947    model: Option<&str>,
2948    timeout: u64,
2949    mode: &EnrichMode,
2950) -> Result<EnrichItemResult, AppError> {
2951    let rel_id: i64 = item_key
2952        .parse()
2953        .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
2954    let (source_name, target_name, current_relation): (String, String, String) = conn
2955        .query_row(
2956            "SELECT e1.name, e2.name, r.relation \
2957             FROM relationships r \
2958             JOIN entities e1 ON e1.id = r.source_id \
2959             JOIN entities e2 ON e2.id = r.target_id \
2960             WHERE r.id = ?1",
2961            rusqlite::params![rel_id],
2962            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2963        )
2964        .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
2965
2966    let input_text = format!(
2967        "Source entity: {source_name}\nTarget entity: {target_name}\nCurrent relation: {current_relation}"
2968    );
2969    let (value, cost, is_oauth) = match mode {
2970        EnrichMode::ClaudeCode => call_claude(
2971            binary,
2972            RELATION_RECLASSIFY_PROMPT,
2973            RELATION_RECLASSIFY_SCHEMA,
2974            &input_text,
2975            model,
2976            timeout,
2977        )?,
2978        EnrichMode::Codex => call_codex(
2979            binary,
2980            RELATION_RECLASSIFY_PROMPT,
2981            RELATION_RECLASSIFY_SCHEMA,
2982            &input_text,
2983            model,
2984            timeout,
2985        )?,
2986    };
2987
2988    let new_relation = value
2989        .get("relation")
2990        .and_then(|v| v.as_str())
2991        .ok_or_else(|| AppError::Validation("LLM result missing 'relation'".into()))?;
2992    let new_strength = value
2993        .get("strength")
2994        .and_then(|v| v.as_f64())
2995        .unwrap_or(0.5);
2996
2997    conn.execute(
2998        "UPDATE relationships SET relation = ?1, weight = ?2 WHERE id = ?3",
2999        rusqlite::params![new_relation, new_strength, rel_id],
3000    )?;
3001
3002    Ok(EnrichItemResult::Done {
3003        memory_id: None,
3004        entity_id: None,
3005        entities: 0,
3006        rels: 1,
3007        chars_before: None,
3008        chars_after: None,
3009        cost,
3010        is_oauth,
3011    })
3012}
3013
3014/// G27 P2: Connect isolated entities via LLM-suggested relationship.
3015fn call_entity_connect(
3016    conn: &Connection,
3017    namespace: &str,
3018    item_key: &str,
3019    binary: &Path,
3020    model: Option<&str>,
3021    timeout: u64,
3022    mode: &EnrichMode,
3023) -> Result<EnrichItemResult, AppError> {
3024    let pairs = scan_isolated_entity_pairs(conn, namespace, Some(1))?;
3025    let (e1_id, e1_name, e2_id, e2_name) =
3026        match pairs.into_iter().find(|(_, n, _, _)| n == item_key) {
3027            Some(p) => p,
3028            None => {
3029                return Ok(EnrichItemResult::Skipped {
3030                    reason: "pair no longer isolated".into(),
3031                })
3032            }
3033        };
3034    let input_text = format!("Entity A: {e1_name}\nEntity B: {e2_name}");
3035    let (value, cost, is_oauth) = match mode {
3036        EnrichMode::ClaudeCode => call_claude(
3037            binary,
3038            ENTITY_CONNECT_PROMPT,
3039            ENTITY_CONNECT_SCHEMA,
3040            &input_text,
3041            model,
3042            timeout,
3043        )?,
3044        EnrichMode::Codex => call_codex(
3045            binary,
3046            ENTITY_CONNECT_PROMPT,
3047            ENTITY_CONNECT_SCHEMA,
3048            &input_text,
3049            model,
3050            timeout,
3051        )?,
3052    };
3053    let relation = value
3054        .get("relation")
3055        .and_then(|v| v.as_str())
3056        .unwrap_or("none");
3057    if relation == "none" {
3058        return Ok(EnrichItemResult::Skipped {
3059            reason: "LLM determined no relationship".into(),
3060        });
3061    }
3062    let strength = value
3063        .get("strength")
3064        .and_then(|v| v.as_f64())
3065        .unwrap_or(0.5);
3066    conn.execute(
3067        "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
3068        rusqlite::params![namespace, e1_id, e2_id, relation, strength],
3069    )?;
3070    Ok(EnrichItemResult::Done {
3071        memory_id: None,
3072        entity_id: None,
3073        entities: 0,
3074        rels: 1,
3075        chars_before: None,
3076        chars_after: None,
3077        cost,
3078        is_oauth,
3079    })
3080}
3081
3082/// G27 P2: Validate entity type assignment via LLM.
3083fn call_entity_type_validate(
3084    conn: &Connection,
3085    _namespace: &str,
3086    item_key: &str,
3087    binary: &Path,
3088    model: Option<&str>,
3089    timeout: u64,
3090    mode: &EnrichMode,
3091) -> Result<EnrichItemResult, AppError> {
3092    let (ent_id, ent_name, ent_type): (i64, String, String) = conn
3093        .query_row(
3094            "SELECT id, name, type FROM entities WHERE name = ?1",
3095            rusqlite::params![item_key],
3096            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3097        )
3098        .map_err(|_| AppError::NotFound(format!("entity '{item_key}' not found")))?;
3099    let input_text = format!("Entity: {ent_name}\nCurrent type: {ent_type}");
3100    let (value, cost, is_oauth) = match mode {
3101        EnrichMode::ClaudeCode => call_claude(
3102            binary,
3103            ENTITY_TYPE_VALIDATE_PROMPT,
3104            ENTITY_TYPE_VALIDATE_SCHEMA,
3105            &input_text,
3106            model,
3107            timeout,
3108        )?,
3109        EnrichMode::Codex => call_codex(
3110            binary,
3111            ENTITY_TYPE_VALIDATE_PROMPT,
3112            ENTITY_TYPE_VALIDATE_SCHEMA,
3113            &input_text,
3114            model,
3115            timeout,
3116        )?,
3117    };
3118    let validated_type = value
3119        .get("validated_type")
3120        .and_then(|v| v.as_str())
3121        .unwrap_or(&ent_type);
3122    let was_correct = value
3123        .get("was_correct")
3124        .and_then(|v| v.as_bool())
3125        .unwrap_or(true);
3126    if !was_correct {
3127        conn.execute(
3128            "UPDATE entities SET type = ?1 WHERE id = ?2",
3129            rusqlite::params![validated_type, ent_id],
3130        )?;
3131    }
3132    Ok(EnrichItemResult::Done {
3133        memory_id: None,
3134        entity_id: Some(ent_id),
3135        entities: 1,
3136        rels: 0,
3137        chars_before: None,
3138        chars_after: None,
3139        cost,
3140        is_oauth,
3141    })
3142}
3143
3144/// G27 P2: Enrich generic memory description via LLM.
3145fn call_description_enrich(
3146    conn: &Connection,
3147    _namespace: &str,
3148    item_key: &str,
3149    binary: &Path,
3150    model: Option<&str>,
3151    timeout: u64,
3152    mode: &EnrichMode,
3153) -> Result<EnrichItemResult, AppError> {
3154    let (mem_id, body, old_desc): (i64, String, String) = conn
3155        .query_row(
3156            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3157            rusqlite::params![item_key],
3158            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3159        )
3160        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3161    let snippet: String = body.chars().take(500).collect();
3162    let input_text = format!(
3163        "Memory name: {item_key}\nCurrent description: {old_desc}\nBody preview: {snippet}"
3164    );
3165    let (value, cost, is_oauth) = match mode {
3166        EnrichMode::ClaudeCode => call_claude(
3167            binary,
3168            DESCRIPTION_ENRICH_PROMPT,
3169            DESCRIPTION_ENRICH_SCHEMA,
3170            &input_text,
3171            model,
3172            timeout,
3173        )?,
3174        EnrichMode::Codex => call_codex(
3175            binary,
3176            DESCRIPTION_ENRICH_PROMPT,
3177            DESCRIPTION_ENRICH_SCHEMA,
3178            &input_text,
3179            model,
3180            timeout,
3181        )?,
3182    };
3183    let new_desc = value
3184        .get("description")
3185        .and_then(|v| v.as_str())
3186        .unwrap_or(&old_desc);
3187    conn.execute(
3188        "UPDATE memories SET description = ?1 WHERE id = ?2",
3189        rusqlite::params![new_desc, mem_id],
3190    )?;
3191    Ok(EnrichItemResult::Done {
3192        memory_id: Some(mem_id),
3193        entity_id: None,
3194        entities: 0,
3195        rels: 0,
3196        chars_before: Some(old_desc.len()),
3197        chars_after: Some(new_desc.len()),
3198        cost,
3199        is_oauth,
3200    })
3201}
3202
3203/// G27 P2: Classify memory into domain category via LLM.
3204fn call_domain_classify(
3205    conn: &Connection,
3206    _namespace: &str,
3207    item_key: &str,
3208    binary: &Path,
3209    model: Option<&str>,
3210    timeout: u64,
3211    mode: &EnrichMode,
3212) -> Result<EnrichItemResult, AppError> {
3213    let (mem_id, body, desc): (i64, String, String) = conn
3214        .query_row(
3215            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3216            rusqlite::params![item_key],
3217            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3218        )
3219        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3220    let snippet: String = body.chars().take(500).collect();
3221    let input_text = format!("Memory: {item_key}\nDescription: {desc}\nBody preview: {snippet}");
3222    let (value, cost, is_oauth) = match mode {
3223        EnrichMode::ClaudeCode => call_claude(
3224            binary,
3225            DOMAIN_CLASSIFY_PROMPT,
3226            DOMAIN_CLASSIFY_SCHEMA,
3227            &input_text,
3228            model,
3229            timeout,
3230        )?,
3231        EnrichMode::Codex => call_codex(
3232            binary,
3233            DOMAIN_CLASSIFY_PROMPT,
3234            DOMAIN_CLASSIFY_SCHEMA,
3235            &input_text,
3236            model,
3237            timeout,
3238        )?,
3239    };
3240    let domain = value
3241        .get("domain")
3242        .and_then(|v| v.as_str())
3243        .unwrap_or("uncategorized");
3244    let metadata = format!(r#"{{"domain":"{}"}}"#, domain.replace('"', "\\\""));
3245    conn.execute(
3246        "UPDATE memories SET metadata = ?1 WHERE id = ?2",
3247        rusqlite::params![metadata, mem_id],
3248    )?;
3249    Ok(EnrichItemResult::Done {
3250        memory_id: Some(mem_id),
3251        entity_id: None,
3252        entities: 0,
3253        rels: 0,
3254        chars_before: None,
3255        chars_after: None,
3256        cost,
3257        is_oauth,
3258    })
3259}
3260
3261/// G27 P2: Audit memory graph quality via LLM.
3262fn call_graph_audit(
3263    conn: &Connection,
3264    _namespace: &str,
3265    item_key: &str,
3266    binary: &Path,
3267    model: Option<&str>,
3268    timeout: u64,
3269    mode: &EnrichMode,
3270) -> Result<EnrichItemResult, AppError> {
3271    let (mem_id, body, desc): (i64, String, String) = conn
3272        .query_row(
3273            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3274            rusqlite::params![item_key],
3275            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3276        )
3277        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3278    let snippet: String = body.chars().take(500).collect();
3279    let ent_count: i64 = conn
3280        .query_row(
3281            "SELECT COUNT(*) FROM memory_entities WHERE memory_id = ?1",
3282            rusqlite::params![mem_id],
3283            |r| r.get(0),
3284        )
3285        .unwrap_or(0);
3286    let input_text = format!("Memory: {item_key}\nDescription: {desc}\nEntity bindings: {ent_count}\nBody preview: {snippet}");
3287    let (value, cost, is_oauth) = match mode {
3288        EnrichMode::ClaudeCode => call_claude(
3289            binary,
3290            GRAPH_AUDIT_PROMPT,
3291            GRAPH_AUDIT_SCHEMA,
3292            &input_text,
3293            model,
3294            timeout,
3295        )?,
3296        EnrichMode::Codex => call_codex(
3297            binary,
3298            GRAPH_AUDIT_PROMPT,
3299            GRAPH_AUDIT_SCHEMA,
3300            &input_text,
3301            model,
3302            timeout,
3303        )?,
3304    };
3305    let issues = value
3306        .get("issues")
3307        .and_then(|v| v.as_array())
3308        .map(|a| a.len())
3309        .unwrap_or(0);
3310    Ok(EnrichItemResult::Done {
3311        memory_id: Some(mem_id),
3312        entity_id: None,
3313        entities: 0,
3314        rels: issues,
3315        chars_before: None,
3316        chars_after: None,
3317        cost,
3318        is_oauth,
3319    })
3320}
3321
3322/// G27 P2: Synthesize research findings into graph entities/relationships via LLM.
3323fn call_deep_research_synth(
3324    conn: &Connection,
3325    namespace: &str,
3326    item_key: &str,
3327    binary: &Path,
3328    model: Option<&str>,
3329    timeout: u64,
3330    mode: &EnrichMode,
3331) -> Result<EnrichItemResult, AppError> {
3332    let (mem_id, body): (i64, String) = conn
3333        .query_row(
3334            "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3335            rusqlite::params![item_key],
3336            |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
3337        )
3338        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3339    let snippet: String = body.chars().take(2000).collect();
3340    let input_text = format!("Memory: {item_key}\nBody:\n{snippet}");
3341    let (value, cost, is_oauth) = match mode {
3342        EnrichMode::ClaudeCode => call_claude(
3343            binary,
3344            DEEP_RESEARCH_SYNTH_PROMPT,
3345            DEEP_RESEARCH_SYNTH_SCHEMA,
3346            &input_text,
3347            model,
3348            timeout,
3349        )?,
3350        EnrichMode::Codex => call_codex(
3351            binary,
3352            DEEP_RESEARCH_SYNTH_PROMPT,
3353            DEEP_RESEARCH_SYNTH_SCHEMA,
3354            &input_text,
3355            model,
3356            timeout,
3357        )?,
3358    };
3359    let mut ent_count = 0usize;
3360    let mut rel_count = 0usize;
3361    if let Some(ents) = value.get("entities").and_then(|v| v.as_array()) {
3362        for e in ents {
3363            let name = e.get("name").and_then(|v| v.as_str()).unwrap_or_default();
3364            let etype_str = e
3365                .get("entity_type")
3366                .and_then(|v| v.as_str())
3367                .unwrap_or("concept");
3368            let etype: EntityType = etype_str.parse().unwrap_or(EntityType::Concept);
3369            if name.len() >= 2 {
3370                let ne = NewEntity {
3371                    name: name.to_string(),
3372                    entity_type: etype,
3373                    description: None,
3374                };
3375                let _ = entities::upsert_entity(conn, namespace, &ne);
3376                ent_count += 1;
3377            }
3378        }
3379    }
3380    if let Some(rels) = value.get("relationships").and_then(|v| v.as_array()) {
3381        for r in rels {
3382            let src = r.get("source").and_then(|v| v.as_str()).unwrap_or_default();
3383            let tgt = r.get("target").and_then(|v| v.as_str()).unwrap_or_default();
3384            if src.is_empty() || tgt.is_empty() {
3385                continue;
3386            }
3387            let rel = r
3388                .get("relation")
3389                .and_then(|v| v.as_str())
3390                .unwrap_or("related");
3391            let str_ = r.get("strength").and_then(|v| v.as_f64()).unwrap_or(0.5);
3392            if let (Some(sid), Some(tid)) = (
3393                entities::find_entity_id(conn, namespace, src)?,
3394                entities::find_entity_id(conn, namespace, tgt)?,
3395            ) {
3396                let _ = entities::create_or_fetch_relationship(
3397                    conn, namespace, sid, tid, rel, str_, None,
3398                );
3399                rel_count += 1;
3400            }
3401        }
3402    }
3403    Ok(EnrichItemResult::Done {
3404        memory_id: Some(mem_id),
3405        entity_id: None,
3406        entities: ent_count,
3407        rels: rel_count,
3408        chars_before: None,
3409        chars_after: None,
3410        cost,
3411        is_oauth,
3412    })
3413}
3414
3415/// G27 P2: Extract structured body from unstructured text via LLM.
3416fn call_body_extract(
3417    conn: &Connection,
3418    _namespace: &str,
3419    item_key: &str,
3420    binary: &Path,
3421    model: Option<&str>,
3422    timeout: u64,
3423    mode: &EnrichMode,
3424) -> Result<EnrichItemResult, AppError> {
3425    let (mem_id, body): (i64, String) = conn
3426        .query_row(
3427            "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3428            rusqlite::params![item_key],
3429            |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
3430        )
3431        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3432    let input_text = format!("Memory: {item_key}\nBody:\n{body}");
3433    let (value, cost, is_oauth) = match mode {
3434        EnrichMode::ClaudeCode => call_claude(
3435            binary,
3436            BODY_EXTRACT_PROMPT,
3437            BODY_EXTRACT_SCHEMA,
3438            &input_text,
3439            model,
3440            timeout,
3441        )?,
3442        EnrichMode::Codex => call_codex(
3443            binary,
3444            BODY_EXTRACT_PROMPT,
3445            BODY_EXTRACT_SCHEMA,
3446            &input_text,
3447            model,
3448            timeout,
3449        )?,
3450    };
3451    let restructured = value
3452        .get("restructured_body")
3453        .and_then(|v| v.as_str())
3454        .unwrap_or(&body);
3455    let chars_before = body.len();
3456    let chars_after = restructured.len();
3457    let new_hash = blake3::hash(restructured.as_bytes()).to_hex().to_string();
3458    conn.execute(
3459        "UPDATE memories SET body = ?1, body_hash = ?2, updated_at = unixepoch() WHERE id = ?3",
3460        rusqlite::params![restructured, new_hash, mem_id],
3461    )?;
3462    Ok(EnrichItemResult::Done {
3463        memory_id: Some(mem_id),
3464        entity_id: None,
3465        entities: 0,
3466        rels: 0,
3467        chars_before: Some(chars_before),
3468        chars_after: Some(chars_after),
3469        cost,
3470        is_oauth,
3471    })
3472}
3473
3474/// Scan for pairs of entities that share no direct relationship.
3475#[allow(clippy::type_complexity)]
3476fn scan_isolated_entity_pairs(
3477    conn: &Connection,
3478    namespace: &str,
3479    limit: Option<usize>,
3480) -> Result<Vec<(i64, String, i64, String)>, AppError> {
3481    let limit_val = limit.unwrap_or(50) as i64;
3482    let mut stmt = conn.prepare_cached(
3483        "SELECT e1.id, e1.name, e2.id, e2.name FROM entities e1, entities e2 \
3484         WHERE e1.namespace = ?1 AND e2.namespace = ?1 AND e1.id < e2.id \
3485         AND NOT EXISTS (SELECT 1 FROM relationships r WHERE \
3486           (r.source_id = e1.id AND r.target_id = e2.id) OR \
3487           (r.source_id = e2.id AND r.target_id = e1.id)) \
3488         LIMIT ?2",
3489    )?;
3490    let rows = stmt
3491        .query_map(rusqlite::params![namespace, limit_val], |r| {
3492            Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?))
3493        })?
3494        .collect::<Result<Vec<_>, _>>()?;
3495    Ok(rows)
3496}
3497
3498/// Scan for entities with non-validated types (all entities for type audit).
3499fn scan_entities_for_type_validation(
3500    conn: &Connection,
3501    namespace: &str,
3502    limit: Option<usize>,
3503) -> Result<Vec<(i64, String, String)>, AppError> {
3504    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3505    let sql = format!(
3506        "SELECT id, name, type FROM entities WHERE namespace = ?1 ORDER BY id {limit_clause}"
3507    );
3508    let mut stmt = conn.prepare(&sql)?;
3509    let rows = stmt
3510        .query_map(rusqlite::params![namespace], |r| {
3511            Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3512        })?
3513        .collect::<Result<Vec<_>, _>>()?;
3514    Ok(rows)
3515}
3516
3517/// Scan for memories with generic descriptions (ingested, imported, etc).
3518fn scan_generic_descriptions(
3519    conn: &Connection,
3520    namespace: &str,
3521    limit: Option<usize>,
3522) -> Result<Vec<(i64, String, String)>, AppError> {
3523    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3524    let sql = format!(
3525        "SELECT id, name, description FROM memories WHERE namespace = ?1 AND deleted_at IS NULL \
3526         AND (description LIKE '%ingested%' OR description LIKE '%imported%' OR description LIKE '%added%' OR length(description) < 30) \
3527         ORDER BY id {limit_clause}"
3528    );
3529    let mut stmt = conn.prepare(&sql)?;
3530    let rows = stmt
3531        .query_map(rusqlite::params![namespace], |r| {
3532            Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3533        })?
3534        .collect::<Result<Vec<_>, _>>()?;
3535    Ok(rows)
3536}
3537
3538/// Calls the Codex CLI for a single enrichment item.
3539///
3540/// Follows the same contract as `call_claude`: returns `(value, cost_usd, is_oauth=false)`.
3541fn call_codex(
3542    binary: &Path,
3543    prompt: &str,
3544    json_schema: &str,
3545    input_text: &str,
3546    model: Option<&str>,
3547    timeout_secs: u64,
3548) -> Result<(serde_json::Value, f64, bool), AppError> {
3549    use wait_timeout::ChildExt;
3550
3551    // G31+G32+G33 (v1.0.69): validate the model BEFORE spawn, write the
3552    // schema to a trusted cache path (not /tmp), and reuse the
3553    // consolidated JSONL parser. See `codex_spawn.rs` for the canonical
3554    // hardening rationale.
3555    super::codex_spawn::validate_codex_model(model)?;
3556    let schema_file = super::codex_spawn::trusted_schema_path()?;
3557
3558    let args = super::codex_spawn::CodexSpawnArgs {
3559        binary,
3560        prompt,
3561        json_schema,
3562        input_text,
3563        model,
3564        timeout_secs,
3565        schema_path: schema_file.clone(),
3566    };
3567    let mut cmd = super::codex_spawn::build_codex_command(&args);
3568
3569    let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
3570        AppError::Io(std::io::Error::new(
3571            e.kind(),
3572            format!("failed to spawn codex: {e}"),
3573        ))
3574    })?;
3575
3576    let full_prompt = format!("{prompt}\n\n{input_text}");
3577    let stdin_bytes = full_prompt.into_bytes();
3578    let mut child_stdin = child
3579        .stdin
3580        .take()
3581        .ok_or_else(|| AppError::Validation("failed to open codex stdin".into()))?;
3582    let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
3583        child_stdin.write_all(&stdin_bytes)?;
3584        drop(child_stdin);
3585        Ok(())
3586    });
3587
3588    let start = std::time::Instant::now();
3589    let timeout = std::time::Duration::from_secs(timeout_secs);
3590    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
3591    let _ = std::fs::remove_file(&schema_file);
3592
3593    match status {
3594        Some(exit_status) => {
3595            stdin_thread
3596                .join()
3597                .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
3598                .map_err(AppError::Io)?;
3599
3600            tracing::debug!(
3601                target: "process",
3602                exit_code = ?exit_status.code(),
3603                elapsed_ms = start.elapsed().as_millis() as u64,
3604                "external process completed"
3605            );
3606
3607            let mut stdout_buf = Vec::new();
3608            if let Some(mut out) = child.stdout.take() {
3609                std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
3610            }
3611            if !exit_status.success() {
3612                let mut stderr_buf = Vec::new();
3613                if let Some(mut err) = child.stderr.take() {
3614                    std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
3615                }
3616                let stderr_str = String::from_utf8_lossy(&stderr_buf);
3617                tracing::warn!(
3618                    target: "enrich",
3619                    exit_code = ?exit_status.code(),
3620                    stderr = %stderr_str.trim(),
3621                    "codex process failed"
3622                );
3623                return Err(AppError::Validation(format!(
3624                    "codex exited with code {:?}: {}",
3625                    exit_status.code(),
3626                    stderr_str.trim()
3627                )));
3628            }
3629            let stdout_str = String::from_utf8(stdout_buf)
3630                .map_err(|_| AppError::Validation("codex stdout is not valid UTF-8".into()))?;
3631            // G32: use the JSONL parser, NOT serde_json::from_str on the
3632            // entire stdout (codex emits one event per line).
3633            let result = super::codex_spawn::parse_codex_jsonl(&stdout_str)?;
3634            // Return the raw agent_message text parsed as JSON. Different
3635            // operations (memory-bindings, body-enrich) use different
3636            // output schemas, so we let the caller pick which fields to
3637            // extract. The previous implementation hardcoded
3638            // `{entities, urls}` which broke body-enrich.
3639            let value: serde_json::Value =
3640                serde_json::from_str(&result.last_agent_text).map_err(|e| {
3641                    AppError::Validation(format!(
3642                        "codex agent_message is not valid JSON: {e}; raw={}",
3643                        result.last_agent_text
3644                    ))
3645                })?;
3646            Ok((value, 0.0, false))
3647        }
3648        None => {
3649            let _ = child.kill();
3650            let _ = child.wait();
3651            let _ = stdin_thread.join();
3652            Err(AppError::Validation(format!(
3653                "codex timed out after {timeout_secs} seconds"
3654            )))
3655        }
3656    }
3657}
3658
3659// ---------------------------------------------------------------------------
3660// Tests
3661// ---------------------------------------------------------------------------
3662
#[cfg(test)]
mod tests {
    //! Unit tests for the `enrich` scan/persist helpers, the LLM output parsers, the
    //! queue DB bootstrap, and the embedded JSON schemas.
    //!
    //! Database-backed tests run against an in-memory SQLite database with a reduced
    //! schema (see `open_test_db`), so they never touch a real data directory.

    use super::*;
    use rusqlite::Connection;
    #[cfg(unix)]
    use std::os::unix::fs::PermissionsExt;

    /// Opens an in-memory SQLite database with a minimal schema for unit tests.
    ///
    /// Every column except the identifying ones has a default, so tests only need to
    /// specify the fields they actually care about.
    fn open_test_db() -> Connection {
        let conn = Connection::open_in_memory().expect("in-memory db");
        conn.execute_batch(
            "CREATE TABLE memories (
                id          INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace   TEXT NOT NULL DEFAULT 'global',
                name        TEXT NOT NULL,
                type        TEXT NOT NULL DEFAULT 'note',
                description TEXT NOT NULL DEFAULT '',
                body        TEXT NOT NULL DEFAULT '',
                body_hash   TEXT NOT NULL DEFAULT '',
                session_id  TEXT,
                source      TEXT NOT NULL DEFAULT 'agent',
                metadata    TEXT NOT NULL DEFAULT '{}',
                created_at  INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at  INTEGER NOT NULL DEFAULT (unixepoch()),
                deleted_at  INTEGER,
                UNIQUE(namespace, name)
            );
            CREATE TABLE entities (
                id          INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace   TEXT NOT NULL DEFAULT 'global',
                name        TEXT NOT NULL,
                type        TEXT NOT NULL DEFAULT 'concept',
                description TEXT,
                degree      INTEGER NOT NULL DEFAULT 0,
                created_at  INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at  INTEGER NOT NULL DEFAULT (unixepoch()),
                UNIQUE(namespace, name)
            );
            CREATE TABLE memory_entities (
                memory_id  INTEGER NOT NULL,
                entity_id  INTEGER NOT NULL,
                PRIMARY KEY (memory_id, entity_id)
            );
            CREATE TABLE relationships (
                id         INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace  TEXT NOT NULL DEFAULT 'global',
                source_id  INTEGER NOT NULL,
                target_id  INTEGER NOT NULL,
                relation   TEXT NOT NULL,
                weight     REAL NOT NULL DEFAULT 0.5,
                description TEXT,
                UNIQUE(source_id, target_id, relation)
            );
            CREATE TABLE memory_embeddings (
                memory_id   INTEGER PRIMARY KEY,
                namespace   TEXT NOT NULL,
                embedding   BLOB NOT NULL,
                source      TEXT NOT NULL,
                model       TEXT NOT NULL DEFAULT '',
                dim         INTEGER NOT NULL DEFAULT 384,
                created_at  INTEGER NOT NULL DEFAULT (unixepoch())
            );",
        )
        .expect("schema creation must succeed");
        conn
    }

    /// Inserts a memory into the `global` namespace and returns its row id.
    ///
    /// Values are bound as parameters rather than spliced into the SQL text.
    fn insert_memory(conn: &Connection, name: &str, body: &str) -> i64 {
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', ?1, ?2)",
            rusqlite::params![name, body],
        )
        .expect("memory insert must succeed");
        conn.last_insert_rowid()
    }

    /// Inserts an entity into the `global` namespace and returns its row id.
    ///
    /// `description: None` is stored as SQL NULL.
    fn insert_entity(
        conn: &Connection,
        name: &str,
        entity_type: &str,
        description: Option<&str>,
    ) -> i64 {
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', ?1, ?2, ?3)",
            rusqlite::params![name, entity_type, description],
        )
        .expect("entity insert must succeed");
        conn.last_insert_rowid()
    }

    #[test]
    fn scan_unbound_memories_finds_memories_without_bindings() {
        let conn = open_test_db();
        insert_memory(&conn, "test-mem", "some body content");

        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "test-mem");
    }

    #[test]
    fn scan_unbound_memories_excludes_bound_memories() {
        let conn = open_test_db();
        let mem_id = insert_memory(&conn, "bound-mem", "body");
        // "concept" is the schema default for `entities.type`.
        let ent_id = insert_entity(&conn, "some-entity", "concept", None);
        conn.execute(
            "INSERT INTO memory_entities (memory_id, entity_id) VALUES (?1, ?2)",
            rusqlite::params![mem_id, ent_id],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
        assert!(results.is_empty(), "bound memory must not appear in scan");
    }

    #[test]
    fn scan_entities_without_description_finds_null_description() {
        let conn = open_test_db();
        insert_entity(&conn, "my-tool", "tool", None);

        let results = scan_entities_without_description(&conn, "global", None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "my-tool");
    }

    #[test]
    fn scan_entities_without_description_excludes_entities_with_description() {
        let conn = open_test_db();
        insert_entity(
            &conn,
            "described-tool",
            "tool",
            Some("Has a description already"),
        );

        let results = scan_entities_without_description(&conn, "global", None).unwrap();
        assert!(
            results.is_empty(),
            "entity with description must not appear"
        );
    }

    #[test]
    fn scan_short_body_memories_finds_short_bodies() {
        let conn = open_test_db();
        insert_memory(&conn, "short-mem", "hi");

        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "short-mem");
    }

    #[test]
    fn scan_short_body_memories_excludes_long_bodies() {
        let conn = open_test_db();
        let long_body = "a".repeat(1000);
        insert_memory(&conn, "long-mem", &long_body);

        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
        assert!(results.is_empty(), "long memory must not appear in scan");
    }

    #[test]
    fn scan_respects_limit() {
        let conn = open_test_db();
        for i in 0..5 {
            insert_memory(&conn, &format!("mem-{i}"), "short");
        }

        let results = scan_short_body_memories(&conn, "global", 1000, Some(3)).unwrap();
        assert_eq!(results.len(), 3, "limit must be respected");
    }

    #[test]
    fn scan_memories_without_embeddings_finds_only_missing_rows() {
        let conn = open_test_db();
        insert_memory(&conn, "missing-vec", "body one");
        let memory_id = insert_memory(&conn, "has-vec", "body two");
        let embedding = vec![0.0_f32; crate::constants::embedding_dim()];
        memories::upsert_vec(
            &conn, memory_id, "global", "note", &embedding, "has-vec", "body two",
        )
        .unwrap();

        let results = scan_memories_without_embeddings(&conn, "global", None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "missing-vec");
    }

    #[test]
    fn scan_memories_without_embeddings_respects_name_filter() {
        let conn = open_test_db();
        insert_memory(&conn, "match-me", "body one");
        insert_memory(&conn, "skip-me", "body two");

        let results =
            scan_memories_without_embeddings(&conn, "global", None, &["match-me".to_string()])
                .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "match-me");
    }

    #[test]
    fn queue_db_schema_creates_correctly() {
        // A private temporary directory keeps this test hermetic and portable:
        // - there is no hard-coded `/tmp`;
        // - it cannot collide with a stale file from an earlier run that reused the PID;
        // - the DB and any SQLite side files are removed even if an assertion panics.
        let tmp = tempfile::tempdir().expect("tempdir");
        let queue_path = tmp
            .path()
            .join("enrich-queue.sqlite")
            .to_str()
            .expect("temp path must be valid UTF-8")
            .to_owned();
        let conn = open_queue_db(&queue_path).expect("queue db must open");
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
            .expect("queue table must exist");
        assert_eq!(count, 0);
        // Close the connection before `tmp` deletes the directory (required on Windows).
        drop(conn);
    }

    #[test]
    fn parse_claude_output_valid_bindings() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.01,
             "structured_output":{"entities":[{"name":"rust-lang","entity_type":"tool"}],"relationships":[]}}
        ]"#;
        let result = crate::commands::claude_runner::parse_claude_output(output)
            .expect("must parse successfully");
        assert!(result.value.get("entities").is_some());
        assert!((result.cost_usd - 0.01).abs() < f64::EPSILON);
        assert!(!result.is_oauth);
    }

    #[test]
    fn parse_claude_output_detects_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"none"},
            {"type":"result","is_error":false,"total_cost_usd":0.0,
             "structured_output":{"entities":[],"relationships":[]}}
        ]"#;
        let result = crate::commands::claude_runner::parse_claude_output(output).unwrap();
        assert!(result.is_oauth);
    }

    #[test]
    fn parse_claude_output_rate_limit_returns_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;
        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
        assert!(matches!(err, AppError::RateLimited { .. }));
    }

    #[test]
    fn parse_claude_output_auth_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;
        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("authentication failed"));
    }

    /// Runs `call_codex` against a shell script that prints a canned JSONL event stream.
    /// Checks that the JSON inside the `agent_message` text is returned as the value.
    #[cfg(unix)]
    #[test]
    fn call_codex_returns_raw_json_for_body_enrich_schema() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let binary = tmp.path().join("codex-mock");
        std::fs::write(
            &binary,
            r#"#!/usr/bin/env bash
set -euo pipefail
cat <<'JSONL'
{"type":"thread.started","thread_id":"mock-thread-0"}
{"type":"item.completed","item":{"type":"agent_message","text":"{\"enriched_body\":\"expanded body\"}"}}
{"type":"turn.completed","usage":{"input_tokens":1,"output_tokens":1}}
JSONL
"#,
        )
        .expect("mock codex write");
        let mut perms = std::fs::metadata(&binary).expect("metadata").permissions();
        perms.set_mode(0o755);
        std::fs::set_permissions(&binary, perms).expect("chmod");

        let (value, cost, is_oauth) =
            call_codex(&binary, "prompt", BODY_ENRICH_SCHEMA, "body", None, 5)
                .expect("call_codex must accept body-enrich payload");

        assert_eq!(value["enriched_body"], "expanded body");
        assert_eq!(cost, 0.0);
        assert!(!is_oauth);
    }

    #[test]
    fn dry_run_emits_preview_without_calling_llm() {
        // `run()` needs `AppPaths` (on-disk state), so this test cannot drive the dry-run
        // path end to end. It only checks the dry-run path's precondition: a short-body
        // memory is picked up by the scan without spawning any process. The NDJSON preview
        // itself is expected to be covered by integration tests using a fake binary.
        let conn = open_test_db();
        insert_memory(&conn, "dry-mem", "tiny");

        let results = scan_short_body_memories(&conn, "global", 1000, None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "dry-mem");
    }

    #[test]
    fn persist_entity_description_updates_db() {
        let conn = open_test_db();
        let eid = insert_entity(&conn, "tokio-runtime", "tool", None);

        persist_entity_description(&conn, eid, "Async runtime for Rust applications").unwrap();

        let desc: String = conn
            .query_row(
                "SELECT description FROM entities WHERE id=?1",
                rusqlite::params![eid],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(desc, "Async runtime for Rust applications");
    }

    #[test]
    fn bindings_schema_is_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(BINDINGS_SCHEMA).expect("BINDINGS_SCHEMA must be valid JSON");
    }

    #[test]
    fn entity_description_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(ENTITY_DESCRIPTION_SCHEMA)
            .expect("ENTITY_DESCRIPTION_SCHEMA must be valid JSON");
    }

    #[test]
    fn body_enrich_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(BODY_ENRICH_SCHEMA)
            .expect("BODY_ENRICH_SCHEMA must be valid JSON");
    }
}