Skip to main content

sqlite_graphrag/commands/
enrich.rs

1//! Handler for the `enrich` CLI subcommand (GAP-14 + GAP-18).
2//!
3//! Enriches the knowledge graph by running LLM-powered analysis over memories
4//! and entities that are missing key structural data. Operations are:
5//!
6//! - `memory-bindings`: memories without `memory_entities` rows get entity extraction
7//! - `entity-descriptions`: entities with NULL/empty descriptions get LLM descriptions
8//! - `body-enrich`: memories with short bodies get expanded by the LLM (GAP-18)
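//! - `re-embed`: memories without a vector row get re-embedded without rewriting the body
//! - G27 audit operations (`weight-calibrate`, `relation-reclassify`, `entity-connect`,
//!   `entity-type-validate`, `description-enrich`, `cross-domain-bridges`,
//!   `domain-classify`, `graph-audit`, `deep-research-synth`, `body-extract`) are
//!   currently scan only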
10//!
11//! Architecture mirrors `ingest_claude.rs`: SCAN → JUDGE (LLM) → PERSIST, with a
12//! SQLite queue DB (`.enrich-queue.sqlite`) for resume/retry support.
//!
//! Workload: subprocess I/O-bound (claude/codex API calls with network wait).
14//!
15//! # DRY opportunity
16//!
//! `open_queue_db` in `ingest_claude.rs` and the queue schema it creates are duplicated
//! here nearly verbatim. Claude invocation already goes through the shared
//! `claude_runner::run_claude` (G02), and NDJSON emission through
//! `crate::output::emit_json_line`. Any remaining overlap with private helpers in
//! `ingest_claude.rs` (e.g. `extract_with_claude`, `parse_claude_output`, the queue
//! schema) could be moved into a shared `src/commands/llm_runner.rs` module (or
//! `src/llm_runner.rs`) without changing any public APIs. That extraction requires
//! editing `ingest_claude.rs`, which is outside this stream's boundary — flagged here
//! for the Integration stream to evaluate.
23
24use crate::commands::ingest_claude::find_claude_binary;
25use crate::constants::MAX_MEMORY_BODY_LEN;
26use crate::entity_type::EntityType;
27use crate::errors::AppError;
28use crate::paths::AppPaths;
29use crate::storage::connection::{ensure_db_ready, open_rw};
30use crate::storage::entities::{self, NewEntity, NewRelationship};
31use crate::storage::memories;
32
33use rusqlite::Connection;
34use serde::{Deserialize, Serialize};
35use std::io::Write;
36use std::path::{Path, PathBuf};
37use std::time::Instant;
38
39// ---------------------------------------------------------------------------
40// Constants
41// ---------------------------------------------------------------------------
42
/// File name of the SQLite queue database that tracks per-item progress so an
/// interrupted run can be resumed (`--resume`) or retried (`--retry-failed`).
const DEFAULT_QUEUE_DB: &str = ".enrich-queue.sqlite";

/// Default rate-limit wait.
/// NOTE(review): not referenced in this section — presumably seconds; confirm at the use site.
const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;

/// Default value of `--min-output-chars` (body-enrich, GAP-18).
const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;

/// Default value of `--max-output-chars` (body-enrich, GAP-18).
const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2_000;
47
48// ---------------------------------------------------------------------------
// JSON schemas for structured LLM output (plus the G27 audit prompts paired with them)
50// ---------------------------------------------------------------------------
51
/// JSON Schema constraining the entity/relationship extraction output.
///
/// Entity types and relation names are closed `enum`s, every object sets
/// `additionalProperties: false`, and `strength` is bounded to `[0, 1]`.
/// NOTE(review): presumably passed as the `json_schema` argument of
/// `call_claude` (or the Codex equivalent) for `memory-bindings` — confirm at the call site.
const BINDINGS_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["entities","relationships"],
  "additionalProperties": false
}"#;

/// JSON Schema for entity-description output: one required `description` string.
const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" }
  },
  "required": ["description"],
  "additionalProperties": false
}"#;

/// JSON Schema for body-enrich output: one required `enriched_body` string.
const BODY_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "enriched_body": { "type": "string" }
  },
  "required": ["enriched_body"],
  "additionalProperties": false
}"#;
109
// G27 P1: weight-calibrate
/// System prompt asking the LLM to check a relationship's weight against the
/// 0.9 / 0.7 / 0.5 / 0.3 strength scale (the same scale `BINDINGS_PROMPT` uses).
// Each `\` at a line end continues the literal and drops the newline and the
// next line's leading whitespace, so the embedded `\n` escapes are the only
// line breaks in the prompt.
const WEIGHT_CALIBRATE_PROMPT: &str = "You are a knowledge graph quality auditor. Evaluate whether this relationship weight is correctly calibrated.\n\n\
Scale:\n\
- 0.9 = vital hard dependency (A cannot function without B)\n\
- 0.7 = important design relationship (A strongly supports/enables B)\n\
- 0.5 = useful contextual link (A and B share relevant context)\n\
- 0.3 = weak reference (A mentions B without strong coupling)\n\n\
Respond with the calibrated weight and brief reasoning.";

/// Structured-output schema for weight-calibrate: a `calibrated_weight` bounded
/// to `[0, 1]` plus free-text `reasoning`.
const WEIGHT_CALIBRATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "calibrated_weight": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["calibrated_weight", "reasoning"],
  "additionalProperties": false
}"#;
128
// G27 P1: relation-reclassify
/// System prompt asking the LLM to replace a generic relation type with the
/// most specific canonical relation from the listed set.
const RELATION_RECLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. The relationship between these entities uses a generic type. Determine the REAL semantic relationship.\n\n\
Valid canonical relations (pick exactly one):\n\
- depends-on: A cannot function without B\n\
- uses: A utilizes B but could substitute it\n\
- supports: A reinforces or enables B\n\
- causes: A triggers or produces B\n\
- fixes: A resolves a problem in B\n\
- contradicts: A conflicts with or invalidates B\n\
- applies-to: A is relevant to or scoped within B\n\
- follows: A comes after B in sequence\n\
- replaces: A substitutes B\n\
- tracked-in: A is monitored in B\n\
- related: A and B share context (use sparingly)\n\n\
Respond with the correct relation, strength, and reasoning.";

/// Structured-output schema for relation-reclassify.
///
/// `relation` is a closed `enum` holding exactly the canonical relations listed
/// in the prompt (the same set `BINDINGS_SCHEMA` allows). This enforces the
/// "pick exactly one" rule at the schema level, so the provider cannot return a
/// free-form or misspelled relation type. `strength` is bounded to `[0, 1]`.
const RELATION_RECLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": {
      "type": "string",
      "enum": ["depends-on","uses","supports","causes","fixes","contradicts","applies-to","follows","replaces","tracked-in","related"]
    },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
155
// G27 P2: entity-connect — suggest relationships between isolated entities
/// System prompt asking the LLM whether two unconnected entities share a
/// meaningful relationship, with `"none"` as the explicit "no link" answer.
const ENTITY_CONNECT_PROMPT: &str = "You are a knowledge graph quality auditor. Two entities exist in the same graph but have no relationship between them. Determine if a meaningful relationship exists.\n\n\
Valid canonical relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, replaces, tracked-in, related.\n\n\
If NO meaningful relationship exists, set relation to \"none\".\n\
Respond with the relation (or \"none\"), strength, and reasoning.";

/// Structured-output schema for entity-connect.
///
/// `relation` is a closed `enum`: the canonical relations listed in the prompt
/// plus the `"none"` sentinel. Any other value is rejected by the schema
/// instead of reaching the caller. `strength` is bounded to `[0, 1]`.
const ENTITY_CONNECT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": {
      "type": "string",
      "enum": ["depends-on","uses","supports","causes","fixes","contradicts","applies-to","follows","replaces","tracked-in","related","none"]
    },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
172
// G27 P2: entity-type-validate — verify entity type assignments
/// System prompt asking the LLM to confirm or correct an entity's type.
///
/// The final sentence asks for every field the schema requires, including
/// whether the current type was already correct (`was_correct`).
const ENTITY_TYPE_VALIDATE_PROMPT: &str = "You are a knowledge graph quality auditor. Verify whether this entity's type is correct.\n\n\
Valid entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\n\
If the current type is correct, keep it. If wrong, suggest the correct type.\n\
Respond with the validated type, whether the current type was correct, and reasoning.";

/// Structured-output schema for entity-type-validate.
///
/// `validated_type` is a closed `enum` of the entity types listed in the prompt
/// (the same set `BINDINGS_SCHEMA` allows), so a suggested correction is always
/// a known type.
const ENTITY_TYPE_VALIDATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "validated_type": {
      "type": "string",
      "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
    },
    "was_correct": { "type": "boolean" },
    "reasoning": { "type": "string" }
  },
  "required": ["validated_type", "was_correct", "reasoning"],
  "additionalProperties": false
}"#;
189
// G27 P2: description-enrich — improve generic memory descriptions
/// System prompt asking the LLM to replace a generic or auto-generated memory
/// description with a 10-20 word semantic one. The BAD/GOOD pair is an inline
/// few-shot example.
const DESCRIPTION_ENRICH_PROMPT: &str = "You are a knowledge graph quality auditor. This memory has a generic or auto-generated description. Write a concise, semantic description (10-20 words) that captures WHAT this memory is about and WHY it matters.\n\n\
BAD: 'ingested from docs/auth.md'\n\
GOOD: 'JWT token rotation strategy with 15-min expiry and refresh flow'\n\n\
Respond with the improved description and reasoning.";

/// Structured-output schema for description-enrich: the new `description` plus
/// free-text `reasoning`.
const DESCRIPTION_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" },
    "reasoning": { "type": "string" }
  },
  "required": ["description", "reasoning"],
  "additionalProperties": false
}"#;
205
// G27 P2: domain-classify — classify memory into domain category
/// System prompt asking the LLM to assign a memory to one kebab-case domain.
///
/// The final sentence asks for every field the schema requires, including the
/// `confidence` score. Previously the schema demanded it but the prompt never
/// mentioned it.
const DOMAIN_CLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. Classify this memory into its primary domain category.\n\n\
Respond with the domain name (kebab-case, 2-4 words), a confidence score between 0 and 1, and reasoning.";

/// Structured-output schema for domain-classify: `domain`, a `confidence`
/// bounded to `[0, 1]`, and free-text `reasoning`.
const DOMAIN_CLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "domain": { "type": "string" },
    "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["domain", "confidence", "reasoning"],
  "additionalProperties": false
}"#;
220
// G27 P2: graph-audit — audit graph for quality issues
/// System prompt asking the LLM to audit one memory and its entity bindings.
const GRAPH_AUDIT_PROMPT: &str = "You are a knowledge graph quality auditor. Analyze this memory and its entity bindings for quality issues.\n\n\
Check for: missing entities, wrong entity types, redundant relationships, orphaned entities, generic descriptions, low-signal relationships.\n\n\
Respond with a list of issues found (or empty if none) and an overall quality score.";

/// Structured-output schema for graph-audit.
///
/// Each element of `issues` is a closed `{kind, detail}` object. It sets
/// `additionalProperties: false` like every other object schema in this file;
/// strict structured-output modes require that on nested objects too, and
/// without it the provider may attach arbitrary extra keys.
const GRAPH_AUDIT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "quality_score": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "issues": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "kind": { "type": "string" },
          "detail": { "type": "string" }
        },
        "required": ["kind", "detail"],
        "additionalProperties": false
      }
    },
    "reasoning": { "type": "string" }
  },
  "required": ["quality_score", "issues", "reasoning"],
  "additionalProperties": false
}"#;
236
// G27 P2: deep-research-synth — synthesize research findings into graph
/// System prompt asking the LLM to turn a research memory into entities,
/// relationships and a short synthesis summary.
const DEEP_RESEARCH_SYNTH_PROMPT: &str = "You are a knowledge graph synthesizer. Given this memory body, extract key findings and synthesize them into structured entities and relationships.\n\n\
Entity names: lowercase kebab-case, domain-specific.\n\
Relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, related, replaces, tracked-in.\n\n\
Respond with extracted entities, relationships, and a synthesis summary.";

/// Structured-output schema for deep-research-synth.
///
/// Its nested objects follow the same constraints as `BINDINGS_SCHEMA`:
/// - `entity_type` and `relation` are closed `enum`s;
/// - `strength` is bounded to `[0, 1]`;
/// - every object sets `additionalProperties: false`.
///
/// Synthesized output is therefore limited to the same vocabulary as regular
/// memory-bindings extraction.
const DEEP_RESEARCH_SYNTH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["depends-on","uses","supports","causes","fixes","contradicts","applies-to","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source", "target", "relation", "strength"],
        "additionalProperties": false
      }
    },
    "summary": { "type": "string" }
  },
  "required": ["entities", "relationships", "summary"],
  "additionalProperties": false
}"#;
253
// G27 P2: body-extract — extract structured content from unstructured text
/// System prompt asking the LLM to restructure an unstructured memory body into
/// clean markdown while preserving all factual content.
const BODY_EXTRACT_PROMPT: &str = "You are a structured data extractor. Given this memory body (which may be unstructured text, raw notes, or a transcript), extract and restructure the content into a clean, well-organized markdown body.\n\n\
Preserve all factual content. Remove noise, fix formatting, add section headers where appropriate.\n\
Respond with the restructured body and a brief summary of changes.";

/// Structured-output schema for body-extract: the `restructured_body` and a
/// `changes_summary` describing what was changed.
const BODY_EXTRACT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "restructured_body": { "type": "string" },
    "changes_summary": { "type": "string" }
  },
  "required": ["restructured_body", "changes_summary"],
  "additionalProperties": false
}"#;
268
269// ---------------------------------------------------------------------------
// Prompts for memory-bindings, entity-descriptions and body-enrich
271// ---------------------------------------------------------------------------
272
/// System prompt for `memory-bindings` entity/relationship extraction.
///
/// The relation list and the 0.9 / 0.7 / 0.5 / 0.3 strength scale match the
/// `enum` and bounds in `BINDINGS_SCHEMA`.
const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. Given a memory body, extract:\n\
1. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
2. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones";

/// Prompt prefix for `entity-descriptions`.
// It ends with "Entity name: ". NOTE(review): the caller presumably appends the
// entity name (and type) after this prefix — confirm at the call site.
const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. Given an entity name and type, write a concise one-sentence description (10-20 words) that explains what this entity IS and WHY it matters in the context of software/system design.\n\nEntity name: ";

/// Prompt prefix for `body-enrich` (GAP-18).
// It ends with "Memory body to enrich:\n\n". NOTE(review): the caller presumably
// appends the original body and supplies the target length separately, as the
// prompt text itself states — confirm at the call site.
const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. Given a short or sparse memory body, expand it into a richer, more complete and useful description. Preserve all existing facts. Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\nConstraints:\n- Output only the enriched body text (no metadata, no headers)\n- Preserve the original meaning exactly\n- Target length is provided in the system context\n\nMemory body to enrich:\n\n";
287
288// ---------------------------------------------------------------------------
289// CLI args
290// ---------------------------------------------------------------------------
291
/// Operation to perform in the `enrich` command.
// The `///` comments on the variants below double as clap `--help` text for
// `--operation` (via `clap::ValueEnum`), so wording changes there are
// user-visible. Serde (`rename_all = "kebab-case"`) and clap both spell the
// variants in kebab-case, e.g. `memory-bindings`, `re-embed`.
// NOTE(review): no prompt/schema constant for `CrossDomainBridges` appears in
// this file's constant section; confirm where (or whether) it is handled.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum EnrichOperation {
    // -- Fully implemented operations --
    /// Add missing entity/relationship bindings to memories (fully implemented).
    MemoryBindings,
    /// Fill NULL/empty entity descriptions with LLM-generated summaries (fully implemented).
    EntityDescriptions,
    /// Expand short memory bodies into richer content (fully implemented, GAP-18).
    BodyEnrich,
    /// Rebuild missing memory embeddings without rewriting the memory body.
    ReEmbed,
    // -- G27 audit operations (documented as scan only) --
    /// Calibrate relationship weights using LLM analysis (scan only).
    WeightCalibrate,
    /// Reclassify relationship types using LLM judgment (scan only).
    RelationReclassify,
    /// Connect isolated entities by suggesting new relationships (scan only).
    EntityConnect,
    /// Validate entity type assignments using LLM judgment (scan only).
    EntityTypeValidate,
    /// Enrich memory descriptions that are generic/auto-generated (scan only).
    DescriptionEnrich,
    /// Identify cross-domain bridges between disconnected subgraphs (scan only).
    CrossDomainBridges,
    /// Classify memories into domain categories (scan only).
    DomainClassify,
    /// Audit the graph for quality issues (scan only).
    GraphAudit,
    /// Synthesize deep-research findings into graph memories (scan only).
    DeepResearchSynth,
    /// Extract structured body from unstructured text (scan only).
    BodyExtract,
}
325
/// LLM provider for enrichment.
// Variant `///` comments are shown by clap as help text for `--mode` and
// `--fallback-mode`. The CLI spellings (`claude-code`, `codex`) are the same
// strings produced by this type's `Display` impl.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
pub enum EnrichMode {
    /// Use locally installed Claude Code CLI (OAuth-first).
    ClaudeCode,
    /// Use locally installed OpenAI Codex CLI.
    Codex,
}
334
335impl std::fmt::Display for EnrichMode {
336    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
337        match self {
338            EnrichMode::ClaudeCode => write!(f, "claude-code"),
339            EnrichMode::Codex => write!(f, "codex"),
340        }
341    }
342}
343
344/// Arguments for the `enrich` subcommand.
345#[derive(clap::Args)]
346#[command(
347    about = "Enrich graph memories and entities using an LLM provider",
348    after_long_help = "EXAMPLES:\n  \
349    # Add missing entity bindings to all unbound memories\n  \
350    sqlite-graphrag enrich --operation memory-bindings --mode claude-code\n\n  \
351    # Fill entity descriptions (dry-run preview, no tokens spent)\n  \
352    sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n  \
353    # Expand short memory bodies (GAP-18)\n  \
354    sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n  \
355    # Rebuild only missing memory embeddings without rewriting bodies\n  \
356    sqlite-graphrag enrich --operation re-embed --limit 100\n\n  \
357    # Resume an interrupted body-enrich run\n  \
358    sqlite-graphrag enrich --operation body-enrich --resume --json\n\n  \
359    # Retry only failed items from a previous run\n  \
360    sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n\
361    EXIT CODES:\n  \
362    0  success\n  \
363    1  validation error (bad args, binary not found)\n  \
364    14 I/O error"
365)]
366pub struct EnrichArgs {
367    /// Enrichment operation to run.
368    #[arg(long, short = 'o', value_enum, value_name = "OPERATION")]
369    pub operation: EnrichOperation,
370
371    /// LLM provider to use. Default: claude-code (OAuth-first).
372    #[arg(long, value_enum, default_value = "claude-code")]
373    pub mode: EnrichMode,
374
375    /// Maximum number of items to process in this run. Omit for all.
376    #[arg(long, value_name = "N")]
377    pub limit: Option<usize>,
378
379    /// Preview items without calling the LLM (zero tokens consumed).
380    #[arg(long)]
381    pub dry_run: bool,
382
383    /// Namespace to operate on. Default: global.
384    #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
385    pub namespace: Option<String>,
386
387    // -- Provider flags (Claude) --
388    /// Path to the Claude Code binary. Default: auto-detect from PATH.
389    #[arg(long, value_name = "PATH")]
390    pub claude_binary: Option<PathBuf>,
391
392    /// Claude model to use (e.g. claude-sonnet-4-6).
393    #[arg(long, value_name = "MODEL")]
394    pub claude_model: Option<String>,
395
396    /// Timeout per item in seconds when using Claude Code. Default: 300.
397    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
398    pub claude_timeout: u64,
399
400    // -- Provider flags (Codex) --
401    /// Path to the Codex CLI binary. Default: auto-detect from PATH.
402    #[arg(long, value_name = "PATH")]
403    pub codex_binary: Option<PathBuf>,
404
405    /// Codex model to use (e.g. o4-mini).
406    #[arg(long, value_name = "MODEL")]
407    pub codex_model: Option<String>,
408
409    /// Timeout per item in seconds when using Codex. Default: 300.
410    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
411    pub codex_timeout: u64,
412
413    // -- Cost controls --
414    /// Abort when cumulative cost exceeds this USD budget (API key only; ignored for OAuth).
415    #[arg(long, value_name = "USD")]
416    pub max_cost_usd: Option<f64>,
417
418    // -- Queue controls --
419    /// Resume a previously interrupted run (skip already-done items).
420    #[arg(long)]
421    pub resume: bool,
422
423    /// Retry only items that failed in a previous run.
424    #[arg(long)]
425    pub retry_failed: bool,
426
427    // -- body-enrich specific flags (GAP-18) --
428    /// Minimum output character count for body-enrich. Default: 500.
429    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
430    pub min_output_chars: usize,
431
432    /// Maximum output character count for body-enrich. Default: 2000.
433    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
434    pub max_output_chars: usize,
435
436    /// Check that enriched body preserves all facts from the original (LLM judge). Default: true.
437    #[arg(long, default_value_t = true)]
438    pub preserve_check: bool,
439
440    /// Path to a custom prompt template file for body-enrich.
441    #[arg(long, value_name = "PATH")]
442    pub prompt_template: Option<PathBuf>,
443
444    /// Number of parallel LLM workers (default 1 = serial).
445    /// Each worker claims items atomically from the queue DB via UPDATE...RETURNING.
446    /// Range: 1–32. For 2321 entities, --llm-parallelism 4 reduces wall time ~4×.
447    #[arg(long, default_value_t = 1, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=32))]
448    pub llm_parallelism: u32,
449
450    // -- Output / infra --
451    /// Emit NDJSON output. Always true; flag accepted for compatibility.
452    #[arg(long)]
453    pub json: bool,
454
455    /// Database path override.
456    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
457    pub db: Option<String>,
458
459    /// G30: poll for the job singleton every second for up to N seconds
460    /// when another invocation holds the lock. Default: 0 (fail fast).
461    #[arg(long, value_name = "SECONDS")]
462    pub wait_job_singleton: Option<u64>,
463
464    /// G30: force acquisition of the singleton lock by removing a stale
465    /// lock file from a previously crashed invocation. Use only when you
466    /// are certain no other `enrich`/`ingest` is running.
467    #[arg(long, default_value_t = false)]
468    pub force_job_singleton: bool,
469
470    /// G37: select a specific subset of memory names to enrich instead of
471    /// the full candidate set. Comma-separated, e.g. `--names a,b,c`.
472    /// Empty when omitted (processes all candidates).
473    #[arg(long, value_name = "NAMES", value_delimiter = ',')]
474    pub names: Vec<String>,
475
476    /// G37: read the subset of memory names from a file (one per line).
477    /// Lines starting with `#` and empty lines are ignored. Combined with
478    /// `--names` (union) when both are set.
479    #[arg(long, value_name = "PATH")]
480    pub names_file: Option<PathBuf>,
481
482    /// G35: probe the LLM provider with a 1-turn ping before processing
483    /// the batch. Aborts with a clear error if the rate-limit window is
484    /// closed (avoids burning N turns only to fail on item 1).
485    #[arg(long, default_value_t = false)]
486    pub preflight_check: bool,
487
488    /// G35: if a preflight probe or in-flight call hits the Claude rate
489    /// limit, fall back to `--fallback-mode` (typically `codex`) instead
490    /// of failing the batch. Ignored when `--mode` is already `codex`.
491    #[arg(long, value_enum)]
492    pub fallback_mode: Option<EnrichMode>,
493
494    /// G35: number of seconds before the OAuth rate-limit reset at which
495    /// the preflight probe should refuse to start. Default 300 (5 min).
496    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
497    pub rate_limit_buffer: u64,
498
499    /// G28-D: refuse to start when the 1-minute load average exceeds
500    /// `2 × ncpus` (or `SQLITE_GRAPHRAG_MAX_LOAD_PER_NCPU` if set).
501    /// Set to false to skip the check on contended CI runners.
502    #[arg(long, default_value_t = true)]
503    pub max_load_check: bool,
504
505    /// G28-D: when the system is saturated, abort the job after this
506    /// many consecutive HardFailure outcomes. Default 5.
507    #[arg(long, value_name = "N", default_value_t = 5)]
508    pub circuit_breaker_threshold: u32,
509
510    /// G29 Passo 4: minimum trigram-Jaccard similarity between the
511    /// original body and the LLM-rewritten body for the rewrite to be
512    /// accepted. Scores below the threshold are rejected and emitted as
513    /// `EnrichItemResult::PreservationFailed`. Default 0.7 (per the G29
514    /// gap specification). Ignored when `--operation` is not
515    /// `body-enrich`.
516    #[arg(long, value_name = "FLOAT", default_value_t = 0.7)]
517    pub preserve_threshold: f64,
518
519    /// G33 Passo 3: when set, validate `--codex-model` against the
520    /// ChatGPT Pro OAuth accepted-model list and abort with a
521    /// suggestion when the value is unknown. Default true (fail fast
522    /// to avoid burning OAuth turns). Set to false to opt out.
523    #[arg(long, default_value_t = true)]
524    pub codex_model_validate: bool,
525
526    /// G33 Passo 3: when set together with an invalid `--codex-model`,
527    /// automatically substitute the supplied default (e.g. `gpt-5.5`)
528    /// instead of aborting. The substitution is recorded in the NDJSON
529    /// stream as `provider_substituted: true` for traceability.
530    #[arg(long, value_name = "MODEL")]
531    pub codex_model_fallback: Option<String>,
532}
533
534// ---------------------------------------------------------------------------
535// Internal types — raw LLM output structs
536// ---------------------------------------------------------------------------
537
538// ---------------------------------------------------------------------------
539// NDJSON event types emitted to stdout
540// ---------------------------------------------------------------------------
541
/// NDJSON line announcing a pipeline phase.
///
/// Every `Option` field is omitted from the JSON when `None`
/// (`skip_serializing_if`), so each phase carries only the fields it sets.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Phase name, e.g. `"scan"`.
    phase: &'a str,
    /// Path of the provider binary, when the emitting phase reports one.
    #[serde(skip_serializing_if = "Option::is_none")]
    binary_path: Option<&'a str>,
    /// Provider version string, when the emitting phase reports one.
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Number of candidate items found.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_total: Option<usize>,
    /// Number of items still pending.
    // NOTE(review): presumably counted from the queue DB on resume — confirm at the emit site.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_pending: Option<usize>,
    /// Active parallel LLM worker count (1 = serial). Present only on the "scan" phase event.
    #[serde(skip_serializing_if = "Option::is_none")]
    llm_parallelism: Option<u32>,
}
557
/// NDJSON line reporting the outcome of one processed item.
///
/// `Option` fields are omitted from the JSON when `None`; `index` and `total`
/// are always present.
#[derive(Debug, Serialize)]
struct ItemEvent<'a> {
    /// Item identifier (memory name or entity name).
    item: &'a str,
    /// Status label for this item.
    // NOTE(review): the set of status strings is defined by the emitters, not here.
    status: &'a str,
    /// Row id of the memory this item refers to, when applicable.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Row id of the entity this item refers to, when applicable.
    #[serde(skip_serializing_if = "Option::is_none")]
    entity_id: Option<i64>,
    /// Entity count reported for this item.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Relationship count reported for this item.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Body length before the rewrite (body-style operations).
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_before: Option<usize>,
    /// Body length after the rewrite (body-style operations).
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_after: Option<usize>,
    /// LLM cost attributed to this item, in USD.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    /// Wall-clock time spent on this item, in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Error message when the item failed.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Position of this item within the run.
    // NOTE(review): 0- or 1-based is decided by the emitter — confirm before relying on it.
    index: usize,
    /// Total number of items in the run.
    total: usize,
}
584
/// NDJSON summary line for an `enrich` run.
#[derive(Debug, Serialize)]
struct EnrichSummary {
    /// Marker field distinguishing this line from phase/item events.
    // NOTE(review): presumably always `true`; confirm at the construction site.
    summary: bool,
    /// Name of the operation that ran.
    operation: String,
    /// Number of candidate items considered.
    items_total: usize,
    /// Items that completed successfully.
    completed: usize,
    /// Items that ended in an error.
    failed: usize,
    /// Items that were skipped.
    // NOTE(review): skip reasons (e.g. already done on `--resume`) are decided by the caller.
    skipped: usize,
    /// Cumulative LLM cost for the run, in USD.
    cost_usd: f64,
    /// Total wall-clock time of the run, in milliseconds.
    elapsed_ms: u64,
}
596
597use crate::output::emit_json_line as emit_json;
598
599// ---------------------------------------------------------------------------
600// Queue DB
601// ---------------------------------------------------------------------------
602
603/// Opens or creates the enrichment queue database.
604///
605/// The queue schema mirrors `ingest_claude` for resume/retry parity.
606/// Uses a different filename (`.enrich-queue.sqlite`) to avoid collision.
607///
608/// # DRY note
609///
610/// This is a near-verbatim copy of `open_queue_db` in `ingest_claude.rs`.
611/// Both should be unified in a shared `llm_runner.rs` module by the
612/// Integration stream.
613fn open_queue_db(path: &str) -> Result<Connection, AppError> {
614    let conn = Connection::open(path)?;
615    conn.pragma_update(None, "journal_mode", "wal")?;
616    conn.execute_batch(
617        "CREATE TABLE IF NOT EXISTS queue (
618            id          INTEGER PRIMARY KEY AUTOINCREMENT,
619            item_key    TEXT NOT NULL UNIQUE,
620            item_type   TEXT NOT NULL DEFAULT 'memory',
621            status      TEXT NOT NULL DEFAULT 'pending',
622            memory_id   INTEGER,
623            entity_id   INTEGER,
624            entities    INTEGER DEFAULT 0,
625            rels        INTEGER DEFAULT 0,
626            error       TEXT,
627            cost_usd    REAL DEFAULT 0.0,
628            attempt     INTEGER DEFAULT 0,
629            elapsed_ms  INTEGER,
630            created_at  TEXT DEFAULT (datetime('now')),
631            done_at     TEXT
632        );
633        CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
634    )?;
635    Ok(conn)
636}
637
638// ---------------------------------------------------------------------------
639// LLM invocation — Claude Code
640// ---------------------------------------------------------------------------
641
642/// Calls `claude -p` via the shared `claude_runner` module (G02).
643///
644/// Returns `(output_value, cost_usd, is_oauth)`.
645fn call_claude(
646    binary: &Path,
647    prompt: &str,
648    json_schema: &str,
649    input_text: &str,
650    model: Option<&str>,
651    timeout_secs: u64,
652) -> Result<(serde_json::Value, f64, bool), AppError> {
653    let result = crate::commands::claude_runner::run_claude(
654        binary,
655        prompt,
656        json_schema,
657        input_text,
658        model,
659        timeout_secs,
660        7,
661    )?;
662    Ok((result.value, result.cost_usd, result.is_oauth))
663}
664
665// ---------------------------------------------------------------------------
666// Preflight probe (G35) — single-turn ping to verify the LLM provider
667// ---------------------------------------------------------------------------
668
669/// Result of a single preflight ping (G35).
670enum PreflightOutcome {
671    /// The provider accepted the ping without rate-limit or other errors.
672    Healthy,
673    /// The provider rejected the ping due to OAuth rate limit. The
674    /// `suggestion` field is a human hint that callers can embed in the
675    /// user-facing error.
676    RateLimited {
677        reason: String,
678        suggestion: &'static str,
679    },
680    /// Any other provider error (binary missing, auth failure, etc.).
681    Error(AppError),
682}
683
684/// Probes the configured LLM provider with a 1-turn ping.
685///
686/// - Claude: `claude -p "ping" --max-turns 1 --strict-mcp-config --mcp-config '{}'`
687/// - Codex:  `codex exec -c mcp_servers='{}' "ping" --json`
688///
689/// The probe intentionally avoids spawning any MCP server children (G28-A)
690/// to keep its own process footprint at the minimum.
691fn run_preflight_probe(args: &EnrichArgs) -> PreflightOutcome {
692    let timeout = std::time::Duration::from_secs(args.rate_limit_buffer.max(60));
693
694    match args.mode {
695        EnrichMode::ClaudeCode => {
696            let bin = match find_claude_binary(args.claude_binary.as_deref()) {
697                Ok(b) => b,
698                Err(e) => return PreflightOutcome::Error(e),
699            };
700            let mut cmd = std::process::Command::new(&bin);
701            cmd.env_clear();
702            for var in &["PATH", "HOME", "USER"] {
703                if let Ok(val) = std::env::var(var) {
704                    cmd.env(var, val);
705                }
706            }
707            cmd.arg("-p")
708                .arg("ping")
709                .arg("--max-turns")
710                .arg("1")
711                .arg("--strict-mcp-config")
712                .arg("--mcp-config")
713                .arg("{}")
714                .arg("--dangerously-skip-permissions")
715                .arg("--settings")
716                .arg("{\"hooks\":{}}")
717                .arg("--output-format")
718                .arg("json")
719                .stdin(std::process::Stdio::null())
720                .stdout(std::process::Stdio::piped())
721                .stderr(std::process::Stdio::piped());
722
723            let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
724                Ok(c) => c,
725                Err(e) => {
726                    return PreflightOutcome::Error(AppError::Io(e));
727                }
728            };
729            let output = match wait_with_timeout(child, timeout) {
730                Ok(out) => out,
731                Err(e) => return PreflightOutcome::Error(e),
732            };
733            if !output.status.success() {
734                let stderr = String::from_utf8_lossy(&output.stderr);
735                if stderr.contains("hit your session limit")
736                    || stderr.contains("rate_limit")
737                    || stderr.contains("429")
738                {
739                    return PreflightOutcome::RateLimited {
740                        reason: stderr.trim().to_string(),
741                        suggestion:
742                            "wait for the OAuth window to reset or use --fallback-mode codex",
743                    };
744                }
745                return PreflightOutcome::Error(AppError::Validation(format!(
746                    "preflight probe failed: {stderr}",
747                    stderr = stderr.trim()
748                )));
749            }
750            PreflightOutcome::Healthy
751        }
752        EnrichMode::Codex => {
753            let bin = match find_codex_binary(args.codex_binary.as_deref()) {
754                Ok(b) => b,
755                Err(e) => return PreflightOutcome::Error(e),
756            };
757            super::codex_spawn::validate_codex_model(args.codex_model.as_deref())
758                .map_err(PreflightOutcome::Error)
759                .ok();
760            let schema = "{}";
761            let schema_path = match super::codex_spawn::trusted_schema_path() {
762                Ok(p) => p,
763                Err(e) => return PreflightOutcome::Error(e),
764            };
765            let spawn_args = super::codex_spawn::CodexSpawnArgs {
766                binary: &bin,
767                prompt: "ping",
768                json_schema: schema,
769                input_text: "",
770                model: args.codex_model.as_deref(),
771                timeout_secs: args.rate_limit_buffer.max(60),
772                schema_path: schema_path.clone(),
773            };
774            let mut cmd = super::codex_spawn::build_codex_command(&spawn_args);
775            let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
776                Ok(c) => c,
777                Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
778            };
779            let output = match wait_with_timeout(child, timeout) {
780                Ok(out) => out,
781                Err(e) => return PreflightOutcome::Error(e),
782            };
783            let _ = std::fs::remove_file(&schema_path);
784            if !output.status.success() {
785                let stderr = String::from_utf8_lossy(&output.stderr);
786                if stderr.contains("rate_limit")
787                    || stderr.contains("429")
788                    || stderr.contains("Too Many Requests")
789                {
790                    return PreflightOutcome::RateLimited {
791                        reason: stderr.trim().to_string(),
792                        suggestion: "wait for the rate-limit window to reset",
793                    };
794                }
795                return PreflightOutcome::Error(AppError::Validation(format!(
796                    "preflight probe failed: {stderr}",
797                    stderr = stderr.trim()
798                )));
799            }
800            PreflightOutcome::Healthy
801        }
802    }
803}
804
805/// Cross-platform wait with timeout (no extra crate dependency).
806fn wait_with_timeout(
807    mut child: std::process::Child,
808    timeout: std::time::Duration,
809) -> Result<std::process::Output, AppError> {
810    use wait_timeout::ChildExt;
811    let start = std::time::Instant::now();
812    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
813    if status.is_none() {
814        let _ = child.kill();
815        let _ = child.wait();
816        return Err(AppError::Validation(format!(
817            "preflight probe timed out after {}s",
818            start.elapsed().as_secs()
819        )));
820    }
821    let mut stdout = Vec::new();
822    if let Some(mut out) = child.stdout.take() {
823        std::io::Read::read_to_end(&mut out, &mut stdout).map_err(AppError::Io)?;
824    }
825    let mut stderr = Vec::new();
826    if let Some(mut err) = child.stderr.take() {
827        std::io::Read::read_to_end(&mut err, &mut stderr).map_err(AppError::Io)?;
828    }
829    let exit = status.unwrap();
830    Ok(std::process::Output {
831        status: exit,
832        stdout,
833        stderr,
834    })
835}
836
837// ---------------------------------------------------------------------------
838// SCAN helpers — SQL queries that find items needing enrichment
839// ---------------------------------------------------------------------------
840
841/// Returns memories without any `memory_entities` binding.
842///
843/// These are the targets for `memory-bindings` enrichment. When `name_filter`
844/// is non-empty, restricts the scan to the given names (G37); unknown names
845/// are silently skipped (the caller can detect them by comparing
846/// requested vs. returned).
847fn scan_unbound_memories(
848    conn: &Connection,
849    namespace: &str,
850    limit: Option<usize>,
851    name_filter: &[String],
852) -> Result<Vec<(i64, String, String)>, AppError> {
853    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
854
855    if name_filter.is_empty() {
856        let sql = format!(
857            "SELECT m.id, m.name, m.body
858             FROM memories m
859             WHERE m.namespace = ?1
860               AND m.deleted_at IS NULL
861               AND NOT EXISTS (
862                   SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
863               )
864             ORDER BY m.id
865             {limit_clause}"
866        );
867        let mut stmt = conn.prepare(&sql)?;
868        let rows = stmt
869            .query_map(rusqlite::params![namespace], |r| {
870                Ok((
871                    r.get::<_, i64>(0)?,
872                    r.get::<_, String>(1)?,
873                    r.get::<_, String>(2)?,
874                ))
875            })?
876            .collect::<Result<Vec<_>, _>>()?;
877        Ok(rows)
878    } else {
879        // Build a parameterised IN clause: ?2, ?3, ..., ?{1+n}
880        let placeholders: Vec<String> = (2..=name_filter.len() + 1)
881            .map(|i| format!("?{i}"))
882            .collect();
883        let in_clause = placeholders.join(", ");
884        let sql = format!(
885            "SELECT m.id, m.name, m.body
886             FROM memories m
887             WHERE m.namespace = ?1
888               AND m.deleted_at IS NULL
889               AND m.name IN ({in_clause})
890               AND NOT EXISTS (
891                   SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
892               )
893             ORDER BY m.id
894             {limit_clause}"
895        );
896        let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
897        params_vec.push(&namespace);
898        for n in name_filter {
899            params_vec.push(n);
900        }
901        let mut stmt = conn.prepare(&sql)?;
902        let rows = stmt
903            .query_map(
904                rusqlite::params_from_iter(params_vec.iter().copied()),
905                |r| {
906                    Ok((
907                        r.get::<_, i64>(0)?,
908                        r.get::<_, String>(1)?,
909                        r.get::<_, String>(2)?,
910                    ))
911                },
912            )?
913            .collect::<Result<Vec<_>, _>>()?;
914        Ok(rows)
915    }
916}
917
918/// Reads a list of memory names from a UTF-8 text file (G37).
919///
920/// Empty lines and lines beginning with `#` are skipped. Returns a
921/// de-duplicated, order-preserving list of trimmed names.
922fn read_names_file(path: &Path) -> Result<Vec<String>, AppError> {
923    let content = std::fs::read_to_string(path).map_err(|e| {
924        AppError::Validation(format!("failed to read names file {}: {e}", path.display()))
925    })?;
926    let mut seen = std::collections::HashSet::new();
927    let mut out = Vec::new();
928    for line in content.lines() {
929        let trimmed = line.trim();
930        if trimmed.is_empty() || trimmed.starts_with('#') {
931            continue;
932        }
933        if seen.insert(trimmed.to_string()) {
934            out.push(trimmed.to_string());
935        }
936    }
937    Ok(out)
938}
939
940/// Resolves the union of `--names` and `--names-file` (G37).
941fn resolve_name_filter(args: &EnrichArgs) -> Result<Vec<String>, AppError> {
942    let mut combined: Vec<String> = args.names.clone();
943    if let Some(p) = &args.names_file {
944        let from_file = read_names_file(p)?;
945        for n in from_file {
946            if !combined.contains(&n) {
947                combined.push(n);
948            }
949        }
950    }
951    Ok(combined)
952}
953
954/// Returns entities with NULL or empty description.
955///
956/// These are the targets for `entity-descriptions` enrichment.
957fn scan_entities_without_description(
958    conn: &Connection,
959    namespace: &str,
960    limit: Option<usize>,
961) -> Result<Vec<(i64, String, String)>, AppError> {
962    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
963    let sql = format!(
964        "SELECT id, name, type
965         FROM entities
966         WHERE namespace = ?1
967           AND (description IS NULL OR description = '')
968         ORDER BY id
969         {limit_clause}"
970    );
971    let mut stmt = conn.prepare(&sql)?;
972    let rows = stmt
973        .query_map(rusqlite::params![namespace], |r| {
974            Ok((
975                r.get::<_, i64>(0)?,
976                r.get::<_, String>(1)?,
977                r.get::<_, String>(2)?,
978            ))
979        })?
980        .collect::<Result<Vec<_>, _>>()?;
981    Ok(rows)
982}
983
984/// Returns memories whose body length is below the configured minimum.
985///
986/// These are the targets for `body-enrich` (GAP-18).
987fn scan_short_body_memories(
988    conn: &Connection,
989    namespace: &str,
990    min_chars: usize,
991    limit: Option<usize>,
992) -> Result<Vec<(i64, String, String)>, AppError> {
993    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
994    let sql = format!(
995        "SELECT m.id, m.name, m.body
996         FROM memories m
997         WHERE m.namespace = ?1
998           AND m.deleted_at IS NULL
999           AND LENGTH(COALESCE(m.body,'')) < ?2
1000         ORDER BY m.id
1001         {limit_clause}"
1002    );
1003    let mut stmt = conn.prepare(&sql)?;
1004    let rows = stmt
1005        .query_map(rusqlite::params![namespace, min_chars as i64], |r| {
1006            Ok((
1007                r.get::<_, i64>(0)?,
1008                r.get::<_, String>(1)?,
1009                r.get::<_, String>(2)?,
1010            ))
1011        })?
1012        .collect::<Result<Vec<_>, _>>()?;
1013    Ok(rows)
1014}
1015
1016/// Returns live memories that still have no row in `memory_embeddings`.
1017///
1018/// These are the targets for `re-embed`.
1019fn scan_memories_without_embeddings(
1020    conn: &Connection,
1021    namespace: &str,
1022    limit: Option<usize>,
1023    name_filter: &[String],
1024) -> Result<Vec<(i64, String, String)>, AppError> {
1025    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1026
1027    if name_filter.is_empty() {
1028        let sql = format!(
1029            "SELECT m.id, m.name, COALESCE(m.body,'')
1030             FROM memories m
1031             LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1032             WHERE m.namespace = ?1
1033               AND m.deleted_at IS NULL
1034               AND me.memory_id IS NULL
1035             ORDER BY m.id
1036             {limit_clause}"
1037        );
1038        let mut stmt = conn.prepare(&sql)?;
1039        let rows = stmt
1040            .query_map(rusqlite::params![namespace], |r| {
1041                Ok((
1042                    r.get::<_, i64>(0)?,
1043                    r.get::<_, String>(1)?,
1044                    r.get::<_, String>(2)?,
1045                ))
1046            })?
1047            .collect::<Result<Vec<_>, _>>()?;
1048        Ok(rows)
1049    } else {
1050        let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1051            .map(|i| format!("?{i}"))
1052            .collect();
1053        let in_clause = placeholders.join(", ");
1054        let sql = format!(
1055            "SELECT m.id, m.name, COALESCE(m.body,'')
1056             FROM memories m
1057             LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1058             WHERE m.namespace = ?1
1059               AND m.deleted_at IS NULL
1060               AND m.name IN ({in_clause})
1061               AND me.memory_id IS NULL
1062             ORDER BY m.id
1063             {limit_clause}"
1064        );
1065        let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1066        params_vec.push(&namespace);
1067        for n in name_filter {
1068            params_vec.push(n);
1069        }
1070        let mut stmt = conn.prepare(&sql)?;
1071        let rows = stmt
1072            .query_map(
1073                rusqlite::params_from_iter(params_vec.iter().copied()),
1074                |r| {
1075                    Ok((
1076                        r.get::<_, i64>(0)?,
1077                        r.get::<_, String>(1)?,
1078                        r.get::<_, String>(2)?,
1079                    ))
1080                },
1081            )?
1082            .collect::<Result<Vec<_>, _>>()?;
1083        Ok(rows)
1084    }
1085}
1086
1087/// G27: Returns relationships with weight >= 0.7 that may need recalibration.
1088#[allow(clippy::type_complexity)]
1089fn scan_weight_candidates(
1090    conn: &Connection,
1091    namespace: &str,
1092    limit: Option<usize>,
1093) -> Result<Vec<(i64, String, String, String, f64)>, AppError> {
1094    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1095    let sql = format!(
1096        "SELECT r.id, e1.name, e2.name, r.relation, r.weight \
1097         FROM relationships r \
1098         JOIN entities e1 ON e1.id = r.source_id \
1099         JOIN entities e2 ON e2.id = r.target_id \
1100         WHERE r.weight >= 0.7 AND e1.namespace = ?1 \
1101         ORDER BY r.weight DESC {limit_clause}"
1102    );
1103    let mut stmt = conn.prepare(&sql)?;
1104    let rows = stmt
1105        .query_map(rusqlite::params![namespace], |r| {
1106            Ok((
1107                r.get::<_, i64>(0)?,
1108                r.get::<_, String>(1)?,
1109                r.get::<_, String>(2)?,
1110                r.get::<_, String>(3)?,
1111                r.get::<_, f64>(4)?,
1112            ))
1113        })?
1114        .collect::<Result<Vec<_>, _>>()?;
1115    Ok(rows)
1116}
1117
1118/// G27: Returns relationships with generic relation types (applies_to).
1119fn scan_generic_relations(
1120    conn: &Connection,
1121    namespace: &str,
1122    limit: Option<usize>,
1123) -> Result<Vec<(i64, String, String, String)>, AppError> {
1124    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1125    let sql = format!(
1126        "SELECT r.id, e1.name, e2.name, r.relation \
1127         FROM relationships r \
1128         JOIN entities e1 ON e1.id = r.source_id \
1129         JOIN entities e2 ON e2.id = r.target_id \
1130         WHERE r.relation = 'applies_to' AND e1.namespace = ?1 \
1131         ORDER BY r.id {limit_clause}"
1132    );
1133    let mut stmt = conn.prepare(&sql)?;
1134    let rows = stmt
1135        .query_map(rusqlite::params![namespace], |r| {
1136            Ok((
1137                r.get::<_, i64>(0)?,
1138                r.get::<_, String>(1)?,
1139                r.get::<_, String>(2)?,
1140                r.get::<_, String>(3)?,
1141            ))
1142        })?
1143        .collect::<Result<Vec<_>, _>>()?;
1144    Ok(rows)
1145}
1146
1147// ---------------------------------------------------------------------------
1148// PERSIST helpers for fully-implemented operations
1149// ---------------------------------------------------------------------------
1150
1151/// Persists entity bindings extracted by the LLM for a memory.
1152///
1153/// Creates entities via `upsert_entity`, links them to the memory via
1154/// `link_memory_entity`, and upserts relationships found between entities.
1155fn persist_memory_bindings(
1156    conn: &Connection,
1157    namespace: &str,
1158    memory_id: i64,
1159    entities_json: &serde_json::Value,
1160    rels_json: &serde_json::Value,
1161) -> Result<(usize, usize), AppError> {
1162    #[derive(Deserialize)]
1163    struct EntityItem {
1164        name: String,
1165        entity_type: String,
1166    }
1167    #[derive(Deserialize)]
1168    struct RelItem {
1169        source: String,
1170        target: String,
1171        relation: String,
1172        strength: f64,
1173    }
1174
1175    let extracted_entities: Vec<EntityItem> = serde_json::from_value(entities_json.clone())
1176        .map_err(|e| AppError::Validation(format!("failed to parse entities array: {e}")))?;
1177
1178    let extracted_rels: Vec<RelItem> = serde_json::from_value(rels_json.clone())
1179        .map_err(|e| AppError::Validation(format!("failed to parse relationships array: {e}")))?;
1180
1181    let mut ent_count = 0usize;
1182    let mut rel_count = 0usize;
1183
1184    for item in &extracted_entities {
1185        let entity_type = match item.entity_type.parse::<EntityType>() {
1186            Ok(et) => et,
1187            Err(_) => {
1188                tracing::warn!(
1189                    target: "enrich",
1190                    entity = %item.name,
1191                    entity_type = %item.entity_type,
1192                    "entity type not recognized, skipping"
1193                );
1194                continue;
1195            }
1196        };
1197        match entities::upsert_entity(
1198            conn,
1199            namespace,
1200            &NewEntity {
1201                name: item.name.clone(),
1202                entity_type,
1203                description: None,
1204            },
1205        ) {
1206            Ok(eid) => {
1207                let _ = entities::link_memory_entity(conn, memory_id, eid);
1208                ent_count += 1;
1209            }
1210            Err(e) => {
1211                tracing::warn!(
1212                    target: "enrich",
1213                    entity = %item.name,
1214                    error = %e,
1215                    "entity upsert skipped"
1216                );
1217            }
1218        }
1219    }
1220
1221    for rel in &extracted_rels {
1222        let normalized = crate::parsers::normalize_relation(&rel.relation);
1223        crate::parsers::warn_if_non_canonical(&normalized);
1224
1225        // Normalize entity names before lookup: upsert_entity normalizes on write,
1226        // so the lookup must use the same normalized form to find the row.
1227        let src_name = crate::parsers::normalize_entity_name(&rel.source);
1228        let tgt_name = crate::parsers::normalize_entity_name(&rel.target);
1229        let src_id = entities::find_entity_id(conn, namespace, &src_name);
1230        let tgt_id = entities::find_entity_id(conn, namespace, &tgt_name);
1231        if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1232            let new_rel = NewRelationship {
1233                source: rel.source.clone(),
1234                target: rel.target.clone(),
1235                relation: normalized,
1236                strength: rel.strength,
1237                description: None,
1238            };
1239            if entities::upsert_relationship(conn, namespace, sid, tid, &new_rel).is_ok() {
1240                rel_count += 1;
1241            }
1242        }
1243    }
1244
1245    Ok((ent_count, rel_count))
1246}
1247
1248/// Updates an entity's description directly in the `entities` table.
1249fn persist_entity_description(
1250    conn: &Connection,
1251    entity_id: i64,
1252    description: &str,
1253) -> Result<(), AppError> {
1254    conn.execute(
1255        "UPDATE entities SET description = ?1, updated_at = unixepoch() WHERE id = ?2",
1256        rusqlite::params![description, entity_id],
1257    )?;
1258    Ok(())
1259}
1260
1261fn reembed_memory_vector(
1262    conn: &Connection,
1263    namespace: &str,
1264    memory_id: i64,
1265    memory_name: &str,
1266    memory_type: &str,
1267    body: &str,
1268    paths: &crate::paths::AppPaths,
1269) -> Result<(), AppError> {
1270    let snippet: String = body.chars().take(200).collect();
1271    let embedding = crate::embedder::embed_passage_local(&paths.models, body)?;
1272    memories::upsert_vec(
1273        conn,
1274        memory_id,
1275        namespace,
1276        memory_type,
1277        &embedding,
1278        memory_name,
1279        &snippet,
1280    )?;
1281    Ok(())
1282}
1283
1284/// Persists an enriched memory body (body-enrich, GAP-18).
1285///
1286/// Uses `memories::update` to set the new body and `sync_fts_after_update`
1287/// to keep FTS5 in sync. Also re-embeds the memory for recall accuracy.
1288fn persist_enriched_body(
1289    conn: &Connection,
1290    namespace: &str,
1291    memory_id: i64,
1292    memory_name: &str,
1293    new_body: &str,
1294    paths: &crate::paths::AppPaths,
1295) -> Result<(), AppError> {
1296    // Read current values for FTS sync
1297    let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
1298        "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
1299        rusqlite::params![memory_id],
1300        |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1301    )?;
1302
1303    let memory_type: String = conn.query_row(
1304        "SELECT type FROM memories WHERE id=?1",
1305        rusqlite::params![memory_id],
1306        |r| r.get(0),
1307    )?;
1308
1309    let description: String = conn.query_row(
1310        "SELECT COALESCE(description,'') FROM memories WHERE id=?1",
1311        rusqlite::params![memory_id],
1312        |r| r.get(0),
1313    )?;
1314
1315    let body_hash = blake3::hash(new_body.as_bytes()).to_hex().to_string();
1316
1317    let new_memory = memories::NewMemory {
1318        namespace: namespace.to_string(),
1319        name: memory_name.to_string(),
1320        memory_type: memory_type.clone(),
1321        description: description.clone(),
1322        body: new_body.to_string(),
1323        body_hash,
1324        session_id: None,
1325        source: "agent".to_string(),
1326        metadata: serde_json::json!({
1327            "operation": "body-enrich",
1328            "orig_chars": old_body.chars().count(),
1329            "new_chars": new_body.chars().count(),
1330        }),
1331    };
1332
1333    // G29 audit: insert a new immutable version BEFORE the update so the
1334    // enriched body is reachable through `history --name <X>` and
1335    // `restore --version N` can roll back to the pre-enrich state.
1336    let next_version = crate::storage::versions::next_version(conn, memory_id)?;
1337    let version_metadata = serde_json::json!({
1338        "operation": "body-enrich",
1339        "orig_chars": old_body.chars().count(),
1340        "new_chars": new_body.chars().count(),
1341    })
1342    .to_string();
1343    crate::storage::versions::insert_version(
1344        conn,
1345        memory_id,
1346        next_version,
1347        memory_name,
1348        &memory_type,
1349        &description,
1350        new_body,
1351        &version_metadata,
1352        Some("enrich"),
1353        "edit",
1354    )?;
1355
1356    memories::update(conn, memory_id, &new_memory, None)?;
1357    memories::sync_fts_after_update(
1358        conn,
1359        memory_id,
1360        &old_name,
1361        &old_desc,
1362        &old_body,
1363        &new_memory.name,
1364        &new_memory.description,
1365        &new_memory.body,
1366    )?;
1367
1368    // Re-embed for recall accuracy
1369    if let Err(e) = reembed_memory_vector(
1370        conn,
1371        namespace,
1372        memory_id,
1373        memory_name,
1374        &memory_type,
1375        new_body,
1376        paths,
1377    ) {
1378        tracing::warn!(target: "enrich", memory = %memory_name, error = %e, "vec upsert failed after body-enrich");
1379    }
1380
1381    Ok(())
1382}
1383
1384// ---------------------------------------------------------------------------
// Main entry point (preceded by its G20 flag-validation helpers)
1386// ---------------------------------------------------------------------------
1387
1388// ---------------------------------------------------------------------------
1389// G20: mode-conditional flag validation
1390// ---------------------------------------------------------------------------
1391
/// Reports whether `value` equals its declared `default`.
///
/// clap fills `default_value_t` fields silently, so comparing against the
/// declared default is how an explicit override is told apart from a default.
/// An override that happens to equal the default is indistinguishable.
fn is_at_default<T>(value: T, default: T) -> bool
where
    T: PartialEq,
{
    value.eq(&default)
}
1398
1399/// G20: validate that flags for one LLM provider were not passed when
1400/// the operator selected a different provider. Flags silently discarded
1401/// by the wrong mode are surfaced as AppError::Validation BEFORE any
1402/// DB work, so the operator gets an actionable error instead of a
1403/// surprise at runtime.
1404///
1405/// Detection rules:
1406/// - For Option<PathBuf> / Option<String>: is_some() means explicit
1407/// - For scalar fields with default_value_t: value != default means explicit
1408/// - For boolean fields: true means explicit (default is false)
1409///
1410/// Mode-specific matrices:
1411/// - mode=claude-code rejects: codex_binary, codex_model, codex_timeout != 300
1412/// - mode=codex rejects: claude_binary, claude_model, claude_timeout != 300, max_cost_usd
1413fn validate_mode_conditional_flags_enrich(args: &EnrichArgs) -> Result<(), AppError> {
1414    const DEFAULT_TIMEOUT: u64 = 300;
1415
1416    let mut conflicts: Vec<String> = Vec::new();
1417
1418    match args.mode {
1419        EnrichMode::ClaudeCode => {
1420            if args.codex_binary.is_some() {
1421                conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1422            }
1423            if args.codex_model.is_some() {
1424                conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1425            }
1426            if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1427                conflicts.push(format!(
1428                    "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1429                    args.codex_timeout
1430                ));
1431            }
1432        }
1433        EnrichMode::Codex => {
1434            if args.claude_binary.is_some() {
1435                conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1436            }
1437            if args.claude_model.is_some() {
1438                conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1439            }
1440            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1441                conflicts.push(format!(
1442                    "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1443                    args.claude_timeout
1444                ));
1445            }
1446            if args.max_cost_usd.is_some() {
1447                conflicts.push(
1448                    "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription, not the call)"
1449                        .to_string(),
1450                );
1451            }
1452        }
1453    }
1454
1455    if !conflicts.is_empty() {
1456        return Err(AppError::Validation(format!(
1457            "G20: mode-conditional flag conflicts detected for --mode={}:\n  - {}",
1458            args.mode,
1459            conflicts.join("\n  - ")
1460        )));
1461    }
1462
1463    Ok(())
1464}
1465
1466// ---------------------------------------------------------------------------
1467
1468/// Main entry point for the `enrich` command.
1469pub fn run(args: &EnrichArgs) -> Result<(), AppError> {
1470    // G20: mode-conditional flag validation BEFORE any DB access.
1471    // Surfaces flags that the wrong mode would silently discard.
1472    validate_mode_conditional_flags_enrich(args)?;
1473    let started = Instant::now();
1474
1475    let paths = AppPaths::resolve(args.db.as_deref())?;
1476    ensure_db_ready(&paths)?;
1477    let conn = open_rw(&paths.db)?;
1478    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1479
1480    // G28-B (v1.0.68) + G30 (v1.0.69): enforce singleton per
1481    // (job_type, namespace, db_hash) so two parallel `enrich` invocations
1482    // on the same DB cannot co-exist, but concurrent enrich on different
1483    // databases works as expected. The force flag (--force) breaks a
1484    // stale lock from a previously crashed invocation.
1485    let wait_secs = args.wait_job_singleton;
1486    let force_flag = args.force_job_singleton;
1487    let _singleton = crate::lock::acquire_job_singleton(
1488        crate::lock::JobType::Enrich,
1489        &namespace,
1490        &paths.db,
1491        wait_secs,
1492        force_flag,
1493    )?;
1494
1495    // Validate provider binary upfront only for LLM-backed operations.
1496    let provider_binary = if matches!(args.operation, EnrichOperation::ReEmbed) {
1497        None
1498    } else {
1499        Some(match args.mode {
1500            EnrichMode::ClaudeCode => {
1501                let bin = find_claude_binary(args.claude_binary.as_deref())?;
1502                let version = super::claude_runner::validate_claude_version(&bin)?;
1503                tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
1504                emit_json(&PhaseEvent {
1505                    phase: "validate",
1506                    binary_path: bin.to_str(),
1507                    version: Some(&version),
1508                    items_total: None,
1509                    items_pending: None,
1510                    llm_parallelism: None,
1511                });
1512                bin
1513            }
1514            EnrichMode::Codex => {
1515                let bin = find_codex_binary(args.codex_binary.as_deref())?;
1516                emit_json(&PhaseEvent {
1517                    phase: "validate",
1518                    binary_path: bin.to_str(),
1519                    version: None,
1520                    items_total: None,
1521                    items_pending: None,
1522                    llm_parallelism: None,
1523                });
1524                bin
1525            }
1526        })
1527    };
1528
1529    // G28-D: refuse to start when the system is saturated. This check
1530    // is BEFORE preflight so we never spend an OAuth turn on a host
1531    // that is already at the limit.
1532    if args.max_load_check && !args.dry_run && crate::system_load::is_system_saturated() {
1533        let load = crate::system_load::load_average_one();
1534        let n = crate::system_load::ncpus();
1535        return Err(AppError::Validation(format!(
1536            "system load average {load:.2} exceeds 2x ncpus ({n}); \
1537             pass --no-max-load-check to override (not recommended)"
1538        )));
1539    }
1540
1541    // G35: preflight probe — issue a single ping turn to verify the
1542    // provider is healthy before scanning N candidates. If the probe
1543    // fails with a rate-limit error, optionally fall back to a
1544    // different mode (typically codex) instead of failing the entire
1545    // batch. The probe itself consumes 1 OAuth turn, so it stays
1546    // opt-in (default off) to keep --dry-run and CI flows zero-cost.
1547    if args.preflight_check && !args.dry_run && !matches!(args.operation, EnrichOperation::ReEmbed)
1548    {
1549        let preflight_result = run_preflight_probe(args);
1550        match preflight_result {
1551            PreflightOutcome::Healthy => {
1552                tracing::info!(target: "enrich", mode = ?args.mode, "preflight probe healthy");
1553            }
1554            PreflightOutcome::RateLimited { reason, suggestion } => {
1555                if let Some(fallback) = args.fallback_mode.clone() {
1556                    if fallback != args.mode {
1557                        // G35 (v1.0.69): the mid-batch mode switch is
1558                        // intentionally NOT applied because it would
1559                        // desynchronise the per-item rate-limit wait
1560                        // state (rate-limited items in the worker are
1561                        // timed against the original provider). Instead
1562                        // we abort cleanly so the operator can re-invoke
1563                        // with `--mode {fallback:?}`. This guarantees no
1564                        // OAuth window is wasted and no partial state
1565                        // is left in the queue.
1566                        return Err(AppError::Validation(format!(
1567                            "preflight detected rate limit on {mode:?}: {reason}; \
1568                             re-invoke with `--mode {fallback:?}` to use the fallback provider",
1569                            mode = args.mode
1570                        )));
1571                    }
1572                    return Err(AppError::Validation(format!(
1573                        "preflight detected rate limit on {mode:?}: {reason}; \
1574                         --fallback-mode matches --mode, no recovery possible",
1575                        mode = args.mode
1576                    )));
1577                }
1578                return Err(AppError::Validation(format!(
1579                    "preflight detected rate limit on {mode:?}: {reason}; \
1580                     {suggestion}; pass --fallback-mode codex to recover",
1581                    mode = args.mode
1582                )));
1583            }
1584            PreflightOutcome::Error(e) => {
1585                return Err(e);
1586            }
1587        }
1588    }
1589
1590    // SCAN phase
1591    let scan_result = scan_operation(&conn, &namespace, args)?;
1592    let total = scan_result.len();
1593
1594    emit_json(&PhaseEvent {
1595        phase: "scan",
1596        binary_path: None,
1597        version: None,
1598        items_total: Some(total),
1599        items_pending: Some(total),
1600        llm_parallelism: Some(args.llm_parallelism),
1601    });
1602
1603    // Dry-run: emit preview events and summary without calling LLM
1604    if args.dry_run {
1605        for (idx, key) in scan_result.iter().enumerate() {
1606            emit_json(&ItemEvent {
1607                item: key,
1608                status: "preview",
1609                memory_id: None,
1610                entity_id: None,
1611                entities: None,
1612                rels: None,
1613                chars_before: None,
1614                chars_after: None,
1615                cost_usd: None,
1616                elapsed_ms: None,
1617                error: None,
1618                index: idx,
1619                total,
1620            });
1621        }
1622        emit_json(&EnrichSummary {
1623            summary: true,
1624            operation: format!("{:?}", args.operation),
1625            items_total: total,
1626            completed: 0,
1627            failed: 0,
1628            skipped: 0,
1629            cost_usd: 0.0,
1630            elapsed_ms: started.elapsed().as_millis() as u64,
1631        });
1632        return Ok(());
1633    }
1634
1635    // All operations in this enum have an execution path.
1636
1637    // Queue setup for resume/retry
1638    let queue_conn = open_queue_db(DEFAULT_QUEUE_DB)?;
1639
1640    if args.resume {
1641        let reset = queue_conn
1642            .execute(
1643                "UPDATE queue SET status='pending' WHERE status='processing'",
1644                [],
1645            )
1646            .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
1647        if reset > 0 {
1648            tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
1649        }
1650    }
1651
1652    if args.retry_failed {
1653        let count = queue_conn
1654            .execute(
1655                "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
1656                [],
1657            )
1658            .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
1659        tracing::info!(target: "enrich", count, "retrying failed items");
1660    }
1661
1662    if !args.resume && !args.retry_failed {
1663        queue_conn
1664            .execute("DELETE FROM queue", [])
1665            .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
1666    }
1667
1668    // Populate queue
1669    for (idx, key) in scan_result.iter().enumerate() {
1670        let item_type = match args.operation {
1671            EnrichOperation::EntityDescriptions => "entity",
1672            _ => "memory",
1673        };
1674        if let Err(e) = queue_conn.execute(
1675            "INSERT OR IGNORE INTO queue (item_key, item_type, status) VALUES (?1, ?2, 'pending')",
1676            rusqlite::params![key, item_type],
1677        ) {
1678            tracing::warn!(target: "enrich", error = %e, "queue insert failed");
1679        }
1680        let _ = idx; // suppress unused warning
1681    }
1682
1683    // G19: parallel LLM processing via std::thread::scope when parallelism > 1.
1684    // Clamp enforces the range even if the caller bypasses clap validation.
1685    let parallelism = args.llm_parallelism.clamp(1, 32) as usize;
1686    if parallelism > 1 {
1687        tracing::info!(
1688            target: "enrich",
1689            llm_parallelism = parallelism,
1690            "parallel LLM processing with bounded thread pool"
1691        );
1692    }
1693    // G28-D (v1.0.68) + G34 (v1.0.69): warn above the recommended parallelism
1694    // ceiling. The threshold and message depend on the LLM mode because
1695    // Claude Code spawns MCP children (G28-A) while Codex does not.
1696    if parallelism > 4 {
1697        match args.mode {
1698            EnrichMode::ClaudeCode => {
1699                tracing::warn!(
1700                    target: "enrich",
1701                    llm_parallelism = parallelism,
1702                    recommended_max = 4,
1703                    mode = "claude-code",
1704                    "llm_parallelism above 4 multiplies Claude Code subprocess fan-out; \
1705                     consider combining with SQLITE_GRAPHRAG_CLAUDE_EMPTY_CONFIG_DIR \
1706                     to cut MCP children (G28-A)"
1707                );
1708            }
1709            EnrichMode::Codex if parallelism > 16 => {
1710                tracing::warn!(
1711                    target: "enrich",
1712                    llm_parallelism = parallelism,
1713                    recommended_max = 16,
1714                    mode = "codex",
1715                    "llm_parallelism above 16 risks OAuth rate-limit on Codex; \
1716                     consider --llm-parallelism 8 for safer concurrency"
1717                );
1718            }
1719            EnrichMode::Codex => {
1720                // No warning: codex does not spawn MCP children and was
1721                // validated at parallelism 8 in production (1161 items,
1722                // 0 failures) per the 2026-06-04 session audit.
1723            }
1724        }
1725    }
1726
1727    let mut completed = 0usize;
1728    let mut failed = 0usize;
1729    let mut skipped = 0usize;
1730    let mut cost_total = 0.0f64;
1731    let mut oauth_detected = false;
1732    let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1733    let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1734    let enrich_started = std::time::Instant::now();
1735
1736    let provider_timeout = match args.mode {
1737        EnrichMode::ClaudeCode => args.claude_timeout,
1738        EnrichMode::Codex => args.codex_timeout,
1739    };
1740
1741    let provider_model: Option<&str> = match args.mode {
1742        EnrichMode::ClaudeCode => args.claude_model.as_deref(),
1743        EnrichMode::Codex => args.codex_model.as_deref(),
1744    };
1745
1746    // G19: when parallelism > 1, spawn bounded worker threads.
1747    // Each worker opens its own DB connections (WAL supports concurrent readers + serialized writers).
1748    // The queue DB claim is atomic via UPDATE...RETURNING — no external lock needed.
1749    if parallelism > 1 {
1750        let stdout_mu = parking_lot::Mutex::new(());
1751        let budget = args.max_cost_usd;
1752        let operation = args.operation.clone();
1753        let mode = args.mode.clone();
1754        let min_oc = args.min_output_chars;
1755        let max_oc = args.max_output_chars;
1756        let prompt_tpl = args.prompt_template.as_deref().map(|p| p.to_path_buf());
1757
1758        struct WorkerResult {
1759            completed: usize,
1760            failed: usize,
1761            skipped: usize,
1762            cost: f64,
1763            oauth: bool,
1764        }
1765
1766        let results: Vec<WorkerResult> = std::thread::scope(|s| {
1767            let handles: Vec<_> = (0..parallelism)
1768                .map(|worker_id| {
1769                    let stdout_mu = &stdout_mu;
1770                    let paths = &paths;
1771                    let namespace = &namespace;
1772                    let provider_binary = provider_binary.as_deref();
1773                    let operation = &operation;
1774                    let mode = &mode;
1775                    let prompt_tpl = prompt_tpl.as_deref();
1776                    s.spawn(move || {
1777                        let w_conn = match open_rw(&paths.db) {
1778                            Ok(c) => c,
1779                            Err(e) => {
1780                                tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open DB");
1781                                return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1782                            }
1783                        };
1784                        let w_queue = match open_queue_db(DEFAULT_QUEUE_DB) {
1785                            Ok(c) => c,
1786                            Err(e) => {
1787                                tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open queue DB");
1788                                return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1789                            }
1790                        };
1791                        let mut w_completed = 0usize;
1792                        let mut w_failed = 0usize;
1793                        let mut w_skipped = 0usize;
1794                        let mut w_cost = 0.0f64;
1795                        let mut w_oauth = false;
1796                        let mut w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1797                        let w_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1798                        // G28-D: per-worker circuit breaker that aborts the
1799                        // loop after `circuit_breaker_threshold` consecutive
1800                        // HardFailure outcomes (transient/rate-limited errors
1801                        // do NOT count, so a recovering provider is not
1802                        // penalised).
1803                        let mut w_breaker = crate::retry::CircuitBreaker::new(
1804                            args.circuit_breaker_threshold.max(1),
1805                            std::time::Duration::from_secs(60),
1806                        );
1807
1808                        loop {
1809                            if crate::shutdown_requested() {
1810                                tracing::info!(target: "enrich", "shutdown requested, worker stopping");
1811                                break;
1812                            }
1813                            if let Some(b) = budget {
1814                                if !w_oauth && w_cost >= b {
1815                                    break;
1816                                }
1817                            }
1818                            let pending: Option<(i64, String, String)> = w_queue
1819                                .query_row(
1820                                    "UPDATE queue SET status='processing', attempt=attempt+1 \
1821                                     WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
1822                                     RETURNING id, item_key, item_type",
1823                                    [],
1824                                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1825                                )
1826                                .ok();
1827                            let (queue_id, item_key, _item_type) = match pending {
1828                                Some(p) => p,
1829                                None => break,
1830                            };
1831                            let item_started = Instant::now();
1832                            let current_index = w_completed + w_failed + w_skipped;
1833
1834                            let call_result = match operation {
1835                                EnrichOperation::MemoryBindings => call_memory_bindings(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1836                                EnrichOperation::EntityDescriptions => call_entity_description(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1837                                EnrichOperation::BodyEnrich => call_body_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode, min_oc, max_oc, prompt_tpl, args.preserve_threshold, paths),
1838                                EnrichOperation::ReEmbed => call_reembed(&w_conn, namespace, &item_key, paths),
1839                                EnrichOperation::WeightCalibrate => call_weight_calibrate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1840                                EnrichOperation::RelationReclassify => call_relation_reclassify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1841                                EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => call_entity_connect(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1842                                EnrichOperation::EntityTypeValidate => call_entity_type_validate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1843                                EnrichOperation::DescriptionEnrich => call_description_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1844                                EnrichOperation::DomainClassify => call_domain_classify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1845                                EnrichOperation::GraphAudit => call_graph_audit(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1846                                EnrichOperation::DeepResearchSynth => call_deep_research_synth(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1847                                EnrichOperation::BodyExtract => call_body_extract(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1848                            };
1849
1850                            match call_result {
1851                                Ok(EnrichItemResult::Done { cost, is_oauth, memory_id, entity_id, entities, rels, chars_before, chars_after }) => {
1852                                    if is_oauth { w_oauth = true; }
1853                                    w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1854                                    let _ = w_queue.execute(
1855                                        "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1856                                        rusqlite::params![memory_id, entity_id, entities as i64, rels as i64, cost, item_started.elapsed().as_millis() as i64, queue_id],
1857                                    );
1858                                    w_completed += 1;
1859                                    if !is_oauth { w_cost += cost; }
1860                                    // G28-D: count success; resets breaker.
1861                                    let _ = w_breaker
1862                                        .record(crate::retry::AttemptOutcome::Success);
1863                                    let _guard = stdout_mu.lock();
1864                                    emit_json(&ItemEvent { item: &item_key, status: "done", memory_id, entity_id, entities: Some(entities), rels: Some(rels), chars_before, chars_after, cost_usd: if is_oauth { None } else { Some(cost) }, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
1865                                }
1866                                Ok(EnrichItemResult::Skipped { reason }) => {
1867                                    w_skipped += 1;
1868                                    let _ = w_queue.execute("UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![reason, queue_id]);
1869                                    let _guard = stdout_mu.lock();
1870                                    emit_json(&ItemEvent { item: &item_key, status: "skipped", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
1871                                }
1872                                Ok(EnrichItemResult::PreservationFailed { score, threshold, chars_before, chars_after }) => {
1873                                    // G29 Passo 4: worker mirror of the
1874                                    // serial path. Counted as a soft
1875                                    // skip so the queue surface shows
1876                                    // a quality issue rather than a
1877                                    // transport failure.
1878                                    w_skipped += 1;
1879                                    let reason = format!(
1880                                        "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
1881                                    );
1882                                    let _ = w_queue.execute(
1883                                        "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
1884                                        rusqlite::params![reason, queue_id],
1885                                    );
1886                                    let _guard = stdout_mu.lock();
1887                                    emit_json(&ItemEvent {
1888                                        item: &item_key,
1889                                        status: "preservation_failed",
1890                                        memory_id: None,
1891                                        entity_id: None,
1892                                        entities: None,
1893                                        rels: None,
1894                                        chars_before: Some(chars_before),
1895                                        chars_after: Some(chars_after),
1896                                        cost_usd: None,
1897                                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1898                                        error: Some(reason),
1899                                        index: current_index,
1900                                        total,
1901                                    });
1902                                }
1903                                Err(e) => {
1904                                    let err_str = format!("{e}");
1905                                    if matches!(e, AppError::RateLimited { .. }) {
1906                                        if crate::retry::is_kill_switch_active() {
1907                                            tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
1908                                        } else if std::time::Instant::now() >= w_deadline {
1909                                            tracing::error!(target: "enrich", "rate-limit retry deadline (1h) exhausted in worker");
1910                                        } else {
1911                                            let half = w_backoff / 2;
1912                                            let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
1913                                            let actual_wait = half + jitter;
1914                                            tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited in worker, backing off");
1915                                            let _ = w_queue.execute("UPDATE queue SET status='pending' WHERE id=?1", rusqlite::params![queue_id]);
1916                                            std::thread::sleep(std::time::Duration::from_secs(actual_wait));
1917                                            w_backoff = (w_backoff * 2).min(900);
1918                                            continue;
1919                                        }
1920                                    }
1921                                    w_failed += 1;
1922                                    let _ = w_queue.execute("UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![err_str, queue_id]);
1923                                    let _guard = stdout_mu.lock();
1924                                    emit_json(&ItemEvent { item: &item_key, status: "failed", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: Some(err_str), index: current_index, total });
1925                                    // G28-D: count hard failure against breaker.
1926                                    let breaker_opened = w_breaker
1927                                        .record(crate::retry::AttemptOutcome::HardFailure);
1928                                    if breaker_opened {
1929                                        tracing::error!(target: "enrich",
1930                                            consecutive_failures = w_breaker.consecutive_failures(),
1931                                            "circuit breaker opened — aborting worker"
1932                                        );
1933                                        break;
1934                                    }
1935                                }
1936                            }
1937                        }
1938                        WorkerResult { completed: w_completed, failed: w_failed, skipped: w_skipped, cost: w_cost, oauth: w_oauth }
1939                    })
1940                })
1941                .collect();
1942            handles
1943                .into_iter()
1944                .map(|h| {
1945                    h.join().unwrap_or(WorkerResult {
1946                        completed: 0,
1947                        failed: 0,
1948                        skipped: 0,
1949                        cost: 0.0,
1950                        oauth: false,
1951                    })
1952                })
1953                .collect()
1954        });
1955
1956        for r in &results {
1957            completed += r.completed;
1958            failed += r.failed;
1959            skipped += r.skipped;
1960            cost_total += r.cost;
1961            oauth_detected |= r.oauth;
1962        }
1963    } else {
1964        // Serial path (parallelism == 1) — original loop
1965        loop {
1966            if crate::shutdown_requested() {
1967                tracing::info!(target: "enrich", "shutdown requested, stopping enrichment");
1968                break;
1969            }
1970
1971            // Budget check
1972            if let Some(budget) = args.max_cost_usd {
1973                if !oauth_detected && cost_total >= budget {
1974                    tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
1975                    break;
1976                }
1977            }
1978
1979            // Dequeue next pending item
1980            let pending: Option<(i64, String, String)> = queue_conn
1981                .query_row(
1982                    "UPDATE queue SET status='processing', attempt=attempt+1 \
1983                 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
1984                 RETURNING id, item_key, item_type",
1985                    [],
1986                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1987                )
1988                .ok();
1989
1990            let (queue_id, item_key, item_type) = match pending {
1991                Some(p) => p,
1992                None => break,
1993            };
1994
1995            let item_started = Instant::now();
1996            let current_index = completed + failed + skipped;
1997
1998            let call_result = match args.operation {
1999                EnrichOperation::MemoryBindings => call_memory_bindings(
2000                    &conn,
2001                    &namespace,
2002                    &item_key,
2003                    provider_binary
2004                        .as_deref()
2005                        .expect("provider binary required"),
2006                    provider_model,
2007                    provider_timeout,
2008                    &args.mode,
2009                ),
2010                EnrichOperation::EntityDescriptions => call_entity_description(
2011                    &conn,
2012                    &namespace,
2013                    &item_key,
2014                    provider_binary
2015                        .as_deref()
2016                        .expect("provider binary required"),
2017                    provider_model,
2018                    provider_timeout,
2019                    &args.mode,
2020                ),
2021                EnrichOperation::BodyEnrich => call_body_enrich(
2022                    &conn,
2023                    &namespace,
2024                    &item_key,
2025                    provider_binary
2026                        .as_deref()
2027                        .expect("provider binary required"),
2028                    provider_model,
2029                    provider_timeout,
2030                    &args.mode,
2031                    args.min_output_chars,
2032                    args.max_output_chars,
2033                    args.prompt_template.as_deref(),
2034                    args.preserve_threshold,
2035                    &paths,
2036                ),
2037                EnrichOperation::ReEmbed => call_reembed(&conn, &namespace, &item_key, &paths),
2038                EnrichOperation::WeightCalibrate => call_weight_calibrate(
2039                    &conn,
2040                    &namespace,
2041                    &item_key,
2042                    provider_binary
2043                        .as_deref()
2044                        .expect("provider binary required"),
2045                    provider_model,
2046                    provider_timeout,
2047                    &args.mode,
2048                ),
2049                EnrichOperation::RelationReclassify => call_relation_reclassify(
2050                    &conn,
2051                    &namespace,
2052                    &item_key,
2053                    provider_binary
2054                        .as_deref()
2055                        .expect("provider binary required"),
2056                    provider_model,
2057                    provider_timeout,
2058                    &args.mode,
2059                ),
2060                EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2061                    call_entity_connect(
2062                        &conn,
2063                        &namespace,
2064                        &item_key,
2065                        provider_binary
2066                            .as_deref()
2067                            .expect("provider binary required"),
2068                        provider_model,
2069                        provider_timeout,
2070                        &args.mode,
2071                    )
2072                }
2073                EnrichOperation::EntityTypeValidate => call_entity_type_validate(
2074                    &conn,
2075                    &namespace,
2076                    &item_key,
2077                    provider_binary
2078                        .as_deref()
2079                        .expect("provider binary required"),
2080                    provider_model,
2081                    provider_timeout,
2082                    &args.mode,
2083                ),
2084                EnrichOperation::DescriptionEnrich => call_description_enrich(
2085                    &conn,
2086                    &namespace,
2087                    &item_key,
2088                    provider_binary
2089                        .as_deref()
2090                        .expect("provider binary required"),
2091                    provider_model,
2092                    provider_timeout,
2093                    &args.mode,
2094                ),
2095                EnrichOperation::DomainClassify => call_domain_classify(
2096                    &conn,
2097                    &namespace,
2098                    &item_key,
2099                    provider_binary
2100                        .as_deref()
2101                        .expect("provider binary required"),
2102                    provider_model,
2103                    provider_timeout,
2104                    &args.mode,
2105                ),
2106                EnrichOperation::GraphAudit => call_graph_audit(
2107                    &conn,
2108                    &namespace,
2109                    &item_key,
2110                    provider_binary
2111                        .as_deref()
2112                        .expect("provider binary required"),
2113                    provider_model,
2114                    provider_timeout,
2115                    &args.mode,
2116                ),
2117                EnrichOperation::DeepResearchSynth => call_deep_research_synth(
2118                    &conn,
2119                    &namespace,
2120                    &item_key,
2121                    provider_binary
2122                        .as_deref()
2123                        .expect("provider binary required"),
2124                    provider_model,
2125                    provider_timeout,
2126                    &args.mode,
2127                ),
2128                EnrichOperation::BodyExtract => call_body_extract(
2129                    &conn,
2130                    &namespace,
2131                    &item_key,
2132                    provider_binary
2133                        .as_deref()
2134                        .expect("provider binary required"),
2135                    provider_model,
2136                    provider_timeout,
2137                    &args.mode,
2138                ),
2139            };
2140
2141            match call_result {
2142                Ok(EnrichItemResult::Done {
2143                    memory_id,
2144                    entity_id,
2145                    entities,
2146                    rels,
2147                    chars_before,
2148                    chars_after,
2149                    cost,
2150                    is_oauth,
2151                }) => {
2152                    if is_oauth && !oauth_detected {
2153                        oauth_detected = true;
2154                        tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
2155                    }
2156                    backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
2157
2158                    // Persist depends on the operation
2159                    let persist_err: Option<String> = match args.operation {
2160                        EnrichOperation::MemoryBindings => {
2161                            // Bindings already persisted inside call_memory_bindings
2162                            None
2163                        }
2164                        EnrichOperation::EntityDescriptions => {
2165                            // Description already persisted inside call_entity_description
2166                            None
2167                        }
2168                        EnrichOperation::BodyEnrich => {
2169                            // Body already persisted inside call_body_enrich
2170                            None
2171                        }
2172                        _ => {
2173                            // All G27 operations persist inside their call_* function
2174                            None
2175                        }
2176                    };
2177
2178                    if let Err(e) = queue_conn.execute(
2179                    "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2180                    rusqlite::params![
2181                        memory_id,
2182                        entity_id,
2183                        entities as i64,
2184                        rels as i64,
2185                        cost,
2186                        item_started.elapsed().as_millis() as i64,
2187                        queue_id
2188                    ],
2189                ) {
2190                        tracing::warn!(target: "enrich", error = %e, "queue done update failed");
2191                    }
2192
2193                    if persist_err.is_none() {
2194                        completed += 1;
2195                        if !is_oauth {
2196                            cost_total += cost;
2197                        }
2198                        emit_json(&ItemEvent {
2199                            item: &item_key,
2200                            status: "done",
2201                            memory_id,
2202                            entity_id,
2203                            entities: Some(entities),
2204                            rels: Some(rels),
2205                            chars_before,
2206                            chars_after,
2207                            cost_usd: if is_oauth { None } else { Some(cost) },
2208                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2209                            error: None,
2210                            index: current_index,
2211                            total,
2212                        });
2213                    } else {
2214                        failed += 1;
2215                        emit_json(&ItemEvent {
2216                            item: &item_key,
2217                            status: "failed",
2218                            memory_id: None,
2219                            entity_id: None,
2220                            entities: None,
2221                            rels: None,
2222                            chars_before: None,
2223                            chars_after: None,
2224                            cost_usd: None,
2225                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2226                            error: persist_err,
2227                            index: current_index,
2228                            total,
2229                        });
2230                    }
2231                }
2232                Ok(EnrichItemResult::Skipped { reason }) => {
2233                    skipped += 1;
2234                    if let Err(e) = queue_conn.execute(
2235                    "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2236                    rusqlite::params![reason, queue_id],
2237                ) {
2238                        tracing::warn!(target: "enrich", error = %e, "queue skipped update failed");
2239                    }
2240                    emit_json(&ItemEvent {
2241                        item: &item_key,
2242                        status: "skipped",
2243                        memory_id: None,
2244                        entity_id: None,
2245                        entities: None,
2246                        rels: None,
2247                        chars_before: None,
2248                        chars_after: None,
2249                        cost_usd: None,
2250                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2251                        error: None,
2252                        index: current_index,
2253                        total,
2254                    });
2255                }
2256                Ok(EnrichItemResult::PreservationFailed {
2257                    score,
2258                    threshold,
2259                    chars_before,
2260                    chars_after,
2261                }) => {
2262                    // G29 Passo 4: the LLM rewrite diverged too far from
2263                    // the original body. Count as a soft failure (not
2264                    // `failed`) so the queue surfaces it as a quality
2265                    // issue, not a transport error. The reason is
2266                    // structured so the operator can audit why a body
2267                    // was rejected.
2268                    skipped += 1;
2269                    let reason = format!(
2270                        "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2271                    );
2272                    if let Err(qe) = queue_conn.execute(
2273                        "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2274                        rusqlite::params![reason, queue_id],
2275                    ) {
2276                        tracing::warn!(target: "enrich", error = %qe, "queue preservation_failed update failed");
2277                    }
2278                    emit_json(&ItemEvent {
2279                        item: &item_key,
2280                        status: "preservation_failed",
2281                        memory_id: None,
2282                        entity_id: None,
2283                        entities: None,
2284                        rels: None,
2285                        chars_before: Some(chars_before),
2286                        chars_after: Some(chars_after),
2287                        cost_usd: None,
2288                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2289                        error: Some(reason),
2290                        index: current_index,
2291                        total,
2292                    });
2293                }
2294                Err(e) => {
2295                    let err_str = format!("{e}");
2296                    if matches!(e, AppError::RateLimited { .. }) {
2297                        if crate::retry::is_kill_switch_active() {
2298                            tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2299                        } else if std::time::Instant::now() >= rate_limit_deadline {
2300                            tracing::error!(target: "enrich", total_elapsed_secs = enrich_started.elapsed().as_secs(), "rate-limit retry deadline (1h) exhausted");
2301                        } else {
2302                            let half = backoff_secs / 2;
2303                            let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2304                            let actual_wait = half + jitter;
2305                            tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
2306                            if let Err(qe) = queue_conn.execute(
2307                                "UPDATE queue SET status='pending' WHERE id=?1",
2308                                rusqlite::params![queue_id],
2309                            ) {
2310                                tracing::warn!(target: "enrich", error = %qe, "queue pending update failed");
2311                            }
2312                            std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2313                            backoff_secs = (backoff_secs * 2).min(900);
2314                            continue;
2315                        }
2316                    }
2317
2318                    failed += 1;
2319                    if let Err(qe) = queue_conn.execute(
2320                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
2321                    rusqlite::params![err_str, queue_id],
2322                ) {
2323                        tracing::warn!(target: "enrich", error = %qe, "queue failed update failed");
2324                    }
2325                    emit_json(&ItemEvent {
2326                        item: &item_key,
2327                        status: "failed",
2328                        memory_id: None,
2329                        entity_id: None,
2330                        entities: None,
2331                        rels: None,
2332                        chars_before: None,
2333                        chars_after: None,
2334                        cost_usd: None,
2335                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2336                        error: Some(err_str),
2337                        index: current_index,
2338                        total,
2339                    });
2340                }
2341            }
2342
2343            let _ = item_type; // used via queue schema only
2344        }
2345    } // end else (serial path)
2346
2347    let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2348    let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2349
2350    emit_json(&EnrichSummary {
2351        summary: true,
2352        operation: format!("{:?}", args.operation),
2353        items_total: total,
2354        completed,
2355        failed,
2356        skipped,
2357        cost_usd: cost_total,
2358        elapsed_ms: started.elapsed().as_millis() as u64,
2359    });
2360
2361    if failed == 0 {
2362        let _ = std::fs::remove_file(DEFAULT_QUEUE_DB);
2363    }
2364
2365    Ok(())
2366}
2367
2368// ---------------------------------------------------------------------------
2369// Internal result type for a single item call
2370// ---------------------------------------------------------------------------
2371
/// Outcome of processing one queue item through a `call_*` helper.
///
/// The serial driver loop matches on this value to choose the queue `status`
/// it writes, the counter it bumps, and the NDJSON `ItemEvent` it emits.
/// Provider/transport failures are not represented here; the helpers return
/// them as `Err(AppError)`.
#[derive(Debug, Clone, PartialEq)]
enum EnrichItemResult {
    /// The item was processed and its results were persisted by the helper.
    /// Counted as `completed` and written to the queue as `done`.
    Done {
        /// Row id of the memory that was processed, for memory-scoped operations.
        memory_id: Option<i64>,
        /// Row id of the entity that was processed, for entity-scoped operations.
        entity_id: Option<i64>,
        /// Number of entities persisted for this item.
        entities: usize,
        /// Number of relationships persisted for this item.
        rels: usize,
        /// Body length in characters before the operation, when meaningful.
        chars_before: Option<usize>,
        /// Body length in characters after the operation, when meaningful.
        chars_after: Option<usize>,
        /// Reported cost of the item in USD.
        cost: f64,
        /// When `true`, the driver omits `cost_usd` from the NDJSON event and
        /// does not add `cost` to the running total.
        is_oauth: bool,
    },
    /// The item was intentionally not persisted. Counted as `skipped`;
    /// `reason` is stored in the queue `error` column.
    Skipped {
        /// Human-readable explanation recorded in the queue.
        reason: String,
    },
    /// G29 Passo 4 (v1.0.69): the LLM rewrite diverged from the original
    /// body beyond the configured `--preserve-threshold` and was rejected
    /// before persistence. Counted as `skipped` (a quality issue, not a
    /// transport error). The trigram-Jaccard score and threshold are
    /// emitted in the NDJSON stream for operator audit.
    PreservationFailed {
        /// Similarity score of the rejected rewrite.
        score: f64,
        /// Threshold the score was compared against.
        threshold: f64,
        /// Original body length in characters.
        chars_before: usize,
        /// Rejected rewrite length in characters.
        chars_after: usize,
    },
}
2397
2398// ---------------------------------------------------------------------------
2399// Per-operation call helpers (SCAN + JUDGE + PERSIST in one unit)
2400// ---------------------------------------------------------------------------
2401
2402fn call_memory_bindings(
2403    conn: &Connection,
2404    namespace: &str,
2405    memory_name: &str,
2406    binary: &Path,
2407    model: Option<&str>,
2408    timeout: u64,
2409    mode: &EnrichMode,
2410) -> Result<EnrichItemResult, AppError> {
2411    // Look up the memory
2412    let (memory_id, body): (i64, String) = conn.query_row(
2413        "SELECT id, COALESCE(body,'') FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2414        rusqlite::params![namespace, memory_name],
2415        |r| Ok((r.get(0)?, r.get(1)?)),
2416    ).map_err(|e| match e {
2417        rusqlite::Error::QueryReturnedNoRows => AppError::NotFound(format!("memory '{memory_name}' not found")),
2418        other => AppError::Database(other),
2419    })?;
2420
2421    if body.trim().is_empty() {
2422        return Ok(EnrichItemResult::Skipped {
2423            reason: "body is empty".to_string(),
2424        });
2425    }
2426
2427    let (value, cost, is_oauth) = match mode {
2428        EnrichMode::ClaudeCode => call_claude(
2429            binary,
2430            BINDINGS_PROMPT,
2431            BINDINGS_SCHEMA,
2432            &body,
2433            model,
2434            timeout,
2435        )?,
2436        EnrichMode::Codex => call_codex(
2437            binary,
2438            BINDINGS_PROMPT,
2439            BINDINGS_SCHEMA,
2440            &body,
2441            model,
2442            timeout,
2443        )?,
2444    };
2445
2446    let empty_arr = serde_json::Value::Array(vec![]);
2447    let entities_val = value.get("entities").unwrap_or(&empty_arr);
2448    let rels_val = value.get("relationships").unwrap_or(&empty_arr);
2449
2450    let (ent_count, rel_count) =
2451        persist_memory_bindings(conn, namespace, memory_id, entities_val, rels_val)?;
2452
2453    Ok(EnrichItemResult::Done {
2454        memory_id: Some(memory_id),
2455        entity_id: None,
2456        entities: ent_count,
2457        rels: rel_count,
2458        chars_before: None,
2459        chars_after: None,
2460        cost,
2461        is_oauth,
2462    })
2463}
2464
2465fn call_entity_description(
2466    conn: &Connection,
2467    namespace: &str,
2468    entity_name: &str,
2469    binary: &Path,
2470    model: Option<&str>,
2471    timeout: u64,
2472    mode: &EnrichMode,
2473) -> Result<EnrichItemResult, AppError> {
2474    let (entity_id, entity_type): (i64, String) = conn
2475        .query_row(
2476            "SELECT id, type FROM entities WHERE namespace=?1 AND name=?2",
2477            rusqlite::params![namespace, entity_name],
2478            |r| Ok((r.get(0)?, r.get(1)?)),
2479        )
2480        .map_err(|e| match e {
2481            rusqlite::Error::QueryReturnedNoRows => {
2482                AppError::NotFound(format!("entity '{entity_name}' not found"))
2483            }
2484            other => AppError::Database(other),
2485        })?;
2486
2487    let prompt = format!(
2488        "{ENTITY_DESCRIPTION_PROMPT_PREFIX}{entity_name}\nEntity type: {entity_type}\n\nGenerate a description:"
2489    );
2490
2491    let (value, cost, is_oauth) = match mode {
2492        EnrichMode::ClaudeCode => call_claude(
2493            binary,
2494            &prompt,
2495            ENTITY_DESCRIPTION_SCHEMA,
2496            "",
2497            model,
2498            timeout,
2499        )?,
2500        EnrichMode::Codex => call_codex(
2501            binary,
2502            &prompt,
2503            ENTITY_DESCRIPTION_SCHEMA,
2504            "",
2505            model,
2506            timeout,
2507        )?,
2508    };
2509
2510    let description = value
2511        .get("description")
2512        .and_then(|v| v.as_str())
2513        .ok_or_else(|| AppError::Validation("LLM result missing 'description' field".into()))?;
2514
2515    persist_entity_description(conn, entity_id, description)?;
2516
2517    Ok(EnrichItemResult::Done {
2518        memory_id: None,
2519        entity_id: Some(entity_id),
2520        entities: 0,
2521        rels: 0,
2522        chars_before: None,
2523        chars_after: None,
2524        cost,
2525        is_oauth,
2526    })
2527}
2528
2529#[allow(clippy::too_many_arguments)]
2530fn call_body_enrich(
2531    conn: &Connection,
2532    namespace: &str,
2533    memory_name: &str,
2534    binary: &Path,
2535    model: Option<&str>,
2536    timeout: u64,
2537    mode: &EnrichMode,
2538    min_output_chars: usize,
2539    max_output_chars: usize,
2540    prompt_template: Option<&Path>,
2541    preserve_threshold: f64,
2542    paths: &crate::paths::AppPaths,
2543) -> Result<EnrichItemResult, AppError> {
2544    let (memory_id, body, description, memory_type): (i64, String, String, String) = conn
2545        .query_row(
2546            "SELECT id, COALESCE(body,''), COALESCE(description,''), COALESCE(type,'note') \
2547         FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2548            rusqlite::params![namespace, memory_name],
2549            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2550        )
2551        .map_err(|e| match e {
2552            rusqlite::Error::QueryReturnedNoRows => {
2553                AppError::NotFound(format!("memory '{memory_name}' not found"))
2554            }
2555            other => AppError::Database(other),
2556        })?;
2557
2558    let chars_before = body.chars().count();
2559
2560    // G26: gather graph context for contextualized enrichment
2561    let linked_entities: Vec<String> = {
2562        let mut stmt = conn.prepare_cached(
2563            "SELECT e.name FROM memory_entities me \
2564             JOIN entities e ON e.id = me.entity_id \
2565             WHERE me.memory_id = ?1 LIMIT 10",
2566        )?;
2567        let result: Vec<String> = stmt
2568            .query_map(rusqlite::params![memory_id], |r| r.get::<_, String>(0))?
2569            .filter_map(|r| r.ok())
2570            .collect();
2571        drop(stmt);
2572        result
2573    };
2574
2575    // Load custom prompt template if provided
2576    let prompt_prefix = if let Some(tmpl_path) = prompt_template {
2577        let file_size = std::fs::metadata(tmpl_path)
2578            .map_err(|e| {
2579                AppError::Io(std::io::Error::new(
2580                    e.kind(),
2581                    format!("failed to stat prompt template: {e}"),
2582                ))
2583            })?
2584            .len();
2585        if file_size > MAX_MEMORY_BODY_LEN as u64 {
2586            return Err(AppError::LimitExceeded(
2587                crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
2588            ));
2589        }
2590        std::fs::read_to_string(tmpl_path).map_err(|e| {
2591            AppError::Io(std::io::Error::new(
2592                e.kind(),
2593                format!("failed to read prompt template: {e}"),
2594            ))
2595        })?
2596    } else {
2597        BODY_ENRICH_PROMPT_PREFIX.to_string()
2598    };
2599
2600    // G26: build contextualized prompt with graph data
2601    let context_section = if !linked_entities.is_empty() || !description.is_empty() {
2602        let mut ctx = String::new();
2603        ctx.push_str(&format!(
2604            "\nContext:\n- Memory name: {memory_name}\n- Type: {memory_type}\n"
2605        ));
2606        if !description.is_empty() {
2607            ctx.push_str(&format!("- Description: {description}\n"));
2608        }
2609        ctx.push_str(&format!("- Domain: {namespace}\n"));
2610        if !linked_entities.is_empty() {
2611            ctx.push_str(&format!(
2612                "- Linked entities: {}\n",
2613                linked_entities.join(", ")
2614            ));
2615        }
2616        ctx
2617    } else {
2618        String::new()
2619    };
2620
2621    let prompt = format!(
2622        "{prompt_prefix}{context_section}\nTarget minimum length: {min_output_chars} characters. Maximum: {max_output_chars} characters."
2623    );
2624
2625    // The body schema uses a free-form enriched_body field
2626    let (value, cost, is_oauth) = match mode {
2627        EnrichMode::ClaudeCode => {
2628            call_claude(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2629        }
2630        EnrichMode::Codex => {
2631            call_codex(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2632        }
2633    };
2634
2635    let enriched_body = value
2636        .get("enriched_body")
2637        .and_then(|v| v.as_str())
2638        .ok_or_else(|| AppError::Validation("LLM result missing 'enriched_body' field".into()))?;
2639
2640    let chars_after = enriched_body.chars().count();
2641
2642    // G29 Passo 4 (v1.0.69): preservation check. Before persisting, run
2643    // a trigram-Jaccard similarity between the original body and the
2644    // LLM-rewritten body. When the score falls below
2645    // `args.preserve_threshold` (default 0.7 per the G29 gap), reject the
2646    // rewrite as a likely hallucination. The result is recorded in the
2647    // NDJSON stream so operators can audit what the LLM tried to do.
2648    let threshold = preserve_threshold;
2649    let verdict =
2650        crate::preservation::PreservationVerdict::evaluate(&body, enriched_body, threshold);
2651    if !verdict.is_accepted() {
2652        return Ok(EnrichItemResult::PreservationFailed {
2653            score: match verdict {
2654                crate::preservation::PreservationVerdict::Preserved { score, .. } => score,
2655                crate::preservation::PreservationVerdict::Rejected { score, .. } => score,
2656                crate::preservation::PreservationVerdict::Unchanged { .. } => 1.0,
2657            },
2658            threshold,
2659            chars_before,
2660            chars_after,
2661        });
2662    }
2663
2664    // G29 Passo 5 (v1.0.69): idempotency via blake3 hash. Before persisting,
2665    // compare the hash of the original body against the hash of the enriched
2666    // body. Identical hashes mean the LLM produced a byte-for-byte identical
2667    // body (rare but possible) — treat as `Skipped` so re-running the batch
2668    // is safe and the queue does not get re-persisted entries.
2669    let old_hash = blake3::hash(body.as_bytes()).to_hex().to_string();
2670    let new_hash = blake3::hash(enriched_body.as_bytes()).to_hex().to_string();
2671    if old_hash == new_hash {
2672        return Ok(EnrichItemResult::Skipped {
2673            reason: format!(
2674                "enriched body hash matches original (blake3:{old_hash}); idempotency skip"
2675            ),
2676        });
2677    }
2678
2679    // Only persist if the enriched body is genuinely longer
2680    if chars_after <= chars_before {
2681        return Ok(EnrichItemResult::Skipped {
2682            reason: format!(
2683                "enriched body ({chars_after} chars) not longer than original ({chars_before} chars)"
2684            ),
2685        });
2686    }
2687
2688    persist_enriched_body(
2689        conn,
2690        namespace,
2691        memory_id,
2692        memory_name,
2693        enriched_body,
2694        paths,
2695    )?;
2696
2697    Ok(EnrichItemResult::Done {
2698        memory_id: Some(memory_id),
2699        entity_id: None,
2700        entities: 0,
2701        rels: 0,
2702        chars_before: Some(chars_before),
2703        chars_after: Some(chars_after),
2704        cost,
2705        is_oauth,
2706    })
2707}
2708
2709fn call_reembed(
2710    conn: &Connection,
2711    namespace: &str,
2712    memory_name: &str,
2713    paths: &crate::paths::AppPaths,
2714) -> Result<EnrichItemResult, AppError> {
2715    let (memory_id, body, memory_type): (i64, String, String) = conn
2716        .query_row(
2717            "SELECT id, COALESCE(body,''), COALESCE(type,'note')
2718             FROM memories
2719             WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2720            rusqlite::params![namespace, memory_name],
2721            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2722        )
2723        .map_err(|e| match e {
2724            rusqlite::Error::QueryReturnedNoRows => {
2725                AppError::NotFound(format!("memory '{memory_name}' not found"))
2726            }
2727            other => AppError::Database(other),
2728        })?;
2729
2730    if body.trim().is_empty() {
2731        return Ok(EnrichItemResult::Skipped {
2732            reason: "body is empty".to_string(),
2733        });
2734    }
2735
2736    reembed_memory_vector(
2737        conn,
2738        namespace,
2739        memory_id,
2740        memory_name,
2741        &memory_type,
2742        &body,
2743        paths,
2744    )?;
2745
2746    Ok(EnrichItemResult::Done {
2747        memory_id: Some(memory_id),
2748        entity_id: None,
2749        entities: 0,
2750        rels: 0,
2751        chars_before: Some(body.chars().count()),
2752        chars_after: Some(body.chars().count()),
2753        cost: 0.0,
2754        is_oauth: true,
2755    })
2756}
2757
2758// ---------------------------------------------------------------------------
2759// Scan dispatcher — maps operation to scan query result (item keys)
2760// ---------------------------------------------------------------------------
2761
2762fn scan_operation(
2763    conn: &Connection,
2764    namespace: &str,
2765    args: &EnrichArgs,
2766) -> Result<Vec<String>, AppError> {
2767    // G37: resolve --names + --names-file once and apply to every scan path.
2768    let name_filter = resolve_name_filter(args)?;
2769    match args.operation {
2770        EnrichOperation::MemoryBindings => {
2771            let rows = scan_unbound_memories(conn, namespace, args.limit, &name_filter)?;
2772            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2773        }
2774        EnrichOperation::EntityDescriptions => {
2775            let rows = scan_entities_without_description(conn, namespace, args.limit)?;
2776            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2777        }
2778        EnrichOperation::BodyEnrich => {
2779            let rows =
2780                scan_short_body_memories(conn, namespace, args.min_output_chars, args.limit)?;
2781            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2782        }
2783        EnrichOperation::ReEmbed => {
2784            let rows = scan_memories_without_embeddings(conn, namespace, args.limit, &name_filter)?;
2785            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2786        }
2787        EnrichOperation::WeightCalibrate => {
2788            let rows = scan_weight_candidates(conn, namespace, args.limit)?;
2789            Ok(rows
2790                .into_iter()
2791                .map(|(id, _, _, _, _)| id.to_string())
2792                .collect())
2793        }
2794        EnrichOperation::RelationReclassify => {
2795            let rows = scan_generic_relations(conn, namespace, args.limit)?;
2796            Ok(rows
2797                .into_iter()
2798                .map(|(id, _, _, _)| id.to_string())
2799                .collect())
2800        }
2801        EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2802            let pairs = scan_isolated_entity_pairs(conn, namespace, args.limit)?;
2803            Ok(pairs.into_iter().map(|(_, name, _, _)| name).collect())
2804        }
2805        EnrichOperation::EntityTypeValidate => {
2806            let rows = scan_entities_for_type_validation(conn, namespace, args.limit)?;
2807            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2808        }
2809        EnrichOperation::DescriptionEnrich => {
2810            let rows = scan_generic_descriptions(conn, namespace, args.limit)?;
2811            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2812        }
2813        EnrichOperation::DomainClassify
2814        | EnrichOperation::GraphAudit
2815        | EnrichOperation::DeepResearchSynth
2816        | EnrichOperation::BodyExtract => {
2817            let limit_clause = args.limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
2818            let sql = format!(
2819                "SELECT name FROM memories WHERE namespace=?1 AND deleted_at IS NULL ORDER BY id {limit_clause}"
2820            );
2821            let mut stmt = conn.prepare(&sql)?;
2822            let names = stmt
2823                .query_map(rusqlite::params![namespace], |r| r.get::<_, String>(0))?
2824                .collect::<Result<Vec<_>, _>>()?;
2825            Ok(names)
2826        }
2827    }
2828}
2829
2830// ---------------------------------------------------------------------------
2831// Codex stub provider
2832// ---------------------------------------------------------------------------
2833
2834/// Locates the Codex CLI binary.
2835fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
2836    if let Some(p) = explicit {
2837        if p.exists() {
2838            return Ok(p.to_path_buf());
2839        }
2840        return Err(AppError::Validation(format!(
2841            "Codex binary not found at explicit path: {}",
2842            p.display()
2843        )));
2844    }
2845
2846    if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
2847        let p = PathBuf::from(&env_path);
2848        if p.exists() {
2849            return Ok(p);
2850        }
2851    }
2852
2853    let name = if cfg!(windows) { "codex.exe" } else { "codex" };
2854    if let Some(path_var) = std::env::var_os("PATH") {
2855        for dir in std::env::split_paths(&path_var) {
2856            let candidate = dir.join(name);
2857            if candidate.exists() {
2858                return Ok(crate::extract::llm_embedding::resolve_real_binary(
2859                    &candidate,
2860                ));
2861            }
2862        }
2863    }
2864
2865    Err(AppError::Validation(
2866        "Codex CLI binary not found in PATH. Install it or specify --codex-binary".to_string(),
2867    ))
2868}
2869
2870/// G27: Calibrate weight of a single relationship via LLM.
2871fn call_weight_calibrate(
2872    conn: &Connection,
2873    _namespace: &str,
2874    item_key: &str,
2875    binary: &Path,
2876    model: Option<&str>,
2877    timeout: u64,
2878    mode: &EnrichMode,
2879) -> Result<EnrichItemResult, AppError> {
2880    let rel_id: i64 = item_key
2881        .parse()
2882        .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
2883    let (source_name, target_name, relation, current_weight): (String, String, String, f64) = conn
2884        .query_row(
2885            "SELECT e1.name, e2.name, r.relation, r.weight \
2886             FROM relationships r \
2887             JOIN entities e1 ON e1.id = r.source_id \
2888             JOIN entities e2 ON e2.id = r.target_id \
2889             WHERE r.id = ?1",
2890            rusqlite::params![rel_id],
2891            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2892        )
2893        .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
2894
2895    let input_text = format!(
2896        "Source: {source_name}\nTarget: {target_name}\nRelation: {relation}\nCurrent weight: {current_weight}"
2897    );
2898    let (value, cost, is_oauth) = match mode {
2899        EnrichMode::ClaudeCode => call_claude(
2900            binary,
2901            WEIGHT_CALIBRATE_PROMPT,
2902            WEIGHT_CALIBRATE_SCHEMA,
2903            &input_text,
2904            model,
2905            timeout,
2906        )?,
2907        EnrichMode::Codex => call_codex(
2908            binary,
2909            WEIGHT_CALIBRATE_PROMPT,
2910            WEIGHT_CALIBRATE_SCHEMA,
2911            &input_text,
2912            model,
2913            timeout,
2914        )?,
2915    };
2916
2917    let calibrated = value
2918        .get("calibrated_weight")
2919        .and_then(|v| v.as_f64())
2920        .ok_or_else(|| AppError::Validation("LLM result missing 'calibrated_weight'".into()))?;
2921
2922    conn.execute(
2923        "UPDATE relationships SET weight = ?1 WHERE id = ?2",
2924        rusqlite::params![calibrated, rel_id],
2925    )?;
2926
2927    Ok(EnrichItemResult::Done {
2928        memory_id: None,
2929        entity_id: None,
2930        entities: 0,
2931        rels: 1,
2932        chars_before: None,
2933        chars_after: None,
2934        cost,
2935        is_oauth,
2936    })
2937}
2938
2939/// G27: Reclassify a generic relationship type via LLM.
2940fn call_relation_reclassify(
2941    conn: &Connection,
2942    _namespace: &str,
2943    item_key: &str,
2944    binary: &Path,
2945    model: Option<&str>,
2946    timeout: u64,
2947    mode: &EnrichMode,
2948) -> Result<EnrichItemResult, AppError> {
2949    let rel_id: i64 = item_key
2950        .parse()
2951        .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
2952    let (source_name, target_name, current_relation): (String, String, String) = conn
2953        .query_row(
2954            "SELECT e1.name, e2.name, r.relation \
2955             FROM relationships r \
2956             JOIN entities e1 ON e1.id = r.source_id \
2957             JOIN entities e2 ON e2.id = r.target_id \
2958             WHERE r.id = ?1",
2959            rusqlite::params![rel_id],
2960            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2961        )
2962        .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
2963
2964    let input_text = format!(
2965        "Source entity: {source_name}\nTarget entity: {target_name}\nCurrent relation: {current_relation}"
2966    );
2967    let (value, cost, is_oauth) = match mode {
2968        EnrichMode::ClaudeCode => call_claude(
2969            binary,
2970            RELATION_RECLASSIFY_PROMPT,
2971            RELATION_RECLASSIFY_SCHEMA,
2972            &input_text,
2973            model,
2974            timeout,
2975        )?,
2976        EnrichMode::Codex => call_codex(
2977            binary,
2978            RELATION_RECLASSIFY_PROMPT,
2979            RELATION_RECLASSIFY_SCHEMA,
2980            &input_text,
2981            model,
2982            timeout,
2983        )?,
2984    };
2985
2986    let new_relation = value
2987        .get("relation")
2988        .and_then(|v| v.as_str())
2989        .ok_or_else(|| AppError::Validation("LLM result missing 'relation'".into()))?;
2990    let new_strength = value
2991        .get("strength")
2992        .and_then(|v| v.as_f64())
2993        .unwrap_or(0.5);
2994
2995    conn.execute(
2996        "UPDATE relationships SET relation = ?1, weight = ?2 WHERE id = ?3",
2997        rusqlite::params![new_relation, new_strength, rel_id],
2998    )?;
2999
3000    Ok(EnrichItemResult::Done {
3001        memory_id: None,
3002        entity_id: None,
3003        entities: 0,
3004        rels: 1,
3005        chars_before: None,
3006        chars_after: None,
3007        cost,
3008        is_oauth,
3009    })
3010}
3011
3012/// G27 P2: Connect isolated entities via LLM-suggested relationship.
3013fn call_entity_connect(
3014    conn: &Connection,
3015    namespace: &str,
3016    item_key: &str,
3017    binary: &Path,
3018    model: Option<&str>,
3019    timeout: u64,
3020    mode: &EnrichMode,
3021) -> Result<EnrichItemResult, AppError> {
3022    let pairs = scan_isolated_entity_pairs(conn, namespace, Some(1))?;
3023    let (e1_id, e1_name, e2_id, e2_name) =
3024        match pairs.into_iter().find(|(_, n, _, _)| n == item_key) {
3025            Some(p) => p,
3026            None => {
3027                return Ok(EnrichItemResult::Skipped {
3028                    reason: "pair no longer isolated".into(),
3029                })
3030            }
3031        };
3032    let input_text = format!("Entity A: {e1_name}\nEntity B: {e2_name}");
3033    let (value, cost, is_oauth) = match mode {
3034        EnrichMode::ClaudeCode => call_claude(
3035            binary,
3036            ENTITY_CONNECT_PROMPT,
3037            ENTITY_CONNECT_SCHEMA,
3038            &input_text,
3039            model,
3040            timeout,
3041        )?,
3042        EnrichMode::Codex => call_codex(
3043            binary,
3044            ENTITY_CONNECT_PROMPT,
3045            ENTITY_CONNECT_SCHEMA,
3046            &input_text,
3047            model,
3048            timeout,
3049        )?,
3050    };
3051    let relation = value
3052        .get("relation")
3053        .and_then(|v| v.as_str())
3054        .unwrap_or("none");
3055    if relation == "none" {
3056        return Ok(EnrichItemResult::Skipped {
3057            reason: "LLM determined no relationship".into(),
3058        });
3059    }
3060    let strength = value
3061        .get("strength")
3062        .and_then(|v| v.as_f64())
3063        .unwrap_or(0.5);
3064    conn.execute(
3065        "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
3066        rusqlite::params![namespace, e1_id, e2_id, relation, strength],
3067    )?;
3068    Ok(EnrichItemResult::Done {
3069        memory_id: None,
3070        entity_id: None,
3071        entities: 0,
3072        rels: 1,
3073        chars_before: None,
3074        chars_after: None,
3075        cost,
3076        is_oauth,
3077    })
3078}
3079
3080/// G27 P2: Validate entity type assignment via LLM.
3081fn call_entity_type_validate(
3082    conn: &Connection,
3083    _namespace: &str,
3084    item_key: &str,
3085    binary: &Path,
3086    model: Option<&str>,
3087    timeout: u64,
3088    mode: &EnrichMode,
3089) -> Result<EnrichItemResult, AppError> {
3090    let (ent_id, ent_name, ent_type): (i64, String, String) = conn
3091        .query_row(
3092            "SELECT id, name, type FROM entities WHERE name = ?1",
3093            rusqlite::params![item_key],
3094            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3095        )
3096        .map_err(|_| AppError::NotFound(format!("entity '{item_key}' not found")))?;
3097    let input_text = format!("Entity: {ent_name}\nCurrent type: {ent_type}");
3098    let (value, cost, is_oauth) = match mode {
3099        EnrichMode::ClaudeCode => call_claude(
3100            binary,
3101            ENTITY_TYPE_VALIDATE_PROMPT,
3102            ENTITY_TYPE_VALIDATE_SCHEMA,
3103            &input_text,
3104            model,
3105            timeout,
3106        )?,
3107        EnrichMode::Codex => call_codex(
3108            binary,
3109            ENTITY_TYPE_VALIDATE_PROMPT,
3110            ENTITY_TYPE_VALIDATE_SCHEMA,
3111            &input_text,
3112            model,
3113            timeout,
3114        )?,
3115    };
3116    let validated_type = value
3117        .get("validated_type")
3118        .and_then(|v| v.as_str())
3119        .unwrap_or(&ent_type);
3120    let was_correct = value
3121        .get("was_correct")
3122        .and_then(|v| v.as_bool())
3123        .unwrap_or(true);
3124    if !was_correct {
3125        conn.execute(
3126            "UPDATE entities SET type = ?1 WHERE id = ?2",
3127            rusqlite::params![validated_type, ent_id],
3128        )?;
3129    }
3130    Ok(EnrichItemResult::Done {
3131        memory_id: None,
3132        entity_id: Some(ent_id),
3133        entities: 1,
3134        rels: 0,
3135        chars_before: None,
3136        chars_after: None,
3137        cost,
3138        is_oauth,
3139    })
3140}
3141
3142/// G27 P2: Enrich generic memory description via LLM.
3143fn call_description_enrich(
3144    conn: &Connection,
3145    _namespace: &str,
3146    item_key: &str,
3147    binary: &Path,
3148    model: Option<&str>,
3149    timeout: u64,
3150    mode: &EnrichMode,
3151) -> Result<EnrichItemResult, AppError> {
3152    let (mem_id, body, old_desc): (i64, String, String) = conn
3153        .query_row(
3154            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3155            rusqlite::params![item_key],
3156            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3157        )
3158        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3159    let snippet: String = body.chars().take(500).collect();
3160    let input_text = format!(
3161        "Memory name: {item_key}\nCurrent description: {old_desc}\nBody preview: {snippet}"
3162    );
3163    let (value, cost, is_oauth) = match mode {
3164        EnrichMode::ClaudeCode => call_claude(
3165            binary,
3166            DESCRIPTION_ENRICH_PROMPT,
3167            DESCRIPTION_ENRICH_SCHEMA,
3168            &input_text,
3169            model,
3170            timeout,
3171        )?,
3172        EnrichMode::Codex => call_codex(
3173            binary,
3174            DESCRIPTION_ENRICH_PROMPT,
3175            DESCRIPTION_ENRICH_SCHEMA,
3176            &input_text,
3177            model,
3178            timeout,
3179        )?,
3180    };
3181    let new_desc = value
3182        .get("description")
3183        .and_then(|v| v.as_str())
3184        .unwrap_or(&old_desc);
3185    conn.execute(
3186        "UPDATE memories SET description = ?1 WHERE id = ?2",
3187        rusqlite::params![new_desc, mem_id],
3188    )?;
3189    Ok(EnrichItemResult::Done {
3190        memory_id: Some(mem_id),
3191        entity_id: None,
3192        entities: 0,
3193        rels: 0,
3194        chars_before: Some(old_desc.len()),
3195        chars_after: Some(new_desc.len()),
3196        cost,
3197        is_oauth,
3198    })
3199}
3200
3201/// G27 P2: Classify memory into domain category via LLM.
3202fn call_domain_classify(
3203    conn: &Connection,
3204    _namespace: &str,
3205    item_key: &str,
3206    binary: &Path,
3207    model: Option<&str>,
3208    timeout: u64,
3209    mode: &EnrichMode,
3210) -> Result<EnrichItemResult, AppError> {
3211    let (mem_id, body, desc): (i64, String, String) = conn
3212        .query_row(
3213            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3214            rusqlite::params![item_key],
3215            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3216        )
3217        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3218    let snippet: String = body.chars().take(500).collect();
3219    let input_text = format!("Memory: {item_key}\nDescription: {desc}\nBody preview: {snippet}");
3220    let (value, cost, is_oauth) = match mode {
3221        EnrichMode::ClaudeCode => call_claude(
3222            binary,
3223            DOMAIN_CLASSIFY_PROMPT,
3224            DOMAIN_CLASSIFY_SCHEMA,
3225            &input_text,
3226            model,
3227            timeout,
3228        )?,
3229        EnrichMode::Codex => call_codex(
3230            binary,
3231            DOMAIN_CLASSIFY_PROMPT,
3232            DOMAIN_CLASSIFY_SCHEMA,
3233            &input_text,
3234            model,
3235            timeout,
3236        )?,
3237    };
3238    let domain = value
3239        .get("domain")
3240        .and_then(|v| v.as_str())
3241        .unwrap_or("uncategorized");
3242    let metadata = format!(r#"{{"domain":"{}"}}"#, domain.replace('"', "\\\""));
3243    conn.execute(
3244        "UPDATE memories SET metadata = ?1 WHERE id = ?2",
3245        rusqlite::params![metadata, mem_id],
3246    )?;
3247    Ok(EnrichItemResult::Done {
3248        memory_id: Some(mem_id),
3249        entity_id: None,
3250        entities: 0,
3251        rels: 0,
3252        chars_before: None,
3253        chars_after: None,
3254        cost,
3255        is_oauth,
3256    })
3257}
3258
3259/// G27 P2: Audit memory graph quality via LLM.
3260fn call_graph_audit(
3261    conn: &Connection,
3262    _namespace: &str,
3263    item_key: &str,
3264    binary: &Path,
3265    model: Option<&str>,
3266    timeout: u64,
3267    mode: &EnrichMode,
3268) -> Result<EnrichItemResult, AppError> {
3269    let (mem_id, body, desc): (i64, String, String) = conn
3270        .query_row(
3271            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3272            rusqlite::params![item_key],
3273            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3274        )
3275        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3276    let snippet: String = body.chars().take(500).collect();
3277    let ent_count: i64 = conn
3278        .query_row(
3279            "SELECT COUNT(*) FROM memory_entities WHERE memory_id = ?1",
3280            rusqlite::params![mem_id],
3281            |r| r.get(0),
3282        )
3283        .unwrap_or(0);
3284    let input_text = format!("Memory: {item_key}\nDescription: {desc}\nEntity bindings: {ent_count}\nBody preview: {snippet}");
3285    let (value, cost, is_oauth) = match mode {
3286        EnrichMode::ClaudeCode => call_claude(
3287            binary,
3288            GRAPH_AUDIT_PROMPT,
3289            GRAPH_AUDIT_SCHEMA,
3290            &input_text,
3291            model,
3292            timeout,
3293        )?,
3294        EnrichMode::Codex => call_codex(
3295            binary,
3296            GRAPH_AUDIT_PROMPT,
3297            GRAPH_AUDIT_SCHEMA,
3298            &input_text,
3299            model,
3300            timeout,
3301        )?,
3302    };
3303    let issues = value
3304        .get("issues")
3305        .and_then(|v| v.as_array())
3306        .map(|a| a.len())
3307        .unwrap_or(0);
3308    Ok(EnrichItemResult::Done {
3309        memory_id: Some(mem_id),
3310        entity_id: None,
3311        entities: 0,
3312        rels: issues,
3313        chars_before: None,
3314        chars_after: None,
3315        cost,
3316        is_oauth,
3317    })
3318}
3319
3320/// G27 P2: Synthesize research findings into graph entities/relationships via LLM.
3321fn call_deep_research_synth(
3322    conn: &Connection,
3323    namespace: &str,
3324    item_key: &str,
3325    binary: &Path,
3326    model: Option<&str>,
3327    timeout: u64,
3328    mode: &EnrichMode,
3329) -> Result<EnrichItemResult, AppError> {
3330    let (mem_id, body): (i64, String) = conn
3331        .query_row(
3332            "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3333            rusqlite::params![item_key],
3334            |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
3335        )
3336        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3337    let snippet: String = body.chars().take(2000).collect();
3338    let input_text = format!("Memory: {item_key}\nBody:\n{snippet}");
3339    let (value, cost, is_oauth) = match mode {
3340        EnrichMode::ClaudeCode => call_claude(
3341            binary,
3342            DEEP_RESEARCH_SYNTH_PROMPT,
3343            DEEP_RESEARCH_SYNTH_SCHEMA,
3344            &input_text,
3345            model,
3346            timeout,
3347        )?,
3348        EnrichMode::Codex => call_codex(
3349            binary,
3350            DEEP_RESEARCH_SYNTH_PROMPT,
3351            DEEP_RESEARCH_SYNTH_SCHEMA,
3352            &input_text,
3353            model,
3354            timeout,
3355        )?,
3356    };
3357    let mut ent_count = 0usize;
3358    let mut rel_count = 0usize;
3359    if let Some(ents) = value.get("entities").and_then(|v| v.as_array()) {
3360        for e in ents {
3361            let name = e.get("name").and_then(|v| v.as_str()).unwrap_or_default();
3362            let etype_str = e
3363                .get("entity_type")
3364                .and_then(|v| v.as_str())
3365                .unwrap_or("concept");
3366            let etype: EntityType = etype_str.parse().unwrap_or(EntityType::Concept);
3367            if name.len() >= 2 {
3368                let ne = NewEntity {
3369                    name: name.to_string(),
3370                    entity_type: etype,
3371                    description: None,
3372                };
3373                let _ = entities::upsert_entity(conn, namespace, &ne);
3374                ent_count += 1;
3375            }
3376        }
3377    }
3378    if let Some(rels) = value.get("relationships").and_then(|v| v.as_array()) {
3379        for r in rels {
3380            let src = r.get("source").and_then(|v| v.as_str()).unwrap_or_default();
3381            let tgt = r.get("target").and_then(|v| v.as_str()).unwrap_or_default();
3382            if src.is_empty() || tgt.is_empty() {
3383                continue;
3384            }
3385            let rel = r
3386                .get("relation")
3387                .and_then(|v| v.as_str())
3388                .unwrap_or("related");
3389            let str_ = r.get("strength").and_then(|v| v.as_f64()).unwrap_or(0.5);
3390            if let (Some(sid), Some(tid)) = (
3391                entities::find_entity_id(conn, namespace, src)?,
3392                entities::find_entity_id(conn, namespace, tgt)?,
3393            ) {
3394                let _ = entities::create_or_fetch_relationship(
3395                    conn, namespace, sid, tid, rel, str_, None,
3396                );
3397                rel_count += 1;
3398            }
3399        }
3400    }
3401    Ok(EnrichItemResult::Done {
3402        memory_id: Some(mem_id),
3403        entity_id: None,
3404        entities: ent_count,
3405        rels: rel_count,
3406        chars_before: None,
3407        chars_after: None,
3408        cost,
3409        is_oauth,
3410    })
3411}
3412
3413/// G27 P2: Extract structured body from unstructured text via LLM.
3414fn call_body_extract(
3415    conn: &Connection,
3416    _namespace: &str,
3417    item_key: &str,
3418    binary: &Path,
3419    model: Option<&str>,
3420    timeout: u64,
3421    mode: &EnrichMode,
3422) -> Result<EnrichItemResult, AppError> {
3423    let (mem_id, body): (i64, String) = conn
3424        .query_row(
3425            "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3426            rusqlite::params![item_key],
3427            |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
3428        )
3429        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3430    let input_text = format!("Memory: {item_key}\nBody:\n{body}");
3431    let (value, cost, is_oauth) = match mode {
3432        EnrichMode::ClaudeCode => call_claude(
3433            binary,
3434            BODY_EXTRACT_PROMPT,
3435            BODY_EXTRACT_SCHEMA,
3436            &input_text,
3437            model,
3438            timeout,
3439        )?,
3440        EnrichMode::Codex => call_codex(
3441            binary,
3442            BODY_EXTRACT_PROMPT,
3443            BODY_EXTRACT_SCHEMA,
3444            &input_text,
3445            model,
3446            timeout,
3447        )?,
3448    };
3449    let restructured = value
3450        .get("restructured_body")
3451        .and_then(|v| v.as_str())
3452        .unwrap_or(&body);
3453    let chars_before = body.len();
3454    let chars_after = restructured.len();
3455    let new_hash = blake3::hash(restructured.as_bytes()).to_hex().to_string();
3456    conn.execute(
3457        "UPDATE memories SET body = ?1, body_hash = ?2, updated_at = unixepoch() WHERE id = ?3",
3458        rusqlite::params![restructured, new_hash, mem_id],
3459    )?;
3460    Ok(EnrichItemResult::Done {
3461        memory_id: Some(mem_id),
3462        entity_id: None,
3463        entities: 0,
3464        rels: 0,
3465        chars_before: Some(chars_before),
3466        chars_after: Some(chars_after),
3467        cost,
3468        is_oauth,
3469    })
3470}
3471
3472/// Scan for pairs of entities that share no direct relationship.
3473#[allow(clippy::type_complexity)]
3474fn scan_isolated_entity_pairs(
3475    conn: &Connection,
3476    namespace: &str,
3477    limit: Option<usize>,
3478) -> Result<Vec<(i64, String, i64, String)>, AppError> {
3479    let limit_val = limit.unwrap_or(50) as i64;
3480    let mut stmt = conn.prepare_cached(
3481        "SELECT e1.id, e1.name, e2.id, e2.name FROM entities e1, entities e2 \
3482         WHERE e1.namespace = ?1 AND e2.namespace = ?1 AND e1.id < e2.id \
3483         AND NOT EXISTS (SELECT 1 FROM relationships r WHERE \
3484           (r.source_id = e1.id AND r.target_id = e2.id) OR \
3485           (r.source_id = e2.id AND r.target_id = e1.id)) \
3486         LIMIT ?2",
3487    )?;
3488    let rows = stmt
3489        .query_map(rusqlite::params![namespace, limit_val], |r| {
3490            Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?))
3491        })?
3492        .collect::<Result<Vec<_>, _>>()?;
3493    Ok(rows)
3494}
3495
3496/// Scan for entities with non-validated types (all entities for type audit).
3497fn scan_entities_for_type_validation(
3498    conn: &Connection,
3499    namespace: &str,
3500    limit: Option<usize>,
3501) -> Result<Vec<(i64, String, String)>, AppError> {
3502    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3503    let sql = format!(
3504        "SELECT id, name, type FROM entities WHERE namespace = ?1 ORDER BY id {limit_clause}"
3505    );
3506    let mut stmt = conn.prepare(&sql)?;
3507    let rows = stmt
3508        .query_map(rusqlite::params![namespace], |r| {
3509            Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3510        })?
3511        .collect::<Result<Vec<_>, _>>()?;
3512    Ok(rows)
3513}
3514
3515/// Scan for memories with generic descriptions (ingested, imported, etc).
3516fn scan_generic_descriptions(
3517    conn: &Connection,
3518    namespace: &str,
3519    limit: Option<usize>,
3520) -> Result<Vec<(i64, String, String)>, AppError> {
3521    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3522    let sql = format!(
3523        "SELECT id, name, description FROM memories WHERE namespace = ?1 AND deleted_at IS NULL \
3524         AND (description LIKE '%ingested%' OR description LIKE '%imported%' OR description LIKE '%added%' OR length(description) < 30) \
3525         ORDER BY id {limit_clause}"
3526    );
3527    let mut stmt = conn.prepare(&sql)?;
3528    let rows = stmt
3529        .query_map(rusqlite::params![namespace], |r| {
3530            Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3531        })?
3532        .collect::<Result<Vec<_>, _>>()?;
3533    Ok(rows)
3534}
3535
3536/// Calls the Codex CLI for a single enrichment item.
3537///
3538/// Follows the same contract as `call_claude`: returns `(value, cost_usd, is_oauth=false)`.
3539fn call_codex(
3540    binary: &Path,
3541    prompt: &str,
3542    json_schema: &str,
3543    input_text: &str,
3544    model: Option<&str>,
3545    timeout_secs: u64,
3546) -> Result<(serde_json::Value, f64, bool), AppError> {
3547    use wait_timeout::ChildExt;
3548
3549    // G31+G32+G33 (v1.0.69): validate the model BEFORE spawn, write the
3550    // schema to a trusted cache path (not /tmp), and reuse the
3551    // consolidated JSONL parser. See `codex_spawn.rs` for the canonical
3552    // hardening rationale.
3553    super::codex_spawn::validate_codex_model(model)?;
3554    let schema_file = super::codex_spawn::trusted_schema_path()?;
3555
3556    let args = super::codex_spawn::CodexSpawnArgs {
3557        binary,
3558        prompt,
3559        json_schema,
3560        input_text,
3561        model,
3562        timeout_secs,
3563        schema_path: schema_file.clone(),
3564    };
3565    let mut cmd = super::codex_spawn::build_codex_command(&args);
3566
3567    let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
3568        AppError::Io(std::io::Error::new(
3569            e.kind(),
3570            format!("failed to spawn codex: {e}"),
3571        ))
3572    })?;
3573
3574    let full_prompt = format!("{prompt}\n\n{input_text}");
3575    let stdin_bytes = full_prompt.into_bytes();
3576    let mut child_stdin = child
3577        .stdin
3578        .take()
3579        .ok_or_else(|| AppError::Validation("failed to open codex stdin".into()))?;
3580    let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
3581        child_stdin.write_all(&stdin_bytes)?;
3582        drop(child_stdin);
3583        Ok(())
3584    });
3585
3586    let start = std::time::Instant::now();
3587    let timeout = std::time::Duration::from_secs(timeout_secs);
3588    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
3589    let _ = std::fs::remove_file(&schema_file);
3590
3591    match status {
3592        Some(exit_status) => {
3593            stdin_thread
3594                .join()
3595                .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
3596                .map_err(AppError::Io)?;
3597
3598            tracing::debug!(
3599                target: "process",
3600                exit_code = ?exit_status.code(),
3601                elapsed_ms = start.elapsed().as_millis() as u64,
3602                "external process completed"
3603            );
3604
3605            let mut stdout_buf = Vec::new();
3606            if let Some(mut out) = child.stdout.take() {
3607                std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
3608            }
3609            if !exit_status.success() {
3610                let mut stderr_buf = Vec::new();
3611                if let Some(mut err) = child.stderr.take() {
3612                    std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
3613                }
3614                let stderr_str = String::from_utf8_lossy(&stderr_buf);
3615                tracing::warn!(
3616                    target: "enrich",
3617                    exit_code = ?exit_status.code(),
3618                    stderr = %stderr_str.trim(),
3619                    "codex process failed"
3620                );
3621                return Err(AppError::Validation(format!(
3622                    "codex exited with code {:?}: {}",
3623                    exit_status.code(),
3624                    stderr_str.trim()
3625                )));
3626            }
3627            let stdout_str = String::from_utf8(stdout_buf)
3628                .map_err(|_| AppError::Validation("codex stdout is not valid UTF-8".into()))?;
3629            // G32: use the JSONL parser, NOT serde_json::from_str on the
3630            // entire stdout (codex emits one event per line).
3631            let result = super::codex_spawn::parse_codex_jsonl(&stdout_str)?;
3632            // Return the raw agent_message text parsed as JSON. Different
3633            // operations (memory-bindings, body-enrich) use different
3634            // output schemas, so we let the caller pick which fields to
3635            // extract. The previous implementation hardcoded
3636            // `{entities, urls}` which broke body-enrich.
3637            let value: serde_json::Value =
3638                serde_json::from_str(&result.last_agent_text).map_err(|e| {
3639                    AppError::Validation(format!(
3640                        "codex agent_message is not valid JSON: {e}; raw={}",
3641                        result.last_agent_text
3642                    ))
3643                })?;
3644            Ok((value, 0.0, false))
3645        }
3646        None => {
3647            let _ = child.kill();
3648            let _ = child.wait();
3649            let _ = stdin_thread.join();
3650            Err(AppError::Validation(format!(
3651                "codex timed out after {timeout_secs} seconds"
3652            )))
3653        }
3654    }
3655}
3656
3657// ---------------------------------------------------------------------------
3658// Tests
3659// ---------------------------------------------------------------------------
3660
/// Unit tests for the `enrich` scan, parse, persist and LLM-runner helpers.
///
/// All database tests run against an in-memory SQLite connection built by
/// `open_test_db`; no test here touches the user's real graph database.
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::Connection;
    #[cfg(unix)]
    use std::os::unix::fs::PermissionsExt;

    /// Opens an in-memory SQLite database with a minimal schema for unit tests.
    ///
    /// Creates the `memories`, `entities`, `memory_entities`, `relationships`
    /// and `memory_embeddings` tables. Panics if schema creation fails, which
    /// would indicate a bug in the DDL below rather than a test failure.
    fn open_test_db() -> Connection {
        let conn = Connection::open_in_memory().expect("in-memory db");
        conn.execute_batch(
            "CREATE TABLE memories (
                id          INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace   TEXT NOT NULL DEFAULT 'global',
                name        TEXT NOT NULL,
                type        TEXT NOT NULL DEFAULT 'note',
                description TEXT NOT NULL DEFAULT '',
                body        TEXT NOT NULL DEFAULT '',
                body_hash   TEXT NOT NULL DEFAULT '',
                session_id  TEXT,
                source      TEXT NOT NULL DEFAULT 'agent',
                metadata    TEXT NOT NULL DEFAULT '{}',
                created_at  INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at  INTEGER NOT NULL DEFAULT (unixepoch()),
                deleted_at  INTEGER,
                UNIQUE(namespace, name)
            );
            CREATE TABLE entities (
                id          INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace   TEXT NOT NULL DEFAULT 'global',
                name        TEXT NOT NULL,
                type        TEXT NOT NULL DEFAULT 'concept',
                description TEXT,
                degree      INTEGER NOT NULL DEFAULT 0,
                created_at  INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at  INTEGER NOT NULL DEFAULT (unixepoch()),
                UNIQUE(namespace, name)
            );
            CREATE TABLE memory_entities (
                memory_id  INTEGER NOT NULL,
                entity_id  INTEGER NOT NULL,
                PRIMARY KEY (memory_id, entity_id)
            );
            CREATE TABLE relationships (
                id         INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace  TEXT NOT NULL DEFAULT 'global',
                source_id  INTEGER NOT NULL,
                target_id  INTEGER NOT NULL,
                relation   TEXT NOT NULL,
                weight     REAL NOT NULL DEFAULT 0.5,
                description TEXT,
                UNIQUE(source_id, target_id, relation)
            );
            CREATE TABLE memory_embeddings (
                memory_id   INTEGER PRIMARY KEY,
                namespace   TEXT NOT NULL,
                embedding   BLOB NOT NULL,
                source      TEXT NOT NULL,
                model       TEXT NOT NULL DEFAULT '',
                dim         INTEGER NOT NULL DEFAULT 384,
                created_at  INTEGER NOT NULL DEFAULT (unixepoch())
            );",
        )
        .expect("schema creation must succeed");
        conn
    }

    /// A memory with no `memory_entities` row is reported by the unbound scan.
    #[test]
    fn scan_unbound_memories_finds_memories_without_bindings() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'test-mem', 'some body content')",
            [],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "test-mem");
    }

    /// A memory linked to at least one entity is excluded from the unbound scan.
    #[test]
    fn scan_unbound_memories_excludes_bound_memories() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'bound-mem', 'body')",
            [],
        )
        .unwrap();
        let mem_id: i64 = conn
            .query_row("SELECT id FROM memories WHERE name='bound-mem'", [], |r| {
                r.get(0)
            })
            .unwrap();
        conn.execute(
            "INSERT INTO entities (namespace, name) VALUES ('global', 'some-entity')",
            [],
        )
        .unwrap();
        let ent_id: i64 = conn
            .query_row(
                "SELECT id FROM entities WHERE name='some-entity'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        conn.execute(
            "INSERT INTO memory_entities (memory_id, entity_id) VALUES (?1, ?2)",
            rusqlite::params![mem_id, ent_id],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
        assert!(results.is_empty(), "bound memory must not appear in scan");
    }

    /// An entity whose `description` is NULL is picked up for enrichment.
    #[test]
    fn scan_entities_without_description_finds_null_description() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'my-tool', 'tool', NULL)",
            [],
        )
        .unwrap();

        let results = scan_entities_without_description(&conn, "global", None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "my-tool");
    }

    /// An entity that already has a description is skipped.
    #[test]
    fn scan_entities_without_description_excludes_entities_with_description() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'described-tool', 'tool', 'Has a description already')",
            [],
        )
        .unwrap();

        let results = scan_entities_without_description(&conn, "global", None).unwrap();
        assert!(
            results.is_empty(),
            "entity with description must not appear"
        );
    }

    /// A body shorter than the threshold (here 100) is selected for expansion.
    #[test]
    fn scan_short_body_memories_finds_short_bodies() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'short-mem', 'hi')",
            [],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "short-mem");
    }

    /// A body longer than the threshold is not selected.
    #[test]
    fn scan_short_body_memories_excludes_long_bodies() {
        let conn = open_test_db();
        let long_body = "a".repeat(1000);
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'long-mem', ?1)",
            rusqlite::params![long_body],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
        assert!(results.is_empty(), "long memory must not appear in scan");
    }

    /// `Some(limit)` caps the number of scanned rows even when more match.
    #[test]
    fn scan_respects_limit() {
        let conn = open_test_db();
        for i in 0..5 {
            // Bind the generated name as a parameter instead of splicing it into
            // the SQL text, matching the other inserts in this module.
            conn.execute(
                "INSERT INTO memories (namespace, name, body) VALUES ('global', ?1, 'short')",
                rusqlite::params![format!("mem-{i}")],
            )
            .unwrap();
        }

        let results = scan_short_body_memories(&conn, "global", 1000, Some(3)).unwrap();
        assert_eq!(results.len(), 3, "limit must be respected");
    }

    /// Only memories lacking a vector row are returned by the re-embed scan.
    #[test]
    fn scan_memories_without_embeddings_finds_only_missing_rows() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'missing-vec', 'body one')",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'has-vec', 'body two')",
            [],
        )
        .unwrap();
        let memory_id: i64 = conn
            .query_row(
                "SELECT id FROM memories WHERE namespace='global' AND name='has-vec'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        let embedding = vec![0.0_f32; crate::constants::EMBEDDING_DIM];
        memories::upsert_vec(
            &conn, memory_id, "global", "note", &embedding, "has-vec", "body two",
        )
        .unwrap();

        let results = scan_memories_without_embeddings(&conn, "global", None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "missing-vec");
    }

    /// A non-empty name filter restricts the re-embed scan to the listed names.
    #[test]
    fn scan_memories_without_embeddings_respects_name_filter() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'match-me', 'body one')",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'skip-me', 'body two')",
            [],
        )
        .unwrap();

        let results =
            scan_memories_without_embeddings(&conn, "global", None, &["match-me".to_string()])
                .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "match-me");
    }

    /// `open_queue_db` creates an empty `queue` table in a fresh database file.
    ///
    /// The file lives in a per-test temporary directory rather than a fixed
    /// `/tmp` path. Three reasons:
    /// - the test also runs where `/tmp` does not exist (e.g. Windows);
    /// - it cannot pick up a stale queue left by an earlier run whose PID was
    ///   recycled;
    /// - it is removed even if an assertion panics, because the `TempDir` guard
    ///   deletes it on drop.
    #[test]
    fn queue_db_schema_creates_correctly() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let tmp_path: String = tmp
            .path()
            .join("test-enrich-queue.sqlite")
            .to_str()
            .expect("temp path must be valid UTF-8")
            .to_owned();
        // Scope the connection so the SQLite handle is closed before the temp
        // directory is deleted (open files block deletion on some platforms).
        let count: i64 = {
            let conn = open_queue_db(&tmp_path).expect("queue db must open");
            conn.query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
                .unwrap()
        };
        assert_eq!(count, 0);
    }

    /// A successful `result` event yields its structured output and reported cost.
    #[test]
    fn parse_claude_output_valid_bindings() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.01,
             "structured_output":{"entities":[{"name":"rust-lang","entity_type":"tool"}],"relationships":[]}}
        ]"#;
        let result = crate::commands::claude_runner::parse_claude_output(output)
            .expect("must parse successfully");
        assert!(result.value.get("entities").is_some());
        assert!((result.cost_usd - 0.01).abs() < f64::EPSILON);
        assert!(!result.is_oauth);
    }

    /// `apiKeySource: "none"` in the init event is reported as OAuth usage.
    #[test]
    fn parse_claude_output_detects_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"none"},
            {"type":"result","is_error":false,"total_cost_usd":0.0,
             "structured_output":{"entities":[],"relationships":[]}}
        ]"#;
        let result = crate::commands::claude_runner::parse_claude_output(output).unwrap();
        assert!(result.is_oauth);
    }

    /// A rate-limit error result maps to `AppError::RateLimited`.
    #[test]
    fn parse_claude_output_rate_limit_returns_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;
        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
        assert!(matches!(err, AppError::RateLimited { .. }));
    }

    /// Any other error result surfaces the original error text to the caller.
    #[test]
    fn parse_claude_output_auth_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;
        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("authentication failed"));
    }

    /// `call_codex` parses the JSON carried in a mock codex `agent_message` event.
    ///
    /// Uses a bash script standing in for the `codex` binary, so it is
    /// Unix-only. The body-enrich payload must come back as a JSON value with
    /// zero cost and no OAuth flag.
    #[cfg(unix)]
    #[test]
    fn call_codex_returns_raw_json_for_body_enrich_schema() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let binary = tmp.path().join("codex-mock");
        std::fs::write(
            &binary,
            r#"#!/usr/bin/env bash
set -euo pipefail
cat <<'JSONL'
{"type":"thread.started","thread_id":"mock-thread-0"}
{"type":"item.completed","item":{"type":"agent_message","text":"{\"enriched_body\":\"expanded body\"}"}}
{"type":"turn.completed","usage":{"input_tokens":1,"output_tokens":1}}
JSONL
"#,
        )
        .expect("mock codex write");
        let mut perms = std::fs::metadata(&binary).expect("metadata").permissions();
        perms.set_mode(0o755);
        std::fs::set_permissions(&binary, perms).expect("chmod");

        let (value, cost, is_oauth) =
            call_codex(&binary, "prompt", BODY_ENRICH_SCHEMA, "body", None, 5)
                .expect("call_codex must accept body-enrich payload");

        assert_eq!(value["enriched_body"], "expanded body");
        assert_eq!(cost, 0.0);
        assert!(!is_oauth);
    }

    /// Exercises the scan step used by the body-enrich dry-run path.
    #[test]
    fn dry_run_emits_preview_without_calling_llm() {
        // NOTE: despite its name, this test does NOT observe dry-run NDJSON
        // output. run() needs AppPaths (on-disk state), so only the scan helper
        // that the dry-run path relies on is exercised here, against an
        // in-memory DB. No LLM process is spawned. The NDJSON emission is
        // covered by integration tests with a fake binary.
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'dry-mem', 'tiny')",
            [],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 1000, None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "dry-mem");
    }

    /// `persist_entity_description` writes the description onto the entity row.
    #[test]
    fn persist_entity_description_updates_db() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type) VALUES ('global', 'tokio-runtime', 'tool')",
            [],
        )
        .unwrap();
        let eid: i64 = conn
            .query_row(
                "SELECT id FROM entities WHERE name='tokio-runtime'",
                [],
                |r| r.get(0),
            )
            .unwrap();

        persist_entity_description(&conn, eid, "Async runtime for Rust applications").unwrap();

        let desc: String = conn
            .query_row(
                "SELECT description FROM entities WHERE id=?1",
                rusqlite::params![eid],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(desc, "Async runtime for Rust applications");
    }

    /// The bindings JSON schema constant must parse as JSON.
    #[test]
    fn bindings_schema_is_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(BINDINGS_SCHEMA).expect("BINDINGS_SCHEMA must be valid JSON");
    }

    /// The entity-description JSON schema constant must parse as JSON.
    #[test]
    fn entity_description_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(ENTITY_DESCRIPTION_SCHEMA)
            .expect("ENTITY_DESCRIPTION_SCHEMA must be valid JSON");
    }

    /// The body-enrich JSON schema constant must parse as JSON.
    #[test]
    fn body_enrich_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(BODY_ENRICH_SCHEMA)
            .expect("BODY_ENRICH_SCHEMA must be valid JSON");
    }
}