Skip to main content

sqlite_graphrag/commands/
enrich.rs

//! Handler for the `enrich` CLI subcommand (GAP-14 + GAP-18).
//!
//! Enriches the knowledge graph by running LLM-powered analysis over memories
//! and entities that are missing key structural data. Operations are:
//!
//! - `memory-bindings`: memories without `memory_entities` rows get entity extraction
//! - `entity-descriptions`: entities with NULL/empty descriptions get LLM descriptions
//! - `body-enrich`: memories with short bodies get expanded by the LLM (GAP-18)
//! - `re-embed`: memories without a vector row get re-embedded without rewriting their body
//!
//! The remaining operations (`weight-calibrate`, `relation-reclassify`,
//! `entity-connect`, `entity-type-validate`, `description-enrich`,
//! `cross-domain-bridges`, `domain-classify`, `graph-audit`,
//! `deep-research-synth`, `body-extract`) are currently scan only.
//!
//! Architecture mirrors `ingest_claude.rs`: SCAN → JUDGE (LLM) → PERSIST, with a
//! SQLite queue DB (`.enrich-queue.sqlite`) for resume/retry support.
//!
//! Workload: subprocess I/O-bound (claude/codex API calls with network wait).
//!
//! # DRY opportunity
//!
//! `extract_with_claude`, `parse_claude_output`, and the `open_queue_db` queue schema
//! in `ingest_claude.rs` are private functions whose patterns are duplicated here.
//! JSON emission is already shared through `crate::output::emit_json_line`, and
//! Claude invocation goes through `claude_runner`. A future refactoring could extract
//! the rest into a shared `src/commands/llm_runner.rs` module (or `src/llm_runner.rs`)
//! without changing any public APIs. That extraction requires editing
//! `ingest_claude.rs`, which is outside this stream's boundary, so it is flagged here
//! for the Integration stream to evaluate.
23
24use crate::commands::ingest_claude::find_claude_binary;
25use crate::constants::MAX_MEMORY_BODY_LEN;
26use crate::entity_type::EntityType;
27use crate::errors::AppError;
28use crate::paths::AppPaths;
29use crate::storage::connection::{ensure_db_ready, open_rw};
30use crate::storage::entities::{self, NewEntity, NewRelationship};
31use crate::storage::memories;
32
33use rusqlite::Connection;
34use serde::{Deserialize, Serialize};
35use std::io::Write;
36use std::path::{Path, PathBuf};
37use std::time::Instant;
38
39// ---------------------------------------------------------------------------
40// Constants
41// ---------------------------------------------------------------------------
42
/// File name of the resumable enrichment queue database.
const DEFAULT_QUEUE_DB: &str = ".enrich-queue.sqlite";

/// Default back-off after a provider rate limit.
// NOTE(review): unit presumably seconds — confirm at the call site.
const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;

/// Default value of `--min-output-chars` (body-enrich).
const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;

/// Default value of `--max-output-chars` (body-enrich).
const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2_000;
47
48// ---------------------------------------------------------------------------
49// JSON schema used for memory-bindings and body-enrich extraction
50// ---------------------------------------------------------------------------
51
/// Structured-output JSON schema for entity/relationship extraction.
///
/// Both vocabularies are closed:
/// - `entity_type` is limited to the ten types listed in `ENTITY_TYPE_VALIDATE_PROMPT`.
/// - `relation` is limited to the eleven relations allowed by `BINDINGS_PROMPT`.
///
/// `strength` is bounded to `[0, 1]`. Every object sets
/// `additionalProperties: false`, so no extra keys are permitted.
const BINDINGS_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["entities","relationships"],
  "additionalProperties": false
}"#;

/// Structured-output schema with a single required `description` string.
// Presumably paired with `ENTITY_DESCRIPTION_PROMPT_PREFIX` (entity-descriptions) — confirm at call site.
const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" }
  },
  "required": ["description"],
  "additionalProperties": false
}"#;

/// Structured-output schema with a single required `enriched_body` string.
// Presumably paired with `BODY_ENRICH_PROMPT_PREFIX` (body-enrich) — confirm at call site.
const BODY_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "enriched_body": { "type": "string" }
  },
  "required": ["enriched_body"],
  "additionalProperties": false
}"#;
109
// G27 P1: weight-calibrate
/// Prompt asking the LLM to re-score an existing relationship weight.
///
/// Uses the same 0.3 / 0.5 / 0.7 / 0.9 scale as the strength rule in
/// `BINDINGS_PROMPT`.
// Each source line ends in `\n\`: the `\n` is kept, while the trailing backslash
// swallows the source newline and the next line's leading whitespace.
const WEIGHT_CALIBRATE_PROMPT: &str = "You are a knowledge graph quality auditor. Evaluate whether this relationship weight is correctly calibrated.\n\n\
Scale:\n\
- 0.9 = vital hard dependency (A cannot function without B)\n\
- 0.7 = important design relationship (A strongly supports/enables B)\n\
- 0.5 = useful contextual link (A and B share relevant context)\n\
- 0.3 = weak reference (A mentions B without strong coupling)\n\n\
Respond with the calibrated weight and brief reasoning.";

/// Schema for `weight-calibrate`: a `[0, 1]`-bounded weight plus a free-text rationale.
const WEIGHT_CALIBRATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "calibrated_weight": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["calibrated_weight", "reasoning"],
  "additionalProperties": false
}"#;
128
// G27 P1: relation-reclassify
/// Prompt asking the LLM to replace a generic relation with one canonical relation.
const RELATION_RECLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. The relationship between these entities uses a generic type. Determine the REAL semantic relationship.\n\n\
Valid canonical relations (pick exactly one):\n\
- depends-on: A cannot function without B\n\
- uses: A utilizes B but could substitute it\n\
- supports: A reinforces or enables B\n\
- causes: A triggers or produces B\n\
- fixes: A resolves a problem in B\n\
- contradicts: A conflicts with or invalidates B\n\
- applies-to: A is relevant to or scoped within B\n\
- follows: A comes after B in sequence\n\
- replaces: A substitutes B\n\
- tracked-in: A is monitored in B\n\
- related: A and B share context (use sparingly)\n\n\
Respond with the correct relation, strength, and reasoning.";

/// Schema for `relation-reclassify`.
///
/// `relation` is a closed enum holding exactly the canonical relations listed in
/// `RELATION_RECLASSIFY_PROMPT`, matching the relation enum in `BINDINGS_SCHEMA`.
/// A free-form string would let an off-vocabulary label (e.g. `mentions`)
/// through, even though the prompt says "pick exactly one".
const RELATION_RECLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": {
      "type": "string",
      "enum": ["depends-on","uses","supports","causes","fixes","contradicts","applies-to","follows","replaces","tracked-in","related"]
    },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
155
// G27 P2: entity-connect — suggest relationships between isolated entities
/// Prompt asking whether two unconnected entities share a meaningful relation.
const ENTITY_CONNECT_PROMPT: &str = "You are a knowledge graph quality auditor. Two entities exist in the same graph but have no relationship between them. Determine if a meaningful relationship exists.\n\n\
Valid canonical relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, replaces, tracked-in, related.\n\n\
If NO meaningful relationship exists, set relation to \"none\".\n\
Respond with the relation (or \"none\"), strength, and reasoning.";

/// Schema for `entity-connect`.
///
/// `relation` is a closed enum: the canonical relations listed in
/// `ENTITY_CONNECT_PROMPT` plus the `"none"` sentinel the prompt defines for
/// "no meaningful relationship". Any other label is rejected by the schema.
const ENTITY_CONNECT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": {
      "type": "string",
      "enum": ["depends-on","uses","supports","causes","fixes","contradicts","applies-to","follows","replaces","tracked-in","related","none"]
    },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
172
// G27 P2: entity-type-validate — verify entity type assignments
/// Prompt asking the LLM to confirm or correct an entity's type.
const ENTITY_TYPE_VALIDATE_PROMPT: &str = "You are a knowledge graph quality auditor. Verify whether this entity's type is correct.\n\n\
Valid entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\n\
If the current type is correct, keep it. If wrong, suggest the correct type.\n\
Respond with the validated type and reasoning.";

/// Schema for `entity-type-validate`.
///
/// `validated_type` is restricted to the ten types listed in
/// `ENTITY_TYPE_VALIDATE_PROMPT`, the same vocabulary as `entity_type` in
/// `BINDINGS_SCHEMA`. A suggested type therefore always comes from the set the
/// prompt declares valid.
const ENTITY_TYPE_VALIDATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "validated_type": {
      "type": "string",
      "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
    },
    "was_correct": { "type": "boolean" },
    "reasoning": { "type": "string" }
  },
  "required": ["validated_type", "was_correct", "reasoning"],
  "additionalProperties": false
}"#;
189
// G27 P2: description-enrich — improve generic memory descriptions
/// Prompt asking for a 10-20 word semantic replacement of an auto-generated
/// memory description. It includes one BAD and one GOOD example.
const DESCRIPTION_ENRICH_PROMPT: &str = "You are a knowledge graph quality auditor. This memory has a generic or auto-generated description. Write a concise, semantic description (10-20 words) that captures WHAT this memory is about and WHY it matters.\n\n\
BAD: 'ingested from docs/auth.md'\n\
GOOD: 'JWT token rotation strategy with 15-min expiry and refresh flow'\n\n\
Respond with the improved description and reasoning.";

/// Schema for `description-enrich`: the new description plus a rationale.
const DESCRIPTION_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" },
    "reasoning": { "type": "string" }
  },
  "required": ["description", "reasoning"],
  "additionalProperties": false
}"#;
205
// G27 P2: domain-classify — classify memory into domain category
/// Prompt asking for a memory's primary domain category.
///
/// It explicitly asks for every field `DOMAIN_CLASSIFY_SCHEMA` requires,
/// including the `confidence` score. The previous wording omitted it, so the
/// model had to guess what that required field meant.
const DOMAIN_CLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. Classify this memory into its primary domain category.\n\n\
Respond with the domain name (kebab-case, 2-4 words), a confidence score between 0 and 1, and reasoning.";

/// Schema for `domain-classify`: domain label, `[0, 1]` confidence and a rationale.
const DOMAIN_CLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "domain": { "type": "string" },
    "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["domain", "confidence", "reasoning"],
  "additionalProperties": false
}"#;
220
// G27 P2: graph-audit — audit graph for quality issues
/// Prompt asking for a quality audit of a memory and its entity bindings.
///
/// It requests every field `GRAPH_AUDIT_SCHEMA` requires (issues, a `[0, 1]`
/// score and reasoning), so the model is not left guessing about `reasoning`.
const GRAPH_AUDIT_PROMPT: &str = "You are a knowledge graph quality auditor. Analyze this memory and its entity bindings for quality issues.\n\n\
Check for: missing entities, wrong entity types, redundant relationships, orphaned entities, generic descriptions, low-signal relationships.\n\n\
Respond with a list of issues found (or an empty list if none), an overall quality score between 0 and 1, and reasoning.";

/// Schema for `graph-audit`.
///
/// Each `issues` item is a closed `{kind, detail}` object. Like every nested
/// object in `BINDINGS_SCHEMA`, it sets `additionalProperties: false`; without
/// that, items could carry arbitrary extra keys, and strict structured-output
/// providers reject schemas with open nested objects.
const GRAPH_AUDIT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "quality_score": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "issues": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "kind": { "type": "string" },
          "detail": { "type": "string" }
        },
        "required": ["kind", "detail"],
        "additionalProperties": false
      }
    },
    "reasoning": { "type": "string" }
  },
  "required": ["quality_score", "issues", "reasoning"],
  "additionalProperties": false
}"#;
236
// G27 P2: deep-research-synth — synthesize research findings into graph
/// Prompt asking the LLM to turn a memory body into entities, relationships and a summary.
const DEEP_RESEARCH_SYNTH_PROMPT: &str = "You are a knowledge graph synthesizer. Given this memory body, extract key findings and synthesize them into structured entities and relationships.\n\n\
Entity names: lowercase kebab-case, domain-specific.\n\
Relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, related, replaces, tracked-in.\n\n\
Respond with extracted entities, relationships, and a synthesis summary.";

/// Schema for `deep-research-synth`, aligned with `BINDINGS_SCHEMA`.
///
/// - Nested `entities` / `relationships` items are closed objects
///   (`additionalProperties: false`).
/// - `relation` is limited to the relations listed in `DEEP_RESEARCH_SYNTH_PROMPT`.
/// - `strength` is bounded to `[0, 1]`.
///
/// `entity_type` stays a free string because this prompt does not name a type
/// vocabulary.
const DEEP_RESEARCH_SYNTH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": { "type": "string" }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["depends-on","uses","supports","causes","fixes","contradicts","applies-to","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source", "target", "relation", "strength"],
        "additionalProperties": false
      }
    },
    "summary": { "type": "string" }
  },
  "required": ["entities", "relationships", "summary"],
  "additionalProperties": false
}"#;
253
// G27 P2: body-extract — extract structured content from unstructured text
/// Prompt asking the LLM to restructure a raw memory body into clean markdown
/// while keeping every fact.
const BODY_EXTRACT_PROMPT: &str = "You are a structured data extractor. Given this memory body (which may be unstructured text, raw notes, or a transcript), extract and restructure the content into a clean, well-organized markdown body.\n\n\
Preserve all factual content. Remove noise, fix formatting, add section headers where appropriate.\n\
Respond with the restructured body and a brief summary of changes.";

/// Schema for `body-extract`: the rewritten body plus a short change log.
const BODY_EXTRACT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "restructured_body": { "type": "string" },
    "changes_summary": { "type": "string" }
  },
  "required": ["restructured_body", "changes_summary"],
  "additionalProperties": false
}"#;
268
269// ---------------------------------------------------------------------------
270// Prompts
271// ---------------------------------------------------------------------------
272
/// System prompt for entity/relationship extraction.
///
/// Its allowed relation list and strength scale mirror the enums and bounds in
/// `BINDINGS_SCHEMA`.
const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. Given a memory body, extract:\n\
1. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
2. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones";

/// Prompt prefix for entity descriptions; it ends with `"Entity name: "`.
// Presumably the caller appends the entity name (and type) — confirm at call site.
const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. Given an entity name and type, write a concise one-sentence description (10-20 words) that explains what this entity IS and WHY it matters in the context of software/system design.\n\nEntity name: ";

/// Prompt prefix for body-enrich; the memory body to expand follows it.
// NOTE(review): the text says the target length is "provided in the system context";
// presumably derived from --min-output-chars/--max-output-chars — confirm at call site.
const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. Given a short or sparse memory body, expand it into a richer, more complete and useful description. Preserve all existing facts. Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\nConstraints:\n- Output only the enriched body text (no metadata, no headers)\n- Preserve the original meaning exactly\n- Target length is provided in the system context\n\nMemory body to enrich:\n\n";
287
288// ---------------------------------------------------------------------------
289// CLI args
290// ---------------------------------------------------------------------------
291
/// Operation to perform in the `enrich` command.
// NOTE(review): with clap's `ValueEnum` derive, the `///` comments on the variants
// below are presumably shown as `--operation` value help, so treat them as
// user-facing text. `rename_all = "kebab-case"` gives serde names such as
// `memory-bindings`, the same spellings used in the EXAMPLES help of `EnrichArgs`.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum EnrichOperation {
    // Operations with a full implementation (per their help text).
    /// Add missing entity/relationship bindings to memories (fully implemented).
    MemoryBindings,
    /// Fill NULL/empty entity descriptions with LLM-generated summaries (fully implemented).
    EntityDescriptions,
    /// Expand short memory bodies into richer content (fully implemented, GAP-18).
    BodyEnrich,
    /// Rebuild missing memory embeddings without rewriting the memory body.
    ReEmbed,
    // Operations whose help text marks them as scan only.
    /// Calibrate relationship weights using LLM analysis (scan only).
    WeightCalibrate,
    /// Reclassify relationship types using LLM judgment (scan only).
    RelationReclassify,
    /// Connect isolated entities by suggesting new relationships (scan only).
    EntityConnect,
    /// Validate entity type assignments using LLM judgment (scan only).
    EntityTypeValidate,
    /// Enrich memory descriptions that are generic/auto-generated (scan only).
    DescriptionEnrich,
    /// Identify cross-domain bridges between disconnected subgraphs (scan only).
    CrossDomainBridges,
    /// Classify memories into domain categories (scan only).
    DomainClassify,
    /// Audit the graph for quality issues (scan only).
    GraphAudit,
    /// Synthesize deep-research findings into graph memories (scan only).
    DeepResearchSynth,
    /// Extract structured body from unstructured text (scan only).
    BodyExtract,
}
325
326/// LLM provider for enrichment.
327#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
328pub enum EnrichMode {
329    /// Use locally installed Claude Code CLI (OAuth-first).
330    ClaudeCode,
331    /// Use locally installed OpenAI Codex CLI.
332    Codex,
333}
334
335impl std::fmt::Display for EnrichMode {
336    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
337        match self {
338            EnrichMode::ClaudeCode => write!(f, "claude-code"),
339            EnrichMode::Codex => write!(f, "codex"),
340        }
341    }
342}
343
344/// Arguments for the `enrich` subcommand.
345#[derive(clap::Args)]
346#[command(
347    about = "Enrich graph memories and entities using an LLM provider",
348    after_long_help = "EXAMPLES:\n  \
349    # Add missing entity bindings to all unbound memories\n  \
350    sqlite-graphrag enrich --operation memory-bindings --mode claude-code\n\n  \
351    # Fill entity descriptions (dry-run preview, no tokens spent)\n  \
352    sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n  \
353    # Expand short memory bodies (GAP-18)\n  \
354    sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n  \
355    # Rebuild only missing memory embeddings without rewriting bodies\n  \
356    sqlite-graphrag enrich --operation re-embed --limit 100\n\n  \
357    # Resume an interrupted body-enrich run\n  \
358    sqlite-graphrag enrich --operation body-enrich --resume --json\n\n  \
359    # Retry only failed items from a previous run\n  \
360    sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n\
361    EXIT CODES:\n  \
362    0  success\n  \
363    1  validation error (bad args, binary not found)\n  \
364    14 I/O error"
365)]
366pub struct EnrichArgs {
367    /// Enrichment operation to run.
368    #[arg(long, short = 'o', value_enum, value_name = "OPERATION")]
369    pub operation: EnrichOperation,
370
371    /// LLM provider to use. Default: claude-code (OAuth-first).
372    #[arg(long, value_enum, default_value = "claude-code")]
373    pub mode: EnrichMode,
374
375    /// Maximum number of items to process in this run. Omit for all.
376    #[arg(long, value_name = "N")]
377    pub limit: Option<usize>,
378
379    /// Preview items without calling the LLM (zero tokens consumed).
380    #[arg(long)]
381    pub dry_run: bool,
382
383    /// Namespace to operate on. Default: global.
384    #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
385    pub namespace: Option<String>,
386
387    // -- Provider flags (Claude) --
388    /// Path to the Claude Code binary. Default: auto-detect from PATH.
389    #[arg(long, value_name = "PATH")]
390    pub claude_binary: Option<PathBuf>,
391
392    /// Claude model to use (e.g. claude-sonnet-4-6).
393    #[arg(long, value_name = "MODEL")]
394    pub claude_model: Option<String>,
395
396    /// Timeout per item in seconds when using Claude Code. Default: 300.
397    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
398    pub claude_timeout: u64,
399
400    // -- Provider flags (Codex) --
401    /// Path to the Codex CLI binary. Default: auto-detect from PATH.
402    #[arg(long, value_name = "PATH")]
403    pub codex_binary: Option<PathBuf>,
404
405    /// Codex model to use (e.g. o4-mini).
406    #[arg(long, value_name = "MODEL")]
407    pub codex_model: Option<String>,
408
409    /// Timeout per item in seconds when using Codex. Default: 300.
410    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
411    pub codex_timeout: u64,
412
413    // -- Cost controls --
414    /// Abort when cumulative cost exceeds this USD budget (API key only; ignored for OAuth).
415    #[arg(long, value_name = "USD")]
416    pub max_cost_usd: Option<f64>,
417
418    // -- Queue controls --
419    /// Resume a previously interrupted run (skip already-done items).
420    #[arg(long)]
421    pub resume: bool,
422
423    /// Retry only items that failed in a previous run.
424    #[arg(long)]
425    pub retry_failed: bool,
426
427    // -- body-enrich specific flags (GAP-18) --
428    /// Minimum output character count for body-enrich. Default: 500.
429    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
430    pub min_output_chars: usize,
431
432    /// Maximum output character count for body-enrich. Default: 2000.
433    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
434    pub max_output_chars: usize,
435
436    /// Check that enriched body preserves all facts from the original (LLM judge). Default: true.
437    #[arg(long, default_value_t = true)]
438    pub preserve_check: bool,
439
440    /// Path to a custom prompt template file for body-enrich.
441    #[arg(long, value_name = "PATH")]
442    pub prompt_template: Option<PathBuf>,
443
444    /// Number of parallel LLM workers (default 1 = serial).
445    /// Each worker claims items atomically from the queue DB via UPDATE...RETURNING.
446    /// Range: 1–32. For 2321 entities, --llm-parallelism 4 reduces wall time ~4×.
447    #[arg(long, default_value_t = 1, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=32))]
448    pub llm_parallelism: u32,
449
450    // -- Output / infra --
451    /// Emit NDJSON output. Always true; flag accepted for compatibility.
452    #[arg(long)]
453    pub json: bool,
454
455    /// Database path override.
456    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
457    pub db: Option<String>,
458
459    /// G30: poll for the job singleton every second for up to N seconds
460    /// when another invocation holds the lock. Default: 0 (fail fast).
461    #[arg(long, value_name = "SECONDS")]
462    pub wait_job_singleton: Option<u64>,
463
464    /// G30: force acquisition of the singleton lock by removing a stale
465    /// lock file from a previously crashed invocation. Use only when you
466    /// are certain no other `enrich`/`ingest` is running.
467    #[arg(long, default_value_t = false)]
468    pub force_job_singleton: bool,
469
470    /// G37: select a specific subset of memory names to enrich instead of
471    /// the full candidate set. Comma-separated, e.g. `--names a,b,c`.
472    /// Empty when omitted (processes all candidates).
473    #[arg(long, value_name = "NAMES", value_delimiter = ',')]
474    pub names: Vec<String>,
475
476    /// G37: read the subset of memory names from a file (one per line).
477    /// Lines starting with `#` and empty lines are ignored. Combined with
478    /// `--names` (union) when both are set.
479    #[arg(long, value_name = "PATH")]
480    pub names_file: Option<PathBuf>,
481
482    /// G35: probe the LLM provider with a 1-turn ping before processing
483    /// the batch. Aborts with a clear error if the rate-limit window is
484    /// closed (avoids burning N turns only to fail on item 1).
485    #[arg(long, default_value_t = false)]
486    pub preflight_check: bool,
487
488    /// G35: if a preflight probe or in-flight call hits the Claude rate
489    /// limit, fall back to `--fallback-mode` (typically `codex`) instead
490    /// of failing the batch. Ignored when `--mode` is already `codex`.
491    #[arg(long, value_enum)]
492    pub fallback_mode: Option<EnrichMode>,
493
494    /// G35: number of seconds before the OAuth rate-limit reset at which
495    /// the preflight probe should refuse to start. Default 300 (5 min).
496    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
497    pub rate_limit_buffer: u64,
498
499    /// G28-D: refuse to start when the 1-minute load average exceeds
500    /// `2 × ncpus` (or `SQLITE_GRAPHRAG_MAX_LOAD_PER_NCPU` if set).
501    /// Set to false to skip the check on contended CI runners.
502    #[arg(long, default_value_t = true)]
503    pub max_load_check: bool,
504
505    /// G28-D: when the system is saturated, abort the job after this
506    /// many consecutive HardFailure outcomes. Default 5.
507    #[arg(long, value_name = "N", default_value_t = 5)]
508    pub circuit_breaker_threshold: u32,
509
510    /// G29 Passo 4: minimum trigram-Jaccard similarity between the
511    /// original body and the LLM-rewritten body for the rewrite to be
512    /// accepted. Scores below the threshold are rejected and emitted as
513    /// `EnrichItemResult::PreservationFailed`. Default 0.7 (per the G29
514    /// gap specification). Ignored when `--operation` is not
515    /// `body-enrich`.
516    #[arg(long, value_name = "FLOAT", default_value_t = 0.7)]
517    pub preserve_threshold: f64,
518
519    /// G33 Passo 3: when set, validate `--codex-model` against the
520    /// ChatGPT Pro OAuth accepted-model list and abort with a
521    /// suggestion when the value is unknown. Default true (fail fast
522    /// to avoid burning OAuth turns). Set to false to opt out.
523    #[arg(long, default_value_t = true)]
524    pub codex_model_validate: bool,
525
526    /// G33 Passo 3: when set together with an invalid `--codex-model`,
527    /// automatically substitute the supplied default (e.g. `gpt-5.5`)
528    /// instead of aborting. The substitution is recorded in the NDJSON
529    /// stream as `provider_substituted: true` for traceability.
530    #[arg(long, value_name = "MODEL")]
531    pub codex_model_fallback: Option<String>,
532}
533
534// ---------------------------------------------------------------------------
535// Internal types — raw LLM output structs
536// ---------------------------------------------------------------------------
537
538// ---------------------------------------------------------------------------
539// NDJSON event types emitted to stdout
540// ---------------------------------------------------------------------------
541
/// Phase progress event, serialized as one NDJSON line.
///
/// `None` fields are omitted from the JSON via `skip_serializing_if`.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Phase name (the `llm_parallelism` docs below reference a `"scan"` phase).
    phase: &'a str,
    // NOTE(review): presumably the resolved LLM CLI binary path and its version — confirm at emit site.
    #[serde(skip_serializing_if = "Option::is_none")]
    binary_path: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    items_total: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    items_pending: Option<usize>,
    /// Active parallel LLM worker count (1 = serial). Present only on the "scan" phase event.
    #[serde(skip_serializing_if = "Option::is_none")]
    llm_parallelism: Option<u32>,
}

/// Per-item progress/result event, serialized as one NDJSON line.
///
/// Only `item`, `status`, `index` and `total` are always present; the
/// optional fields are omitted when `None`.
#[derive(Debug, Serialize)]
struct ItemEvent<'a> {
    /// Item identifier (memory name or entity name).
    item: &'a str,
    status: &'a str,
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    entity_id: Option<i64>,
    // Counts of extracted entities / relationships for binding-style operations.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    // Body length before/after rewriting; presumably only set by body-rewriting operations.
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_before: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_after: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    // Position of this item within the run, and the run size.
    // NOTE(review): whether `index` is 0- or 1-based is decided at the emit site — confirm.
    index: usize,
    total: usize,
}

/// Final run summary, serialized as one NDJSON line.
#[derive(Debug, Serialize)]
struct EnrichSummary {
    // NOTE(review): presumably always `true`, marking the summary line in the stream — confirm at emit site.
    summary: bool,
    operation: String,
    items_total: usize,
    completed: usize,
    failed: usize,
    skipped: usize,
    cost_usd: f64,
    elapsed_ms: u64,
}
596
597use crate::output::emit_json_line as emit_json;
598
599// ---------------------------------------------------------------------------
600// Queue DB
601// ---------------------------------------------------------------------------
602
603/// Opens or creates the enrichment queue database.
604///
605/// The queue schema mirrors `ingest_claude` for resume/retry parity.
606/// Uses a different filename (`.enrich-queue.sqlite`) to avoid collision.
607///
608/// # DRY note
609///
610/// This is a near-verbatim copy of `open_queue_db` in `ingest_claude.rs`.
611/// Both should be unified in a shared `llm_runner.rs` module by the
612/// Integration stream.
613fn open_queue_db(path: &str) -> Result<Connection, AppError> {
614    let conn = Connection::open(path)?;
615    conn.pragma_update(None, "journal_mode", "wal")?;
616    conn.execute_batch(
617        "CREATE TABLE IF NOT EXISTS queue (
618            id          INTEGER PRIMARY KEY AUTOINCREMENT,
619            item_key    TEXT NOT NULL UNIQUE,
620            item_type   TEXT NOT NULL DEFAULT 'memory',
621            status      TEXT NOT NULL DEFAULT 'pending',
622            memory_id   INTEGER,
623            entity_id   INTEGER,
624            entities    INTEGER DEFAULT 0,
625            rels        INTEGER DEFAULT 0,
626            error       TEXT,
627            cost_usd    REAL DEFAULT 0.0,
628            attempt     INTEGER DEFAULT 0,
629            elapsed_ms  INTEGER,
630            created_at  TEXT DEFAULT (datetime('now')),
631            done_at     TEXT
632        );
633        CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
634    )?;
635    Ok(conn)
636}
637
638// ---------------------------------------------------------------------------
639// LLM invocation — Claude Code
640// ---------------------------------------------------------------------------
641
642/// Calls `claude -p` via the shared `claude_runner` module (G02).
643///
644/// Returns `(output_value, cost_usd, is_oauth)`.
645fn call_claude(
646    binary: &Path,
647    prompt: &str,
648    json_schema: &str,
649    input_text: &str,
650    model: Option<&str>,
651    timeout_secs: u64,
652) -> Result<(serde_json::Value, f64, bool), AppError> {
653    let result = crate::commands::claude_runner::run_claude(
654        binary,
655        prompt,
656        json_schema,
657        input_text,
658        model,
659        timeout_secs,
660        7,
661    )?;
662    Ok((result.value, result.cost_usd, result.is_oauth))
663}
664
665// ---------------------------------------------------------------------------
666// Preflight probe (G35) — single-turn ping to verify the LLM provider
667// ---------------------------------------------------------------------------
668
/// Result of a single preflight ping (G35).
// Built by `run_preflight_probe`. In its Claude branch:
// - `RateLimited` is returned when the probe exits unsuccessfully and stderr
//   contains a rate-limit marker ("hit your session limit", "rate_limit", "429").
// - Lookup, spawn and wait failures map to `Error`.
enum PreflightOutcome {
    /// The provider accepted the ping without rate-limit or other errors.
    Healthy,
    /// The provider rejected the ping due to OAuth rate limit. The
    /// `suggestion` field is a human hint that callers can embed in the
    /// user-facing error.
    RateLimited {
        // Trimmed stderr from the failed probe.
        reason: String,
        suggestion: &'static str,
    },
    /// Any other provider error (binary missing, auth failure, etc.).
    Error(AppError),
}
683
684/// Probes the configured LLM provider with a 1-turn ping.
685///
686/// - Claude: `claude -p "ping" --max-turns 1 --strict-mcp-config --mcp-config '{}'`
687/// - Codex:  `codex exec -c mcp_servers='{}' "ping" --json`
688///
689/// The probe intentionally avoids spawning any MCP server children (G28-A)
690/// to keep its own process footprint at the minimum.
691fn run_preflight_probe(args: &EnrichArgs) -> PreflightOutcome {
692    let timeout = std::time::Duration::from_secs(args.rate_limit_buffer.max(60));
693
694    match args.mode {
695        EnrichMode::ClaudeCode => {
696            let bin = match find_claude_binary(args.claude_binary.as_deref()) {
697                Ok(b) => b,
698                Err(e) => return PreflightOutcome::Error(e),
699            };
700            let mut cmd = std::process::Command::new(&bin);
701            cmd.env_clear();
702            for var in &["PATH", "HOME", "USER"] {
703                if let Ok(val) = std::env::var(var) {
704                    cmd.env(var, val);
705                }
706            }
707            cmd.arg("-p")
708                .arg("ping")
709                .arg("--max-turns")
710                .arg("1")
711                .arg("--strict-mcp-config")
712                .arg("--mcp-config")
713                .arg("{}")
714                .arg("--dangerously-skip-permissions")
715                .arg("--settings")
716                .arg("{\"hooks\":{}}")
717                .arg("--output-format")
718                .arg("json")
719                .stdin(std::process::Stdio::null())
720                .stdout(std::process::Stdio::piped())
721                .stderr(std::process::Stdio::piped());
722
723            let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
724                Ok(c) => c,
725                Err(e) => {
726                    return PreflightOutcome::Error(AppError::Io(e));
727                }
728            };
729            let output = match wait_with_timeout(child, timeout) {
730                Ok(out) => out,
731                Err(e) => return PreflightOutcome::Error(e),
732            };
733            if !output.status.success() {
734                let stderr = String::from_utf8_lossy(&output.stderr);
735                if stderr.contains("hit your session limit")
736                    || stderr.contains("rate_limit")
737                    || stderr.contains("429")
738                {
739                    return PreflightOutcome::RateLimited {
740                        reason: stderr.trim().to_string(),
741                        suggestion:
742                            "wait for the OAuth window to reset or use --fallback-mode codex",
743                    };
744                }
745                return PreflightOutcome::Error(AppError::Validation(format!(
746                    "preflight probe failed: {stderr}",
747                    stderr = stderr.trim()
748                )));
749            }
750            PreflightOutcome::Healthy
751        }
752        EnrichMode::Codex => {
753            let bin = match find_codex_binary(args.codex_binary.as_deref()) {
754                Ok(b) => b,
755                Err(e) => return PreflightOutcome::Error(e),
756            };
757            super::codex_spawn::validate_codex_model(args.codex_model.as_deref())
758                .map_err(PreflightOutcome::Error)
759                .ok();
760            let schema = "{}";
761            let schema_path = match super::codex_spawn::trusted_schema_path() {
762                Ok(p) => p,
763                Err(e) => return PreflightOutcome::Error(e),
764            };
765            let spawn_args = super::codex_spawn::CodexSpawnArgs {
766                binary: &bin,
767                prompt: "ping",
768                json_schema: schema,
769                input_text: "",
770                model: args.codex_model.as_deref(),
771                timeout_secs: args.rate_limit_buffer.max(60),
772                schema_path: schema_path.clone(),
773            };
774            let mut cmd = super::codex_spawn::build_codex_command(&spawn_args);
775            let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
776                Ok(c) => c,
777                Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
778            };
779            let output = match wait_with_timeout(child, timeout) {
780                Ok(out) => out,
781                Err(e) => return PreflightOutcome::Error(e),
782            };
783            let _ = std::fs::remove_file(&schema_path);
784            if !output.status.success() {
785                let stderr = String::from_utf8_lossy(&output.stderr);
786                if stderr.contains("rate_limit")
787                    || stderr.contains("429")
788                    || stderr.contains("Too Many Requests")
789                {
790                    return PreflightOutcome::RateLimited {
791                        reason: stderr.trim().to_string(),
792                        suggestion: "wait for the rate-limit window to reset",
793                    };
794                }
795                return PreflightOutcome::Error(AppError::Validation(format!(
796                    "preflight probe failed: {stderr}",
797                    stderr = stderr.trim()
798                )));
799            }
800            PreflightOutcome::Healthy
801        }
802    }
803}
804
805/// Cross-platform wait with timeout (no extra crate dependency).
806fn wait_with_timeout(
807    mut child: std::process::Child,
808    timeout: std::time::Duration,
809) -> Result<std::process::Output, AppError> {
810    use wait_timeout::ChildExt;
811    let start = std::time::Instant::now();
812    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
813    if status.is_none() {
814        let _ = child.kill();
815        let _ = child.wait();
816        return Err(AppError::Validation(format!(
817            "preflight probe timed out after {}s",
818            start.elapsed().as_secs()
819        )));
820    }
821    let mut stdout = Vec::new();
822    if let Some(mut out) = child.stdout.take() {
823        std::io::Read::read_to_end(&mut out, &mut stdout).map_err(AppError::Io)?;
824    }
825    let mut stderr = Vec::new();
826    if let Some(mut err) = child.stderr.take() {
827        std::io::Read::read_to_end(&mut err, &mut stderr).map_err(AppError::Io)?;
828    }
829    let exit = status.unwrap();
830    Ok(std::process::Output {
831        status: exit,
832        stdout,
833        stderr,
834    })
835}
836
837// ---------------------------------------------------------------------------
838// SCAN helpers — SQL queries that find items needing enrichment
839// ---------------------------------------------------------------------------
840
841/// Returns memories without any `memory_entities` binding.
842///
843/// These are the targets for `memory-bindings` enrichment. When `name_filter`
844/// is non-empty, restricts the scan to the given names (G37); unknown names
845/// are silently skipped (the caller can detect them by comparing
846/// requested vs. returned).
847fn scan_unbound_memories(
848    conn: &Connection,
849    namespace: &str,
850    limit: Option<usize>,
851    name_filter: &[String],
852) -> Result<Vec<(i64, String, String)>, AppError> {
853    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
854
855    if name_filter.is_empty() {
856        let sql = format!(
857            "SELECT m.id, m.name, m.body
858             FROM memories m
859             WHERE m.namespace = ?1
860               AND m.deleted_at IS NULL
861               AND NOT EXISTS (
862                   SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
863               )
864             ORDER BY m.id
865             {limit_clause}"
866        );
867        let mut stmt = conn.prepare(&sql)?;
868        let rows = stmt
869            .query_map(rusqlite::params![namespace], |r| {
870                Ok((
871                    r.get::<_, i64>(0)?,
872                    r.get::<_, String>(1)?,
873                    r.get::<_, String>(2)?,
874                ))
875            })?
876            .collect::<Result<Vec<_>, _>>()?;
877        Ok(rows)
878    } else {
879        // Build a parameterised IN clause: ?2, ?3, ..., ?{1+n}
880        let placeholders: Vec<String> = (2..=name_filter.len() + 1)
881            .map(|i| format!("?{i}"))
882            .collect();
883        let in_clause = placeholders.join(", ");
884        let sql = format!(
885            "SELECT m.id, m.name, m.body
886             FROM memories m
887             WHERE m.namespace = ?1
888               AND m.deleted_at IS NULL
889               AND m.name IN ({in_clause})
890               AND NOT EXISTS (
891                   SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
892               )
893             ORDER BY m.id
894             {limit_clause}"
895        );
896        let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
897        params_vec.push(&namespace);
898        for n in name_filter {
899            params_vec.push(n);
900        }
901        let mut stmt = conn.prepare(&sql)?;
902        let rows = stmt
903            .query_map(
904                rusqlite::params_from_iter(params_vec.iter().copied()),
905                |r| {
906                    Ok((
907                        r.get::<_, i64>(0)?,
908                        r.get::<_, String>(1)?,
909                        r.get::<_, String>(2)?,
910                    ))
911                },
912            )?
913            .collect::<Result<Vec<_>, _>>()?;
914        Ok(rows)
915    }
916}
917
918/// Reads a list of memory names from a UTF-8 text file (G37).
919///
920/// Empty lines and lines beginning with `#` are skipped. Returns a
921/// de-duplicated, order-preserving list of trimmed names.
922fn read_names_file(path: &Path) -> Result<Vec<String>, AppError> {
923    let content = std::fs::read_to_string(path).map_err(|e| {
924        AppError::Validation(format!("failed to read names file {}: {e}", path.display()))
925    })?;
926    let mut seen = std::collections::HashSet::new();
927    let mut out = Vec::new();
928    for line in content.lines() {
929        let trimmed = line.trim();
930        if trimmed.is_empty() || trimmed.starts_with('#') {
931            continue;
932        }
933        if seen.insert(trimmed.to_string()) {
934            out.push(trimmed.to_string());
935        }
936    }
937    Ok(out)
938}
939
940/// Resolves the union of `--names` and `--names-file` (G37).
941fn resolve_name_filter(args: &EnrichArgs) -> Result<Vec<String>, AppError> {
942    let mut combined: Vec<String> = args.names.clone();
943    if let Some(p) = &args.names_file {
944        let from_file = read_names_file(p)?;
945        for n in from_file {
946            if !combined.contains(&n) {
947                combined.push(n);
948            }
949        }
950    }
951    Ok(combined)
952}
953
954/// Returns entities with NULL or empty description.
955///
956/// These are the targets for `entity-descriptions` enrichment.
957fn scan_entities_without_description(
958    conn: &Connection,
959    namespace: &str,
960    limit: Option<usize>,
961) -> Result<Vec<(i64, String, String)>, AppError> {
962    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
963    let sql = format!(
964        "SELECT id, name, type
965         FROM entities
966         WHERE namespace = ?1
967           AND (description IS NULL OR description = '')
968         ORDER BY id
969         {limit_clause}"
970    );
971    let mut stmt = conn.prepare(&sql)?;
972    let rows = stmt
973        .query_map(rusqlite::params![namespace], |r| {
974            Ok((
975                r.get::<_, i64>(0)?,
976                r.get::<_, String>(1)?,
977                r.get::<_, String>(2)?,
978            ))
979        })?
980        .collect::<Result<Vec<_>, _>>()?;
981    Ok(rows)
982}
983
984/// Returns memories whose body length is below the configured minimum.
985///
986/// These are the targets for `body-enrich` (GAP-18).
987fn scan_short_body_memories(
988    conn: &Connection,
989    namespace: &str,
990    min_chars: usize,
991    limit: Option<usize>,
992) -> Result<Vec<(i64, String, String)>, AppError> {
993    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
994    let sql = format!(
995        "SELECT m.id, m.name, m.body
996         FROM memories m
997         WHERE m.namespace = ?1
998           AND m.deleted_at IS NULL
999           AND LENGTH(COALESCE(m.body,'')) < ?2
1000         ORDER BY m.id
1001         {limit_clause}"
1002    );
1003    let mut stmt = conn.prepare(&sql)?;
1004    let rows = stmt
1005        .query_map(rusqlite::params![namespace, min_chars as i64], |r| {
1006            Ok((
1007                r.get::<_, i64>(0)?,
1008                r.get::<_, String>(1)?,
1009                r.get::<_, String>(2)?,
1010            ))
1011        })?
1012        .collect::<Result<Vec<_>, _>>()?;
1013    Ok(rows)
1014}
1015
1016/// Returns live memories that still have no row in `memory_embeddings`.
1017///
1018/// These are the targets for `re-embed`.
1019fn scan_memories_without_embeddings(
1020    conn: &Connection,
1021    namespace: &str,
1022    limit: Option<usize>,
1023    name_filter: &[String],
1024) -> Result<Vec<(i64, String, String)>, AppError> {
1025    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1026
1027    if name_filter.is_empty() {
1028        let sql = format!(
1029            "SELECT m.id, m.name, COALESCE(m.body,'')
1030             FROM memories m
1031             LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1032             WHERE m.namespace = ?1
1033               AND m.deleted_at IS NULL
1034               AND me.memory_id IS NULL
1035             ORDER BY m.id
1036             {limit_clause}"
1037        );
1038        let mut stmt = conn.prepare(&sql)?;
1039        let rows = stmt
1040            .query_map(rusqlite::params![namespace], |r| {
1041                Ok((
1042                    r.get::<_, i64>(0)?,
1043                    r.get::<_, String>(1)?,
1044                    r.get::<_, String>(2)?,
1045                ))
1046            })?
1047            .collect::<Result<Vec<_>, _>>()?;
1048        Ok(rows)
1049    } else {
1050        let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1051            .map(|i| format!("?{i}"))
1052            .collect();
1053        let in_clause = placeholders.join(", ");
1054        let sql = format!(
1055            "SELECT m.id, m.name, COALESCE(m.body,'')
1056             FROM memories m
1057             LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1058             WHERE m.namespace = ?1
1059               AND m.deleted_at IS NULL
1060               AND m.name IN ({in_clause})
1061               AND me.memory_id IS NULL
1062             ORDER BY m.id
1063             {limit_clause}"
1064        );
1065        let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1066        params_vec.push(&namespace);
1067        for n in name_filter {
1068            params_vec.push(n);
1069        }
1070        let mut stmt = conn.prepare(&sql)?;
1071        let rows = stmt
1072            .query_map(
1073                rusqlite::params_from_iter(params_vec.iter().copied()),
1074                |r| {
1075                    Ok((
1076                        r.get::<_, i64>(0)?,
1077                        r.get::<_, String>(1)?,
1078                        r.get::<_, String>(2)?,
1079                    ))
1080                },
1081            )?
1082            .collect::<Result<Vec<_>, _>>()?;
1083        Ok(rows)
1084    }
1085}
1086
1087/// G27: Returns relationships with weight >= 0.7 that may need recalibration.
1088#[allow(clippy::type_complexity)]
1089fn scan_weight_candidates(
1090    conn: &Connection,
1091    namespace: &str,
1092    limit: Option<usize>,
1093) -> Result<Vec<(i64, String, String, String, f64)>, AppError> {
1094    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1095    let sql = format!(
1096        "SELECT r.id, e1.name, e2.name, r.relation, r.weight \
1097         FROM relationships r \
1098         JOIN entities e1 ON e1.id = r.source_id \
1099         JOIN entities e2 ON e2.id = r.target_id \
1100         WHERE r.weight >= 0.7 AND e1.namespace = ?1 \
1101         ORDER BY r.weight DESC {limit_clause}"
1102    );
1103    let mut stmt = conn.prepare(&sql)?;
1104    let rows = stmt
1105        .query_map(rusqlite::params![namespace], |r| {
1106            Ok((
1107                r.get::<_, i64>(0)?,
1108                r.get::<_, String>(1)?,
1109                r.get::<_, String>(2)?,
1110                r.get::<_, String>(3)?,
1111                r.get::<_, f64>(4)?,
1112            ))
1113        })?
1114        .collect::<Result<Vec<_>, _>>()?;
1115    Ok(rows)
1116}
1117
1118/// G27: Returns relationships with generic relation types (applies_to).
1119fn scan_generic_relations(
1120    conn: &Connection,
1121    namespace: &str,
1122    limit: Option<usize>,
1123) -> Result<Vec<(i64, String, String, String)>, AppError> {
1124    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1125    let sql = format!(
1126        "SELECT r.id, e1.name, e2.name, r.relation \
1127         FROM relationships r \
1128         JOIN entities e1 ON e1.id = r.source_id \
1129         JOIN entities e2 ON e2.id = r.target_id \
1130         WHERE r.relation = 'applies_to' AND e1.namespace = ?1 \
1131         ORDER BY r.id {limit_clause}"
1132    );
1133    let mut stmt = conn.prepare(&sql)?;
1134    let rows = stmt
1135        .query_map(rusqlite::params![namespace], |r| {
1136            Ok((
1137                r.get::<_, i64>(0)?,
1138                r.get::<_, String>(1)?,
1139                r.get::<_, String>(2)?,
1140                r.get::<_, String>(3)?,
1141            ))
1142        })?
1143        .collect::<Result<Vec<_>, _>>()?;
1144    Ok(rows)
1145}
1146
1147// ---------------------------------------------------------------------------
1148// PERSIST helpers for fully-implemented operations
1149// ---------------------------------------------------------------------------
1150
1151/// Persists entity bindings extracted by the LLM for a memory.
1152///
1153/// Creates entities via `upsert_entity`, links them to the memory via
1154/// `link_memory_entity`, and upserts relationships found between entities.
1155fn persist_memory_bindings(
1156    conn: &Connection,
1157    namespace: &str,
1158    memory_id: i64,
1159    entities_json: &serde_json::Value,
1160    rels_json: &serde_json::Value,
1161) -> Result<(usize, usize), AppError> {
1162    #[derive(Deserialize)]
1163    struct EntityItem {
1164        name: String,
1165        entity_type: String,
1166    }
1167    #[derive(Deserialize)]
1168    struct RelItem {
1169        source: String,
1170        target: String,
1171        relation: String,
1172        strength: f64,
1173    }
1174
1175    let extracted_entities: Vec<EntityItem> = serde_json::from_value(entities_json.clone())
1176        .map_err(|e| AppError::Validation(format!("failed to parse entities array: {e}")))?;
1177
1178    let extracted_rels: Vec<RelItem> = serde_json::from_value(rels_json.clone())
1179        .map_err(|e| AppError::Validation(format!("failed to parse relationships array: {e}")))?;
1180
1181    let mut ent_count = 0usize;
1182    let mut rel_count = 0usize;
1183
1184    for item in &extracted_entities {
1185        let entity_type = match item.entity_type.parse::<EntityType>() {
1186            Ok(et) => et,
1187            Err(_) => {
1188                tracing::warn!(
1189                    target: "enrich",
1190                    entity = %item.name,
1191                    entity_type = %item.entity_type,
1192                    "entity type not recognized, skipping"
1193                );
1194                continue;
1195            }
1196        };
1197        match entities::upsert_entity(
1198            conn,
1199            namespace,
1200            &NewEntity {
1201                name: item.name.clone(),
1202                entity_type,
1203                description: None,
1204            },
1205        ) {
1206            Ok(eid) => {
1207                let _ = entities::link_memory_entity(conn, memory_id, eid);
1208                ent_count += 1;
1209            }
1210            Err(e) => {
1211                tracing::warn!(
1212                    target: "enrich",
1213                    entity = %item.name,
1214                    error = %e,
1215                    "entity upsert skipped"
1216                );
1217            }
1218        }
1219    }
1220
1221    for rel in &extracted_rels {
1222        let normalized = crate::parsers::normalize_relation(&rel.relation);
1223        crate::parsers::warn_if_non_canonical(&normalized);
1224
1225        // Normalize entity names before lookup: upsert_entity normalizes on write,
1226        // so the lookup must use the same normalized form to find the row.
1227        let src_name = crate::parsers::normalize_entity_name(&rel.source);
1228        let tgt_name = crate::parsers::normalize_entity_name(&rel.target);
1229        let src_id = entities::find_entity_id(conn, namespace, &src_name);
1230        let tgt_id = entities::find_entity_id(conn, namespace, &tgt_name);
1231        if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1232            let new_rel = NewRelationship {
1233                source: rel.source.clone(),
1234                target: rel.target.clone(),
1235                relation: normalized,
1236                strength: rel.strength,
1237                description: None,
1238            };
1239            if entities::upsert_relationship(conn, namespace, sid, tid, &new_rel).is_ok() {
1240                rel_count += 1;
1241            }
1242        }
1243    }
1244
1245    Ok((ent_count, rel_count))
1246}
1247
1248/// Updates an entity's description directly in the `entities` table.
1249fn persist_entity_description(
1250    conn: &Connection,
1251    entity_id: i64,
1252    description: &str,
1253) -> Result<(), AppError> {
1254    conn.execute(
1255        "UPDATE entities SET description = ?1, updated_at = unixepoch() WHERE id = ?2",
1256        rusqlite::params![description, entity_id],
1257    )?;
1258    Ok(())
1259}
1260
1261#[allow(clippy::too_many_arguments)]
1262fn reembed_memory_vector(
1263    conn: &Connection,
1264    namespace: &str,
1265    memory_id: i64,
1266    memory_name: &str,
1267    memory_type: &str,
1268    body: &str,
1269    paths: &crate::paths::AppPaths,
1270    llm_backend: crate::cli::LlmBackendChoice,
1271) -> Result<(), AppError> {
1272    let snippet: String = body.chars().take(200).collect();
1273    // v1.0.82 (GAP-003): forward --llm-backend to embed_with_fallback
1274    let embedding =
1275        crate::embedder::embed_passage_with_choice(&paths.models, body, Some(llm_backend))?;
1276    memories::upsert_vec(
1277        conn,
1278        memory_id,
1279        namespace,
1280        memory_type,
1281        &embedding,
1282        memory_name,
1283        &snippet,
1284    )?;
1285    Ok(())
1286}
1287
1288/// Persists an enriched memory body (body-enrich, GAP-18).
1289///
1290/// Uses `memories::update` to set the new body and `sync_fts_after_update`
1291/// to keep FTS5 in sync. Also re-embeds the memory for recall accuracy.
1292fn persist_enriched_body(
1293    conn: &Connection,
1294    namespace: &str,
1295    memory_id: i64,
1296    memory_name: &str,
1297    new_body: &str,
1298    paths: &crate::paths::AppPaths,
1299    llm_backend: crate::cli::LlmBackendChoice,
1300) -> Result<(), AppError> {
1301    // Read current values for FTS sync
1302    let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
1303        "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
1304        rusqlite::params![memory_id],
1305        |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1306    )?;
1307
1308    let memory_type: String = conn.query_row(
1309        "SELECT type FROM memories WHERE id=?1",
1310        rusqlite::params![memory_id],
1311        |r| r.get(0),
1312    )?;
1313
1314    let description: String = conn.query_row(
1315        "SELECT COALESCE(description,'') FROM memories WHERE id=?1",
1316        rusqlite::params![memory_id],
1317        |r| r.get(0),
1318    )?;
1319
1320    let body_hash = blake3::hash(new_body.as_bytes()).to_hex().to_string();
1321
1322    let new_memory = memories::NewMemory {
1323        namespace: namespace.to_string(),
1324        name: memory_name.to_string(),
1325        memory_type: memory_type.clone(),
1326        description: description.clone(),
1327        body: new_body.to_string(),
1328        body_hash,
1329        session_id: None,
1330        source: "agent".to_string(),
1331        metadata: serde_json::json!({
1332            "operation": "body-enrich",
1333            "orig_chars": old_body.chars().count(),
1334            "new_chars": new_body.chars().count(),
1335        }),
1336    };
1337
1338    // G29 audit: insert a new immutable version BEFORE the update so the
1339    // enriched body is reachable through `history --name <X>` and
1340    // `restore --version N` can roll back to the pre-enrich state.
1341    let next_version = crate::storage::versions::next_version(conn, memory_id)?;
1342    let version_metadata = serde_json::json!({
1343        "operation": "body-enrich",
1344        "orig_chars": old_body.chars().count(),
1345        "new_chars": new_body.chars().count(),
1346    })
1347    .to_string();
1348    crate::storage::versions::insert_version(
1349        conn,
1350        memory_id,
1351        next_version,
1352        memory_name,
1353        &memory_type,
1354        &description,
1355        new_body,
1356        &version_metadata,
1357        Some("enrich"),
1358        "edit",
1359    )?;
1360
1361    memories::update(conn, memory_id, &new_memory, None)?;
1362    memories::sync_fts_after_update(
1363        conn,
1364        memory_id,
1365        &old_name,
1366        &old_desc,
1367        &old_body,
1368        &new_memory.name,
1369        &new_memory.description,
1370        &new_memory.body,
1371    )?;
1372
1373    // Re-embed for recall accuracy
1374    if let Err(e) = reembed_memory_vector(
1375        conn,
1376        namespace,
1377        memory_id,
1378        memory_name,
1379        &memory_type,
1380        new_body,
1381        paths,
1382        llm_backend,
1383    ) {
1384        tracing::warn!(target: "enrich", memory = %memory_name, error = %e, "vec upsert failed after body-enrich");
1385    }
1386
1387    Ok(())
1388}
1389
1390// ---------------------------------------------------------------------------
// Main entry point (`run`) and its G20 flag-validation helpers
1392// ---------------------------------------------------------------------------
1393
1394// ---------------------------------------------------------------------------
1395// G20: mode-conditional flag validation
1396// ---------------------------------------------------------------------------
1397
/// Reports whether `value` equals the flag's declared `default`.
///
/// Lets G20 validation tell an explicit operator override apart from a
/// value that clap filled in via `default_value_t`.
fn is_at_default<T: PartialEq>(value: T, default: T) -> bool {
    PartialEq::eq(&value, &default)
}
1404
1405/// G20: validate that flags for one LLM provider were not passed when
1406/// the operator selected a different provider. Flags silently discarded
1407/// by the wrong mode are surfaced as AppError::Validation BEFORE any
1408/// DB work, so the operator gets an actionable error instead of a
1409/// surprise at runtime.
1410///
1411/// Detection rules:
1412/// - For Option<PathBuf> / Option<String>: is_some() means explicit
1413/// - For scalar fields with default_value_t: value != default means explicit
1414/// - For boolean fields: true means explicit (default is false)
1415///
1416/// Mode-specific matrices:
1417/// - mode=claude-code rejects: codex_binary, codex_model, codex_timeout != 300
1418/// - mode=codex rejects: claude_binary, claude_model, claude_timeout != 300, max_cost_usd
1419fn validate_mode_conditional_flags_enrich(args: &EnrichArgs) -> Result<(), AppError> {
1420    const DEFAULT_TIMEOUT: u64 = 300;
1421
1422    let mut conflicts: Vec<String> = Vec::new();
1423
1424    match args.mode {
1425        EnrichMode::ClaudeCode => {
1426            if args.codex_binary.is_some() {
1427                conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1428            }
1429            if args.codex_model.is_some() {
1430                conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1431            }
1432            if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1433                conflicts.push(format!(
1434                    "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1435                    args.codex_timeout
1436                ));
1437            }
1438        }
1439        EnrichMode::Codex => {
1440            if args.claude_binary.is_some() {
1441                conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1442            }
1443            if args.claude_model.is_some() {
1444                conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1445            }
1446            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1447                conflicts.push(format!(
1448                    "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1449                    args.claude_timeout
1450                ));
1451            }
1452            if args.max_cost_usd.is_some() {
1453                conflicts.push(
1454                    "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription, not the call)"
1455                        .to_string(),
1456                );
1457            }
1458        }
1459    }
1460
1461    if !conflicts.is_empty() {
1462        return Err(AppError::Validation(format!(
1463            "G20: mode-conditional flag conflicts detected for --mode={}:\n  - {}",
1464            args.mode,
1465            conflicts.join("\n  - ")
1466        )));
1467    }
1468
1469    Ok(())
1470}
1471
1472// ---------------------------------------------------------------------------
1473
1474/// Main entry point for the `enrich` command.
1475pub fn run(args: &EnrichArgs, llm_backend: crate::cli::LlmBackendChoice) -> Result<(), AppError> {
1476    // G20: mode-conditional flag validation BEFORE any DB access.
1477    // Surfaces flags that the wrong mode would silently discard.
1478    validate_mode_conditional_flags_enrich(args)?;
1479    let started = Instant::now();
1480
1481    let paths = AppPaths::resolve(args.db.as_deref())?;
1482    ensure_db_ready(&paths)?;
1483    let conn = open_rw(&paths.db)?;
1484    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1485
1486    // G28-B (v1.0.68) + G30 (v1.0.69): enforce singleton per
1487    // (job_type, namespace, db_hash) so two parallel `enrich` invocations
1488    // on the same DB cannot co-exist, but concurrent enrich on different
1489    // databases works as expected. The force flag (--force) breaks a
1490    // stale lock from a previously crashed invocation.
1491    let wait_secs = args.wait_job_singleton;
1492    let force_flag = args.force_job_singleton;
1493    let _singleton = crate::lock::acquire_job_singleton(
1494        crate::lock::JobType::Enrich,
1495        &namespace,
1496        &paths.db,
1497        wait_secs,
1498        force_flag,
1499    )?;
1500
1501    // Validate provider binary upfront only for LLM-backed operations.
1502    let provider_binary = if matches!(args.operation, EnrichOperation::ReEmbed) {
1503        None
1504    } else {
1505        Some(match args.mode {
1506            EnrichMode::ClaudeCode => {
1507                let bin = find_claude_binary(args.claude_binary.as_deref())?;
1508                let version = super::claude_runner::validate_claude_version(&bin)?;
1509                tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
1510                emit_json(&PhaseEvent {
1511                    phase: "validate",
1512                    binary_path: bin.to_str(),
1513                    version: Some(&version),
1514                    items_total: None,
1515                    items_pending: None,
1516                    llm_parallelism: None,
1517                });
1518                bin
1519            }
1520            EnrichMode::Codex => {
1521                let bin = find_codex_binary(args.codex_binary.as_deref())?;
1522                emit_json(&PhaseEvent {
1523                    phase: "validate",
1524                    binary_path: bin.to_str(),
1525                    version: None,
1526                    items_total: None,
1527                    items_pending: None,
1528                    llm_parallelism: None,
1529                });
1530                bin
1531            }
1532        })
1533    };
1534
1535    // G28-D: refuse to start when the system is saturated. This check
1536    // is BEFORE preflight so we never spend an OAuth turn on a host
1537    // that is already at the limit.
1538    if args.max_load_check && !args.dry_run && crate::system_load::is_system_saturated() {
1539        let load = crate::system_load::load_average_one();
1540        let n = crate::system_load::ncpus();
1541        return Err(AppError::Validation(format!(
1542            "system load average {load:.2} exceeds 2x ncpus ({n}); \
1543             pass --no-max-load-check to override (not recommended)"
1544        )));
1545    }
1546
1547    // G35: preflight probe — issue a single ping turn to verify the
1548    // provider is healthy before scanning N candidates. If the probe
1549    // fails with a rate-limit error, optionally fall back to a
1550    // different mode (typically codex) instead of failing the entire
1551    // batch. The probe itself consumes 1 OAuth turn, so it stays
1552    // opt-in (default off) to keep --dry-run and CI flows zero-cost.
1553    if args.preflight_check && !args.dry_run && !matches!(args.operation, EnrichOperation::ReEmbed)
1554    {
1555        let preflight_result = run_preflight_probe(args);
1556        match preflight_result {
1557            PreflightOutcome::Healthy => {
1558                tracing::info!(target: "enrich", mode = ?args.mode, "preflight probe healthy");
1559            }
1560            PreflightOutcome::RateLimited { reason, suggestion } => {
1561                if let Some(fallback) = args.fallback_mode.clone() {
1562                    if fallback != args.mode {
1563                        // G35 (v1.0.69): the mid-batch mode switch is
1564                        // intentionally NOT applied because it would
1565                        // desynchronise the per-item rate-limit wait
1566                        // state (rate-limited items in the worker are
1567                        // timed against the original provider). Instead
1568                        // we abort cleanly so the operator can re-invoke
1569                        // with `--mode {fallback:?}`. This guarantees no
1570                        // OAuth window is wasted and no partial state
1571                        // is left in the queue.
1572                        return Err(AppError::Validation(format!(
1573                            "preflight detected rate limit on {mode:?}: {reason}; \
1574                             re-invoke with `--mode {fallback:?}` to use the fallback provider",
1575                            mode = args.mode
1576                        )));
1577                    }
1578                    return Err(AppError::Validation(format!(
1579                        "preflight detected rate limit on {mode:?}: {reason}; \
1580                         --fallback-mode matches --mode, no recovery possible",
1581                        mode = args.mode
1582                    )));
1583                }
1584                return Err(AppError::Validation(format!(
1585                    "preflight detected rate limit on {mode:?}: {reason}; \
1586                     {suggestion}; pass --fallback-mode codex to recover",
1587                    mode = args.mode
1588                )));
1589            }
1590            PreflightOutcome::Error(e) => {
1591                return Err(e);
1592            }
1593        }
1594    }
1595
1596    // SCAN phase
1597    let scan_result = scan_operation(&conn, &namespace, args)?;
1598    let total = scan_result.len();
1599
1600    emit_json(&PhaseEvent {
1601        phase: "scan",
1602        binary_path: None,
1603        version: None,
1604        items_total: Some(total),
1605        items_pending: Some(total),
1606        llm_parallelism: Some(args.llm_parallelism),
1607    });
1608
1609    // Dry-run: emit preview events and summary without calling LLM
1610    if args.dry_run {
1611        for (idx, key) in scan_result.iter().enumerate() {
1612            emit_json(&ItemEvent {
1613                item: key,
1614                status: "preview",
1615                memory_id: None,
1616                entity_id: None,
1617                entities: None,
1618                rels: None,
1619                chars_before: None,
1620                chars_after: None,
1621                cost_usd: None,
1622                elapsed_ms: None,
1623                error: None,
1624                index: idx,
1625                total,
1626            });
1627        }
1628        emit_json(&EnrichSummary {
1629            summary: true,
1630            operation: format!("{:?}", args.operation),
1631            items_total: total,
1632            completed: 0,
1633            failed: 0,
1634            skipped: 0,
1635            cost_usd: 0.0,
1636            elapsed_ms: started.elapsed().as_millis() as u64,
1637        });
1638        return Ok(());
1639    }
1640
1641    // All operations in this enum have an execution path.
1642
1643    // Queue setup for resume/retry
1644    let queue_conn = open_queue_db(DEFAULT_QUEUE_DB)?;
1645
1646    if args.resume {
1647        let reset = queue_conn
1648            .execute(
1649                "UPDATE queue SET status='pending' WHERE status='processing'",
1650                [],
1651            )
1652            .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
1653        if reset > 0 {
1654            tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
1655        }
1656    }
1657
1658    if args.retry_failed {
1659        let count = queue_conn
1660            .execute(
1661                "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
1662                [],
1663            )
1664            .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
1665        tracing::info!(target: "enrich", count, "retrying failed items");
1666    }
1667
1668    if !args.resume && !args.retry_failed {
1669        queue_conn
1670            .execute("DELETE FROM queue", [])
1671            .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
1672    }
1673
1674    // Populate queue
1675    for (idx, key) in scan_result.iter().enumerate() {
1676        let item_type = match args.operation {
1677            EnrichOperation::EntityDescriptions => "entity",
1678            _ => "memory",
1679        };
1680        if let Err(e) = queue_conn.execute(
1681            "INSERT OR IGNORE INTO queue (item_key, item_type, status) VALUES (?1, ?2, 'pending')",
1682            rusqlite::params![key, item_type],
1683        ) {
1684            tracing::warn!(target: "enrich", error = %e, "queue insert failed");
1685        }
1686        let _ = idx; // suppress unused warning
1687    }
1688
1689    // G19: parallel LLM processing via std::thread::scope when parallelism > 1.
1690    // Clamp enforces the range even if the caller bypasses clap validation.
1691    let parallelism = args.llm_parallelism.clamp(1, 32) as usize;
1692    if parallelism > 1 {
1693        tracing::info!(
1694            target: "enrich",
1695            llm_parallelism = parallelism,
1696            "parallel LLM processing with bounded thread pool"
1697        );
1698    }
1699    // G28-D (v1.0.68) + G34 (v1.0.69): warn above the recommended parallelism
1700    // ceiling. The threshold and message depend on the LLM mode because
1701    // Claude Code spawns MCP children (G28-A) while Codex does not.
1702    if parallelism > 4 {
1703        match args.mode {
1704            EnrichMode::ClaudeCode => {
1705                tracing::warn!(
1706                    target: "enrich",
1707                    llm_parallelism = parallelism,
1708                    recommended_max = 4,
1709                    mode = "claude-code",
1710                    "llm_parallelism above 4 multiplies Claude Code subprocess fan-out; \
1711                     consider combining with SQLITE_GRAPHRAG_CLAUDE_EMPTY_CONFIG_DIR \
1712                     to cut MCP children (G28-A)"
1713                );
1714            }
1715            EnrichMode::Codex if parallelism > 16 => {
1716                tracing::warn!(
1717                    target: "enrich",
1718                    llm_parallelism = parallelism,
1719                    recommended_max = 16,
1720                    mode = "codex",
1721                    "llm_parallelism above 16 risks OAuth rate-limit on Codex; \
1722                     consider --llm-parallelism 8 for safer concurrency"
1723                );
1724            }
1725            EnrichMode::Codex => {
1726                // No warning: codex does not spawn MCP children and was
1727                // validated at parallelism 8 in production (1161 items,
1728                // 0 failures) per the 2026-06-04 session audit.
1729            }
1730        }
1731    }
1732
1733    let mut completed = 0usize;
1734    let mut failed = 0usize;
1735    let mut skipped = 0usize;
1736    let mut cost_total = 0.0f64;
1737    let mut oauth_detected = false;
1738    let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1739    let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1740    let enrich_started = std::time::Instant::now();
1741
1742    let provider_timeout = match args.mode {
1743        EnrichMode::ClaudeCode => args.claude_timeout,
1744        EnrichMode::Codex => args.codex_timeout,
1745    };
1746
1747    let provider_model: Option<&str> = match args.mode {
1748        EnrichMode::ClaudeCode => args.claude_model.as_deref(),
1749        EnrichMode::Codex => args.codex_model.as_deref(),
1750    };
1751
1752    // G19: when parallelism > 1, spawn bounded worker threads.
1753    // Each worker opens its own DB connections (WAL supports concurrent readers + serialized writers).
1754    // The queue DB claim is atomic via UPDATE...RETURNING — no external lock needed.
1755    if parallelism > 1 {
1756        let stdout_mu = parking_lot::Mutex::new(());
1757        let budget = args.max_cost_usd;
1758        let operation = args.operation.clone();
1759        let mode = args.mode.clone();
1760        let min_oc = args.min_output_chars;
1761        let max_oc = args.max_output_chars;
1762        let prompt_tpl = args.prompt_template.as_deref().map(|p| p.to_path_buf());
1763
1764        struct WorkerResult {
1765            completed: usize,
1766            failed: usize,
1767            skipped: usize,
1768            cost: f64,
1769            oauth: bool,
1770        }
1771
1772        let results: Vec<WorkerResult> = std::thread::scope(|s| {
1773            let handles: Vec<_> = (0..parallelism)
1774                .map(|worker_id| {
1775                    let stdout_mu = &stdout_mu;
1776                    let paths = &paths;
1777                    let namespace = &namespace;
1778                    let provider_binary = provider_binary.as_deref();
1779                    let operation = &operation;
1780                    let mode = &mode;
1781                    let prompt_tpl = prompt_tpl.as_deref();
1782                    s.spawn(move || {
1783                        let w_conn = match open_rw(&paths.db) {
1784                            Ok(c) => c,
1785                            Err(e) => {
1786                                tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open DB");
1787                                return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1788                            }
1789                        };
1790                        let w_queue = match open_queue_db(DEFAULT_QUEUE_DB) {
1791                            Ok(c) => c,
1792                            Err(e) => {
1793                                tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open queue DB");
1794                                return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1795                            }
1796                        };
1797                        let mut w_completed = 0usize;
1798                        let mut w_failed = 0usize;
1799                        let mut w_skipped = 0usize;
1800                        let mut w_cost = 0.0f64;
1801                        let mut w_oauth = false;
1802                        let mut w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1803                        let w_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1804                        // G28-D: per-worker circuit breaker that aborts the
1805                        // loop after `circuit_breaker_threshold` consecutive
1806                        // HardFailure outcomes (transient/rate-limited errors
1807                        // do NOT count, so a recovering provider is not
1808                        // penalised).
1809                        let mut w_breaker = crate::retry::CircuitBreaker::new(
1810                            args.circuit_breaker_threshold.max(1),
1811                            std::time::Duration::from_secs(60),
1812                        );
1813
1814                        loop {
1815                            if crate::shutdown_requested() {
1816                                tracing::info!(target: "enrich", "shutdown requested, worker stopping");
1817                                break;
1818                            }
1819                            if let Some(b) = budget {
1820                                if !w_oauth && w_cost >= b {
1821                                    break;
1822                                }
1823                            }
1824                            let pending: Option<(i64, String, String)> = w_queue
1825                                .query_row(
1826                                    "UPDATE queue SET status='processing', attempt=attempt+1 \
1827                                     WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
1828                                     RETURNING id, item_key, item_type",
1829                                    [],
1830                                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1831                                )
1832                                .ok();
1833                            let (queue_id, item_key, _item_type) = match pending {
1834                                Some(p) => p,
1835                                None => break,
1836                            };
1837                            let item_started = Instant::now();
1838                            let current_index = w_completed + w_failed + w_skipped;
1839
1840                            let call_result = match operation {
1841                                EnrichOperation::MemoryBindings => call_memory_bindings(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1842                                EnrichOperation::EntityDescriptions => call_entity_description(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1843                                EnrichOperation::BodyEnrich => call_body_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode, min_oc, max_oc, prompt_tpl, args.preserve_threshold, paths, llm_backend),
1844                                EnrichOperation::ReEmbed => call_reembed(&w_conn, namespace, &item_key, paths, llm_backend),
1845                                EnrichOperation::WeightCalibrate => call_weight_calibrate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1846                                EnrichOperation::RelationReclassify => call_relation_reclassify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1847                                EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => call_entity_connect(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1848                                EnrichOperation::EntityTypeValidate => call_entity_type_validate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1849                                EnrichOperation::DescriptionEnrich => call_description_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1850                                EnrichOperation::DomainClassify => call_domain_classify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1851                                EnrichOperation::GraphAudit => call_graph_audit(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1852                                EnrichOperation::DeepResearchSynth => call_deep_research_synth(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1853                                EnrichOperation::BodyExtract => call_body_extract(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1854                            };
1855
1856                            match call_result {
1857                                Ok(EnrichItemResult::Done { cost, is_oauth, memory_id, entity_id, entities, rels, chars_before, chars_after }) => {
1858                                    if is_oauth { w_oauth = true; }
1859                                    w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1860                                    let _ = w_queue.execute(
1861                                        "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1862                                        rusqlite::params![memory_id, entity_id, entities as i64, rels as i64, cost, item_started.elapsed().as_millis() as i64, queue_id],
1863                                    );
1864                                    w_completed += 1;
1865                                    if !is_oauth { w_cost += cost; }
1866                                    // G28-D: count success; resets breaker.
1867                                    let _ = w_breaker
1868                                        .record(crate::retry::AttemptOutcome::Success);
1869                                    let _guard = stdout_mu.lock();
1870                                    emit_json(&ItemEvent { item: &item_key, status: "done", memory_id, entity_id, entities: Some(entities), rels: Some(rels), chars_before, chars_after, cost_usd: if is_oauth { None } else { Some(cost) }, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
1871                                }
1872                                Ok(EnrichItemResult::Skipped { reason }) => {
1873                                    w_skipped += 1;
1874                                    let _ = w_queue.execute("UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![reason, queue_id]);
1875                                    let _guard = stdout_mu.lock();
1876                                    emit_json(&ItemEvent { item: &item_key, status: "skipped", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
1877                                }
1878                                Ok(EnrichItemResult::PreservationFailed { score, threshold, chars_before, chars_after }) => {
1879                                    // G29 Passo 4: worker mirror of the
1880                                    // serial path. Counted as a soft
1881                                    // skip so the queue surface shows
1882                                    // a quality issue rather than a
1883                                    // transport failure.
1884                                    w_skipped += 1;
1885                                    let reason = format!(
1886                                        "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
1887                                    );
1888                                    let _ = w_queue.execute(
1889                                        "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
1890                                        rusqlite::params![reason, queue_id],
1891                                    );
1892                                    let _guard = stdout_mu.lock();
1893                                    emit_json(&ItemEvent {
1894                                        item: &item_key,
1895                                        status: "preservation_failed",
1896                                        memory_id: None,
1897                                        entity_id: None,
1898                                        entities: None,
1899                                        rels: None,
1900                                        chars_before: Some(chars_before),
1901                                        chars_after: Some(chars_after),
1902                                        cost_usd: None,
1903                                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1904                                        error: Some(reason),
1905                                        index: current_index,
1906                                        total,
1907                                    });
1908                                }
1909                                Err(e) => {
1910                                    let err_str = format!("{e}");
1911                                    if matches!(e, AppError::RateLimited { .. }) {
1912                                        if crate::retry::is_kill_switch_active() {
1913                                            tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
1914                                        } else if std::time::Instant::now() >= w_deadline {
1915                                            tracing::error!(target: "enrich", "rate-limit retry deadline (1h) exhausted in worker");
1916                                        } else {
1917                                            let half = w_backoff / 2;
1918                                            let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
1919                                            let actual_wait = half + jitter;
1920                                            tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited in worker, backing off");
1921                                            let _ = w_queue.execute("UPDATE queue SET status='pending' WHERE id=?1", rusqlite::params![queue_id]);
1922                                            std::thread::sleep(std::time::Duration::from_secs(actual_wait));
1923                                            w_backoff = (w_backoff * 2).min(900);
1924                                            continue;
1925                                        }
1926                                    }
1927                                    w_failed += 1;
1928                                    let _ = w_queue.execute("UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![err_str, queue_id]);
1929                                    let _guard = stdout_mu.lock();
1930                                    emit_json(&ItemEvent { item: &item_key, status: "failed", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: Some(err_str), index: current_index, total });
1931                                    // G28-D: count hard failure against breaker.
1932                                    let breaker_opened = w_breaker
1933                                        .record(crate::retry::AttemptOutcome::HardFailure);
1934                                    if breaker_opened {
1935                                        tracing::error!(target: "enrich",
1936                                            consecutive_failures = w_breaker.consecutive_failures(),
1937                                            "circuit breaker opened — aborting worker"
1938                                        );
1939                                        break;
1940                                    }
1941                                }
1942                            }
1943                        }
1944                        WorkerResult { completed: w_completed, failed: w_failed, skipped: w_skipped, cost: w_cost, oauth: w_oauth }
1945                    })
1946                })
1947                .collect();
1948            handles
1949                .into_iter()
1950                .map(|h| {
1951                    h.join().unwrap_or(WorkerResult {
1952                        completed: 0,
1953                        failed: 0,
1954                        skipped: 0,
1955                        cost: 0.0,
1956                        oauth: false,
1957                    })
1958                })
1959                .collect()
1960        });
1961
1962        for r in &results {
1963            completed += r.completed;
1964            failed += r.failed;
1965            skipped += r.skipped;
1966            cost_total += r.cost;
1967            if r.oauth && !oauth_detected {
1968                oauth_detected = true;
1969            }
1970        }
1971    } else {
1972        // Serial path (parallelism == 1) — original loop
1973        loop {
1974            if crate::shutdown_requested() {
1975                tracing::info!(target: "enrich", "shutdown requested, stopping enrichment");
1976                break;
1977            }
1978
1979            // Budget check
1980            if let Some(budget) = args.max_cost_usd {
1981                if !oauth_detected && cost_total >= budget {
1982                    tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
1983                    break;
1984                }
1985            }
1986
1987            // Dequeue next pending item
1988            let pending: Option<(i64, String, String)> = queue_conn
1989                .query_row(
1990                    "UPDATE queue SET status='processing', attempt=attempt+1 \
1991                 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
1992                 RETURNING id, item_key, item_type",
1993                    [],
1994                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1995                )
1996                .ok();
1997
1998            let (queue_id, item_key, item_type) = match pending {
1999                Some(p) => p,
2000                None => break,
2001            };
2002
2003            let item_started = Instant::now();
2004            let current_index = completed + failed + skipped;
2005
2006            let call_result = match args.operation {
2007                EnrichOperation::MemoryBindings => call_memory_bindings(
2008                    &conn,
2009                    &namespace,
2010                    &item_key,
2011                    provider_binary
2012                        .as_deref()
2013                        .expect("provider binary required"),
2014                    provider_model,
2015                    provider_timeout,
2016                    &args.mode,
2017                ),
2018                EnrichOperation::EntityDescriptions => call_entity_description(
2019                    &conn,
2020                    &namespace,
2021                    &item_key,
2022                    provider_binary
2023                        .as_deref()
2024                        .expect("provider binary required"),
2025                    provider_model,
2026                    provider_timeout,
2027                    &args.mode,
2028                ),
2029                EnrichOperation::BodyEnrich => call_body_enrich(
2030                    &conn,
2031                    &namespace,
2032                    &item_key,
2033                    provider_binary
2034                        .as_deref()
2035                        .expect("provider binary required"),
2036                    provider_model,
2037                    provider_timeout,
2038                    &args.mode,
2039                    args.min_output_chars,
2040                    args.max_output_chars,
2041                    args.prompt_template.as_deref(),
2042                    args.preserve_threshold,
2043                    &paths,
2044                    llm_backend,
2045                ),
2046                EnrichOperation::ReEmbed => {
2047                    call_reembed(&conn, &namespace, &item_key, &paths, llm_backend)
2048                }
2049                EnrichOperation::WeightCalibrate => call_weight_calibrate(
2050                    &conn,
2051                    &namespace,
2052                    &item_key,
2053                    provider_binary
2054                        .as_deref()
2055                        .expect("provider binary required"),
2056                    provider_model,
2057                    provider_timeout,
2058                    &args.mode,
2059                ),
2060                EnrichOperation::RelationReclassify => call_relation_reclassify(
2061                    &conn,
2062                    &namespace,
2063                    &item_key,
2064                    provider_binary
2065                        .as_deref()
2066                        .expect("provider binary required"),
2067                    provider_model,
2068                    provider_timeout,
2069                    &args.mode,
2070                ),
2071                EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2072                    call_entity_connect(
2073                        &conn,
2074                        &namespace,
2075                        &item_key,
2076                        provider_binary
2077                            .as_deref()
2078                            .expect("provider binary required"),
2079                        provider_model,
2080                        provider_timeout,
2081                        &args.mode,
2082                    )
2083                }
2084                EnrichOperation::EntityTypeValidate => call_entity_type_validate(
2085                    &conn,
2086                    &namespace,
2087                    &item_key,
2088                    provider_binary
2089                        .as_deref()
2090                        .expect("provider binary required"),
2091                    provider_model,
2092                    provider_timeout,
2093                    &args.mode,
2094                ),
2095                EnrichOperation::DescriptionEnrich => call_description_enrich(
2096                    &conn,
2097                    &namespace,
2098                    &item_key,
2099                    provider_binary
2100                        .as_deref()
2101                        .expect("provider binary required"),
2102                    provider_model,
2103                    provider_timeout,
2104                    &args.mode,
2105                ),
2106                EnrichOperation::DomainClassify => call_domain_classify(
2107                    &conn,
2108                    &namespace,
2109                    &item_key,
2110                    provider_binary
2111                        .as_deref()
2112                        .expect("provider binary required"),
2113                    provider_model,
2114                    provider_timeout,
2115                    &args.mode,
2116                ),
2117                EnrichOperation::GraphAudit => call_graph_audit(
2118                    &conn,
2119                    &namespace,
2120                    &item_key,
2121                    provider_binary
2122                        .as_deref()
2123                        .expect("provider binary required"),
2124                    provider_model,
2125                    provider_timeout,
2126                    &args.mode,
2127                ),
2128                EnrichOperation::DeepResearchSynth => call_deep_research_synth(
2129                    &conn,
2130                    &namespace,
2131                    &item_key,
2132                    provider_binary
2133                        .as_deref()
2134                        .expect("provider binary required"),
2135                    provider_model,
2136                    provider_timeout,
2137                    &args.mode,
2138                ),
2139                EnrichOperation::BodyExtract => call_body_extract(
2140                    &conn,
2141                    &namespace,
2142                    &item_key,
2143                    provider_binary
2144                        .as_deref()
2145                        .expect("provider binary required"),
2146                    provider_model,
2147                    provider_timeout,
2148                    &args.mode,
2149                ),
2150            };
2151
2152            match call_result {
2153                Ok(EnrichItemResult::Done {
2154                    memory_id,
2155                    entity_id,
2156                    entities,
2157                    rels,
2158                    chars_before,
2159                    chars_after,
2160                    cost,
2161                    is_oauth,
2162                }) => {
2163                    if is_oauth && !oauth_detected {
2164                        oauth_detected = true;
2165                        tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
2166                    }
2167                    backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
2168
2169                    // Persist depends on the operation
2170                    let persist_err: Option<String> = match args.operation {
2171                        EnrichOperation::MemoryBindings => {
2172                            // Bindings already persisted inside call_memory_bindings
2173                            None
2174                        }
2175                        EnrichOperation::EntityDescriptions => {
2176                            // Description already persisted inside call_entity_description
2177                            None
2178                        }
2179                        EnrichOperation::BodyEnrich => {
2180                            // Body already persisted inside call_body_enrich
2181                            None
2182                        }
2183                        _ => {
2184                            // All G27 operations persist inside their call_* function
2185                            None
2186                        }
2187                    };
2188
2189                    if let Err(e) = queue_conn.execute(
2190                    "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2191                    rusqlite::params![
2192                        memory_id,
2193                        entity_id,
2194                        entities as i64,
2195                        rels as i64,
2196                        cost,
2197                        item_started.elapsed().as_millis() as i64,
2198                        queue_id
2199                    ],
2200                ) {
2201                        tracing::warn!(target: "enrich", error = %e, "queue done update failed");
2202                    }
2203
2204                    if persist_err.is_none() {
2205                        completed += 1;
2206                        if !is_oauth {
2207                            cost_total += cost;
2208                        }
2209                        emit_json(&ItemEvent {
2210                            item: &item_key,
2211                            status: "done",
2212                            memory_id,
2213                            entity_id,
2214                            entities: Some(entities),
2215                            rels: Some(rels),
2216                            chars_before,
2217                            chars_after,
2218                            cost_usd: if is_oauth { None } else { Some(cost) },
2219                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2220                            error: None,
2221                            index: current_index,
2222                            total,
2223                        });
2224                    } else {
2225                        failed += 1;
2226                        emit_json(&ItemEvent {
2227                            item: &item_key,
2228                            status: "failed",
2229                            memory_id: None,
2230                            entity_id: None,
2231                            entities: None,
2232                            rels: None,
2233                            chars_before: None,
2234                            chars_after: None,
2235                            cost_usd: None,
2236                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2237                            error: persist_err,
2238                            index: current_index,
2239                            total,
2240                        });
2241                    }
2242                }
2243                Ok(EnrichItemResult::Skipped { reason }) => {
2244                    skipped += 1;
2245                    if let Err(e) = queue_conn.execute(
2246                    "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2247                    rusqlite::params![reason, queue_id],
2248                ) {
2249                        tracing::warn!(target: "enrich", error = %e, "queue skipped update failed");
2250                    }
2251                    emit_json(&ItemEvent {
2252                        item: &item_key,
2253                        status: "skipped",
2254                        memory_id: None,
2255                        entity_id: None,
2256                        entities: None,
2257                        rels: None,
2258                        chars_before: None,
2259                        chars_after: None,
2260                        cost_usd: None,
2261                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2262                        error: None,
2263                        index: current_index,
2264                        total,
2265                    });
2266                }
2267                Ok(EnrichItemResult::PreservationFailed {
2268                    score,
2269                    threshold,
2270                    chars_before,
2271                    chars_after,
2272                }) => {
2273                    // G29 Passo 4: the LLM rewrite diverged too far from
2274                    // the original body. Count as a soft failure (not
2275                    // `failed`) so the queue surfaces it as a quality
2276                    // issue, not a transport error. The reason is
2277                    // structured so the operator can audit why a body
2278                    // was rejected.
2279                    skipped += 1;
2280                    let reason = format!(
2281                        "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2282                    );
2283                    if let Err(qe) = queue_conn.execute(
2284                        "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2285                        rusqlite::params![reason, queue_id],
2286                    ) {
2287                        tracing::warn!(target: "enrich", error = %qe, "queue preservation_failed update failed");
2288                    }
2289                    emit_json(&ItemEvent {
2290                        item: &item_key,
2291                        status: "preservation_failed",
2292                        memory_id: None,
2293                        entity_id: None,
2294                        entities: None,
2295                        rels: None,
2296                        chars_before: Some(chars_before),
2297                        chars_after: Some(chars_after),
2298                        cost_usd: None,
2299                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2300                        error: Some(reason),
2301                        index: current_index,
2302                        total,
2303                    });
2304                }
2305                Err(e) => {
2306                    let err_str = format!("{e}");
2307                    if matches!(e, AppError::RateLimited { .. }) {
2308                        if crate::retry::is_kill_switch_active() {
2309                            tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2310                        } else if std::time::Instant::now() >= rate_limit_deadline {
2311                            tracing::error!(target: "enrich", total_elapsed_secs = enrich_started.elapsed().as_secs(), "rate-limit retry deadline (1h) exhausted");
2312                        } else {
2313                            let half = backoff_secs / 2;
2314                            let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2315                            let actual_wait = half + jitter;
2316                            tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
2317                            if let Err(qe) = queue_conn.execute(
2318                                "UPDATE queue SET status='pending' WHERE id=?1",
2319                                rusqlite::params![queue_id],
2320                            ) {
2321                                tracing::warn!(target: "enrich", error = %qe, "queue pending update failed");
2322                            }
2323                            std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2324                            backoff_secs = (backoff_secs * 2).min(900);
2325                            continue;
2326                        }
2327                    }
2328
2329                    failed += 1;
2330                    if let Err(qe) = queue_conn.execute(
2331                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
2332                    rusqlite::params![err_str, queue_id],
2333                ) {
2334                        tracing::warn!(target: "enrich", error = %qe, "queue failed update failed");
2335                    }
2336                    emit_json(&ItemEvent {
2337                        item: &item_key,
2338                        status: "failed",
2339                        memory_id: None,
2340                        entity_id: None,
2341                        entities: None,
2342                        rels: None,
2343                        chars_before: None,
2344                        chars_after: None,
2345                        cost_usd: None,
2346                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2347                        error: Some(err_str),
2348                        index: current_index,
2349                        total,
2350                    });
2351                }
2352            }
2353
2354            let _ = item_type; // used via queue schema only
2355        }
2356    } // end else (serial path)
2357
2358    let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2359    let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2360
2361    emit_json(&EnrichSummary {
2362        summary: true,
2363        operation: format!("{:?}", args.operation),
2364        items_total: total,
2365        completed,
2366        failed,
2367        skipped,
2368        cost_usd: cost_total,
2369        elapsed_ms: started.elapsed().as_millis() as u64,
2370    });
2371
2372    if failed == 0 {
2373        let _ = std::fs::remove_file(DEFAULT_QUEUE_DB);
2374    }
2375
2376    Ok(())
2377}
2378
2379// ---------------------------------------------------------------------------
2380// Internal result type for a single item call
2381// ---------------------------------------------------------------------------
2382
2383enum EnrichItemResult {
2384    Done {
2385        memory_id: Option<i64>,
2386        entity_id: Option<i64>,
2387        entities: usize,
2388        rels: usize,
2389        chars_before: Option<usize>,
2390        chars_after: Option<usize>,
2391        cost: f64,
2392        is_oauth: bool,
2393    },
2394    Skipped {
2395        reason: String,
2396    },
2397    /// G29 Passo 4 (v1.0.69): the LLM rewrite diverged from the original
2398    /// body beyond the configured `--preserve-threshold` and was rejected
2399    /// before persistence. The trigram-Jaccard score and threshold are
2400    /// emitted in the NDJSON stream for operator audit.
2401    PreservationFailed {
2402        score: f64,
2403        threshold: f64,
2404        chars_before: usize,
2405        chars_after: usize,
2406    },
2407}
2408
2409// ---------------------------------------------------------------------------
2410// Per-operation call helpers (SCAN + JUDGE + PERSIST in one unit)
2411// ---------------------------------------------------------------------------
2412
2413fn call_memory_bindings(
2414    conn: &Connection,
2415    namespace: &str,
2416    memory_name: &str,
2417    binary: &Path,
2418    model: Option<&str>,
2419    timeout: u64,
2420    mode: &EnrichMode,
2421) -> Result<EnrichItemResult, AppError> {
2422    // Look up the memory
2423    let (memory_id, body): (i64, String) = conn.query_row(
2424        "SELECT id, COALESCE(body,'') FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2425        rusqlite::params![namespace, memory_name],
2426        |r| Ok((r.get(0)?, r.get(1)?)),
2427    ).map_err(|e| match e {
2428        rusqlite::Error::QueryReturnedNoRows => AppError::NotFound(format!("memory '{memory_name}' not found")),
2429        other => AppError::Database(other),
2430    })?;
2431
2432    if body.trim().is_empty() {
2433        return Ok(EnrichItemResult::Skipped {
2434            reason: "body is empty".to_string(),
2435        });
2436    }
2437
2438    let (value, cost, is_oauth) = match mode {
2439        EnrichMode::ClaudeCode => call_claude(
2440            binary,
2441            BINDINGS_PROMPT,
2442            BINDINGS_SCHEMA,
2443            &body,
2444            model,
2445            timeout,
2446        )?,
2447        EnrichMode::Codex => call_codex(
2448            binary,
2449            BINDINGS_PROMPT,
2450            BINDINGS_SCHEMA,
2451            &body,
2452            model,
2453            timeout,
2454        )?,
2455    };
2456
2457    let empty_arr = serde_json::Value::Array(vec![]);
2458    let entities_val = value.get("entities").unwrap_or(&empty_arr);
2459    let rels_val = value.get("relationships").unwrap_or(&empty_arr);
2460
2461    let (ent_count, rel_count) =
2462        persist_memory_bindings(conn, namespace, memory_id, entities_val, rels_val)?;
2463
2464    Ok(EnrichItemResult::Done {
2465        memory_id: Some(memory_id),
2466        entity_id: None,
2467        entities: ent_count,
2468        rels: rel_count,
2469        chars_before: None,
2470        chars_after: None,
2471        cost,
2472        is_oauth,
2473    })
2474}
2475
2476fn call_entity_description(
2477    conn: &Connection,
2478    namespace: &str,
2479    entity_name: &str,
2480    binary: &Path,
2481    model: Option<&str>,
2482    timeout: u64,
2483    mode: &EnrichMode,
2484) -> Result<EnrichItemResult, AppError> {
2485    let (entity_id, entity_type): (i64, String) = conn
2486        .query_row(
2487            "SELECT id, type FROM entities WHERE namespace=?1 AND name=?2",
2488            rusqlite::params![namespace, entity_name],
2489            |r| Ok((r.get(0)?, r.get(1)?)),
2490        )
2491        .map_err(|e| match e {
2492            rusqlite::Error::QueryReturnedNoRows => {
2493                AppError::NotFound(format!("entity '{entity_name}' not found"))
2494            }
2495            other => AppError::Database(other),
2496        })?;
2497
2498    let prompt = format!(
2499        "{ENTITY_DESCRIPTION_PROMPT_PREFIX}{entity_name}\nEntity type: {entity_type}\n\nGenerate a description:"
2500    );
2501
2502    let (value, cost, is_oauth) = match mode {
2503        EnrichMode::ClaudeCode => call_claude(
2504            binary,
2505            &prompt,
2506            ENTITY_DESCRIPTION_SCHEMA,
2507            "",
2508            model,
2509            timeout,
2510        )?,
2511        EnrichMode::Codex => call_codex(
2512            binary,
2513            &prompt,
2514            ENTITY_DESCRIPTION_SCHEMA,
2515            "",
2516            model,
2517            timeout,
2518        )?,
2519    };
2520
2521    let description = value
2522        .get("description")
2523        .and_then(|v| v.as_str())
2524        .ok_or_else(|| AppError::Validation("LLM result missing 'description' field".into()))?;
2525
2526    persist_entity_description(conn, entity_id, description)?;
2527
2528    Ok(EnrichItemResult::Done {
2529        memory_id: None,
2530        entity_id: Some(entity_id),
2531        entities: 0,
2532        rels: 0,
2533        chars_before: None,
2534        chars_after: None,
2535        cost,
2536        is_oauth,
2537    })
2538}
2539
2540#[allow(clippy::too_many_arguments)]
2541fn call_body_enrich(
2542    conn: &Connection,
2543    namespace: &str,
2544    memory_name: &str,
2545    binary: &Path,
2546    model: Option<&str>,
2547    timeout: u64,
2548    mode: &EnrichMode,
2549    min_output_chars: usize,
2550    max_output_chars: usize,
2551    prompt_template: Option<&Path>,
2552    preserve_threshold: f64,
2553    paths: &crate::paths::AppPaths,
2554    llm_backend: crate::cli::LlmBackendChoice,
2555) -> Result<EnrichItemResult, AppError> {
2556    let (memory_id, body, description, memory_type): (i64, String, String, String) = conn
2557        .query_row(
2558            "SELECT id, COALESCE(body,''), COALESCE(description,''), COALESCE(type,'note') \
2559         FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2560            rusqlite::params![namespace, memory_name],
2561            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2562        )
2563        .map_err(|e| match e {
2564            rusqlite::Error::QueryReturnedNoRows => {
2565                AppError::NotFound(format!("memory '{memory_name}' not found"))
2566            }
2567            other => AppError::Database(other),
2568        })?;
2569
2570    let chars_before = body.chars().count();
2571
2572    // G26: gather graph context for contextualized enrichment
2573    let linked_entities: Vec<String> = {
2574        let mut stmt = conn.prepare_cached(
2575            "SELECT e.name FROM memory_entities me \
2576             JOIN entities e ON e.id = me.entity_id \
2577             WHERE me.memory_id = ?1 LIMIT 10",
2578        )?;
2579        let result: Vec<String> = stmt
2580            .query_map(rusqlite::params![memory_id], |r| r.get::<_, String>(0))?
2581            .filter_map(|r| r.ok())
2582            .collect();
2583        drop(stmt);
2584        result
2585    };
2586
2587    // Load custom prompt template if provided
2588    let prompt_prefix = if let Some(tmpl_path) = prompt_template {
2589        let file_size = std::fs::metadata(tmpl_path)
2590            .map_err(|e| {
2591                AppError::Io(std::io::Error::new(
2592                    e.kind(),
2593                    format!("failed to stat prompt template: {e}"),
2594                ))
2595            })?
2596            .len();
2597        if file_size > MAX_MEMORY_BODY_LEN as u64 {
2598            return Err(AppError::LimitExceeded(
2599                crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
2600            ));
2601        }
2602        std::fs::read_to_string(tmpl_path).map_err(|e| {
2603            AppError::Io(std::io::Error::new(
2604                e.kind(),
2605                format!("failed to read prompt template: {e}"),
2606            ))
2607        })?
2608    } else {
2609        BODY_ENRICH_PROMPT_PREFIX.to_string()
2610    };
2611
2612    // G26: build contextualized prompt with graph data
2613    let context_section = if !linked_entities.is_empty() || !description.is_empty() {
2614        let mut ctx = String::new();
2615        ctx.push_str(&format!(
2616            "\nContext:\n- Memory name: {memory_name}\n- Type: {memory_type}\n"
2617        ));
2618        if !description.is_empty() {
2619            ctx.push_str(&format!("- Description: {description}\n"));
2620        }
2621        ctx.push_str(&format!("- Domain: {namespace}\n"));
2622        if !linked_entities.is_empty() {
2623            ctx.push_str(&format!(
2624                "- Linked entities: {}\n",
2625                linked_entities.join(", ")
2626            ));
2627        }
2628        ctx
2629    } else {
2630        String::new()
2631    };
2632
2633    let prompt = format!(
2634        "{prompt_prefix}{context_section}\nTarget minimum length: {min_output_chars} characters. Maximum: {max_output_chars} characters."
2635    );
2636
2637    // The body schema uses a free-form enriched_body field
2638    let (value, cost, is_oauth) = match mode {
2639        EnrichMode::ClaudeCode => {
2640            call_claude(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2641        }
2642        EnrichMode::Codex => {
2643            call_codex(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2644        }
2645    };
2646
2647    let enriched_body = value
2648        .get("enriched_body")
2649        .and_then(|v| v.as_str())
2650        .ok_or_else(|| AppError::Validation("LLM result missing 'enriched_body' field".into()))?;
2651
2652    let chars_after = enriched_body.chars().count();
2653
2654    // G29 Passo 4 (v1.0.69): preservation check. Before persisting, run
2655    // a trigram-Jaccard similarity between the original body and the
2656    // LLM-rewritten body. When the score falls below
2657    // `args.preserve_threshold` (default 0.7 per the G29 gap), reject the
2658    // rewrite as a likely hallucination. The result is recorded in the
2659    // NDJSON stream so operators can audit what the LLM tried to do.
2660    let threshold = preserve_threshold;
2661    let verdict =
2662        crate::preservation::PreservationVerdict::evaluate(&body, enriched_body, threshold);
2663    if !verdict.is_accepted() {
2664        return Ok(EnrichItemResult::PreservationFailed {
2665            score: match verdict {
2666                crate::preservation::PreservationVerdict::Preserved { score, .. } => score,
2667                crate::preservation::PreservationVerdict::Rejected { score, .. } => score,
2668                crate::preservation::PreservationVerdict::Unchanged { .. } => 1.0,
2669            },
2670            threshold,
2671            chars_before,
2672            chars_after,
2673        });
2674    }
2675
2676    // G29 Passo 5 (v1.0.69): idempotency via blake3 hash. Before persisting,
2677    // compare the hash of the original body against the hash of the enriched
2678    // body. Identical hashes mean the LLM produced a byte-for-byte identical
2679    // body (rare but possible) — treat as `Skipped` so re-running the batch
2680    // is safe and the queue does not get re-persisted entries.
2681    let old_hash = blake3::hash(body.as_bytes()).to_hex().to_string();
2682    let new_hash = blake3::hash(enriched_body.as_bytes()).to_hex().to_string();
2683    if old_hash == new_hash {
2684        return Ok(EnrichItemResult::Skipped {
2685            reason: format!(
2686                "enriched body hash matches original (blake3:{old_hash}); idempotency skip"
2687            ),
2688        });
2689    }
2690
2691    // Only persist if the enriched body is genuinely longer
2692    if chars_after <= chars_before {
2693        return Ok(EnrichItemResult::Skipped {
2694            reason: format!(
2695                "enriched body ({chars_after} chars) not longer than original ({chars_before} chars)"
2696            ),
2697        });
2698    }
2699
2700    persist_enriched_body(
2701        conn,
2702        namespace,
2703        memory_id,
2704        memory_name,
2705        enriched_body,
2706        paths,
2707        llm_backend,
2708    )?;
2709
2710    Ok(EnrichItemResult::Done {
2711        memory_id: Some(memory_id),
2712        entity_id: None,
2713        entities: 0,
2714        rels: 0,
2715        chars_before: Some(chars_before),
2716        chars_after: Some(chars_after),
2717        cost,
2718        is_oauth,
2719    })
2720}
2721
2722fn call_reembed(
2723    conn: &Connection,
2724    namespace: &str,
2725    memory_name: &str,
2726    paths: &crate::paths::AppPaths,
2727    llm_backend: crate::cli::LlmBackendChoice,
2728) -> Result<EnrichItemResult, AppError> {
2729    let (memory_id, body, memory_type): (i64, String, String) = conn
2730        .query_row(
2731            "SELECT id, COALESCE(body,''), COALESCE(type,'note')
2732             FROM memories
2733             WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2734            rusqlite::params![namespace, memory_name],
2735            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2736        )
2737        .map_err(|e| match e {
2738            rusqlite::Error::QueryReturnedNoRows => {
2739                AppError::NotFound(format!("memory '{memory_name}' not found"))
2740            }
2741            other => AppError::Database(other),
2742        })?;
2743
2744    if body.trim().is_empty() {
2745        return Ok(EnrichItemResult::Skipped {
2746            reason: "body is empty".to_string(),
2747        });
2748    }
2749
2750    reembed_memory_vector(
2751        conn,
2752        namespace,
2753        memory_id,
2754        memory_name,
2755        &memory_type,
2756        &body,
2757        paths,
2758        llm_backend,
2759    )?;
2760
2761    Ok(EnrichItemResult::Done {
2762        memory_id: Some(memory_id),
2763        entity_id: None,
2764        entities: 0,
2765        rels: 0,
2766        chars_before: Some(body.chars().count()),
2767        chars_after: Some(body.chars().count()),
2768        cost: 0.0,
2769        is_oauth: true,
2770    })
2771}
2772
2773// ---------------------------------------------------------------------------
2774// Scan dispatcher — maps operation to scan query result (item keys)
2775// ---------------------------------------------------------------------------
2776
2777fn scan_operation(
2778    conn: &Connection,
2779    namespace: &str,
2780    args: &EnrichArgs,
2781) -> Result<Vec<String>, AppError> {
2782    // G37: resolve --names + --names-file once and apply to every scan path.
2783    let name_filter = resolve_name_filter(args)?;
2784    match args.operation {
2785        EnrichOperation::MemoryBindings => {
2786            let rows = scan_unbound_memories(conn, namespace, args.limit, &name_filter)?;
2787            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2788        }
2789        EnrichOperation::EntityDescriptions => {
2790            let rows = scan_entities_without_description(conn, namespace, args.limit)?;
2791            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2792        }
2793        EnrichOperation::BodyEnrich => {
2794            let rows =
2795                scan_short_body_memories(conn, namespace, args.min_output_chars, args.limit)?;
2796            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2797        }
2798        EnrichOperation::ReEmbed => {
2799            let rows = scan_memories_without_embeddings(conn, namespace, args.limit, &name_filter)?;
2800            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2801        }
2802        EnrichOperation::WeightCalibrate => {
2803            let rows = scan_weight_candidates(conn, namespace, args.limit)?;
2804            Ok(rows
2805                .into_iter()
2806                .map(|(id, _, _, _, _)| id.to_string())
2807                .collect())
2808        }
2809        EnrichOperation::RelationReclassify => {
2810            let rows = scan_generic_relations(conn, namespace, args.limit)?;
2811            Ok(rows
2812                .into_iter()
2813                .map(|(id, _, _, _)| id.to_string())
2814                .collect())
2815        }
2816        EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2817            let pairs = scan_isolated_entity_pairs(conn, namespace, args.limit)?;
2818            Ok(pairs.into_iter().map(|(_, name, _, _)| name).collect())
2819        }
2820        EnrichOperation::EntityTypeValidate => {
2821            let rows = scan_entities_for_type_validation(conn, namespace, args.limit)?;
2822            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2823        }
2824        EnrichOperation::DescriptionEnrich => {
2825            let rows = scan_generic_descriptions(conn, namespace, args.limit)?;
2826            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2827        }
2828        EnrichOperation::DomainClassify
2829        | EnrichOperation::GraphAudit
2830        | EnrichOperation::DeepResearchSynth
2831        | EnrichOperation::BodyExtract => {
2832            let limit_clause = args.limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
2833            let sql = format!(
2834                "SELECT name FROM memories WHERE namespace=?1 AND deleted_at IS NULL ORDER BY id {limit_clause}"
2835            );
2836            let mut stmt = conn.prepare(&sql)?;
2837            let names = stmt
2838                .query_map(rusqlite::params![namespace], |r| r.get::<_, String>(0))?
2839                .collect::<Result<Vec<_>, _>>()?;
2840            Ok(names)
2841        }
2842    }
2843}
2844
2845// ---------------------------------------------------------------------------
2846// Codex stub provider
2847// ---------------------------------------------------------------------------
2848
2849/// Locates the Codex CLI binary.
2850fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
2851    if let Some(p) = explicit {
2852        if p.exists() {
2853            return Ok(p.to_path_buf());
2854        }
2855        return Err(AppError::Validation(format!(
2856            "Codex binary not found at explicit path: {}",
2857            p.display()
2858        )));
2859    }
2860
2861    if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
2862        let p = PathBuf::from(&env_path);
2863        if p.exists() {
2864            return Ok(p);
2865        }
2866    }
2867
2868    let name = if cfg!(windows) { "codex.exe" } else { "codex" };
2869    if let Some(path_var) = std::env::var_os("PATH") {
2870        for dir in std::env::split_paths(&path_var) {
2871            let candidate = dir.join(name);
2872            if candidate.exists() {
2873                return Ok(crate::extract::llm_embedding::resolve_real_binary(
2874                    &candidate,
2875                ));
2876            }
2877        }
2878    }
2879
2880    Err(AppError::Validation(
2881        "Codex CLI binary not found in PATH. Install it or specify --codex-binary".to_string(),
2882    ))
2883}
2884
2885/// G27: Calibrate weight of a single relationship via LLM.
2886fn call_weight_calibrate(
2887    conn: &Connection,
2888    _namespace: &str,
2889    item_key: &str,
2890    binary: &Path,
2891    model: Option<&str>,
2892    timeout: u64,
2893    mode: &EnrichMode,
2894) -> Result<EnrichItemResult, AppError> {
2895    let rel_id: i64 = item_key
2896        .parse()
2897        .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
2898    let (source_name, target_name, relation, current_weight): (String, String, String, f64) = conn
2899        .query_row(
2900            "SELECT e1.name, e2.name, r.relation, r.weight \
2901             FROM relationships r \
2902             JOIN entities e1 ON e1.id = r.source_id \
2903             JOIN entities e2 ON e2.id = r.target_id \
2904             WHERE r.id = ?1",
2905            rusqlite::params![rel_id],
2906            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2907        )
2908        .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
2909
2910    let input_text = format!(
2911        "Source: {source_name}\nTarget: {target_name}\nRelation: {relation}\nCurrent weight: {current_weight}"
2912    );
2913    let (value, cost, is_oauth) = match mode {
2914        EnrichMode::ClaudeCode => call_claude(
2915            binary,
2916            WEIGHT_CALIBRATE_PROMPT,
2917            WEIGHT_CALIBRATE_SCHEMA,
2918            &input_text,
2919            model,
2920            timeout,
2921        )?,
2922        EnrichMode::Codex => call_codex(
2923            binary,
2924            WEIGHT_CALIBRATE_PROMPT,
2925            WEIGHT_CALIBRATE_SCHEMA,
2926            &input_text,
2927            model,
2928            timeout,
2929        )?,
2930    };
2931
2932    let calibrated = value
2933        .get("calibrated_weight")
2934        .and_then(|v| v.as_f64())
2935        .ok_or_else(|| AppError::Validation("LLM result missing 'calibrated_weight'".into()))?;
2936
2937    conn.execute(
2938        "UPDATE relationships SET weight = ?1 WHERE id = ?2",
2939        rusqlite::params![calibrated, rel_id],
2940    )?;
2941
2942    Ok(EnrichItemResult::Done {
2943        memory_id: None,
2944        entity_id: None,
2945        entities: 0,
2946        rels: 1,
2947        chars_before: None,
2948        chars_after: None,
2949        cost,
2950        is_oauth,
2951    })
2952}
2953
2954/// G27: Reclassify a generic relationship type via LLM.
2955fn call_relation_reclassify(
2956    conn: &Connection,
2957    _namespace: &str,
2958    item_key: &str,
2959    binary: &Path,
2960    model: Option<&str>,
2961    timeout: u64,
2962    mode: &EnrichMode,
2963) -> Result<EnrichItemResult, AppError> {
2964    let rel_id: i64 = item_key
2965        .parse()
2966        .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
2967    let (source_name, target_name, current_relation): (String, String, String) = conn
2968        .query_row(
2969            "SELECT e1.name, e2.name, r.relation \
2970             FROM relationships r \
2971             JOIN entities e1 ON e1.id = r.source_id \
2972             JOIN entities e2 ON e2.id = r.target_id \
2973             WHERE r.id = ?1",
2974            rusqlite::params![rel_id],
2975            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2976        )
2977        .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
2978
2979    let input_text = format!(
2980        "Source entity: {source_name}\nTarget entity: {target_name}\nCurrent relation: {current_relation}"
2981    );
2982    let (value, cost, is_oauth) = match mode {
2983        EnrichMode::ClaudeCode => call_claude(
2984            binary,
2985            RELATION_RECLASSIFY_PROMPT,
2986            RELATION_RECLASSIFY_SCHEMA,
2987            &input_text,
2988            model,
2989            timeout,
2990        )?,
2991        EnrichMode::Codex => call_codex(
2992            binary,
2993            RELATION_RECLASSIFY_PROMPT,
2994            RELATION_RECLASSIFY_SCHEMA,
2995            &input_text,
2996            model,
2997            timeout,
2998        )?,
2999    };
3000
3001    let new_relation = value
3002        .get("relation")
3003        .and_then(|v| v.as_str())
3004        .ok_or_else(|| AppError::Validation("LLM result missing 'relation'".into()))?;
3005    let new_strength = value
3006        .get("strength")
3007        .and_then(|v| v.as_f64())
3008        .unwrap_or(0.5);
3009
3010    conn.execute(
3011        "UPDATE relationships SET relation = ?1, weight = ?2 WHERE id = ?3",
3012        rusqlite::params![new_relation, new_strength, rel_id],
3013    )?;
3014
3015    Ok(EnrichItemResult::Done {
3016        memory_id: None,
3017        entity_id: None,
3018        entities: 0,
3019        rels: 1,
3020        chars_before: None,
3021        chars_after: None,
3022        cost,
3023        is_oauth,
3024    })
3025}
3026
3027/// G27 P2: Connect isolated entities via LLM-suggested relationship.
3028fn call_entity_connect(
3029    conn: &Connection,
3030    namespace: &str,
3031    item_key: &str,
3032    binary: &Path,
3033    model: Option<&str>,
3034    timeout: u64,
3035    mode: &EnrichMode,
3036) -> Result<EnrichItemResult, AppError> {
3037    let pairs = scan_isolated_entity_pairs(conn, namespace, Some(1))?;
3038    let (e1_id, e1_name, e2_id, e2_name) =
3039        match pairs.into_iter().find(|(_, n, _, _)| n == item_key) {
3040            Some(p) => p,
3041            None => {
3042                return Ok(EnrichItemResult::Skipped {
3043                    reason: "pair no longer isolated".into(),
3044                })
3045            }
3046        };
3047    let input_text = format!("Entity A: {e1_name}\nEntity B: {e2_name}");
3048    let (value, cost, is_oauth) = match mode {
3049        EnrichMode::ClaudeCode => call_claude(
3050            binary,
3051            ENTITY_CONNECT_PROMPT,
3052            ENTITY_CONNECT_SCHEMA,
3053            &input_text,
3054            model,
3055            timeout,
3056        )?,
3057        EnrichMode::Codex => call_codex(
3058            binary,
3059            ENTITY_CONNECT_PROMPT,
3060            ENTITY_CONNECT_SCHEMA,
3061            &input_text,
3062            model,
3063            timeout,
3064        )?,
3065    };
3066    let relation = value
3067        .get("relation")
3068        .and_then(|v| v.as_str())
3069        .unwrap_or("none");
3070    if relation == "none" {
3071        return Ok(EnrichItemResult::Skipped {
3072            reason: "LLM determined no relationship".into(),
3073        });
3074    }
3075    let strength = value
3076        .get("strength")
3077        .and_then(|v| v.as_f64())
3078        .unwrap_or(0.5);
3079    conn.execute(
3080        "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
3081        rusqlite::params![namespace, e1_id, e2_id, relation, strength],
3082    )?;
3083    Ok(EnrichItemResult::Done {
3084        memory_id: None,
3085        entity_id: None,
3086        entities: 0,
3087        rels: 1,
3088        chars_before: None,
3089        chars_after: None,
3090        cost,
3091        is_oauth,
3092    })
3093}
3094
3095/// G27 P2: Validate entity type assignment via LLM.
3096fn call_entity_type_validate(
3097    conn: &Connection,
3098    _namespace: &str,
3099    item_key: &str,
3100    binary: &Path,
3101    model: Option<&str>,
3102    timeout: u64,
3103    mode: &EnrichMode,
3104) -> Result<EnrichItemResult, AppError> {
3105    let (ent_id, ent_name, ent_type): (i64, String, String) = conn
3106        .query_row(
3107            "SELECT id, name, type FROM entities WHERE name = ?1",
3108            rusqlite::params![item_key],
3109            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3110        )
3111        .map_err(|_| AppError::NotFound(format!("entity '{item_key}' not found")))?;
3112    let input_text = format!("Entity: {ent_name}\nCurrent type: {ent_type}");
3113    let (value, cost, is_oauth) = match mode {
3114        EnrichMode::ClaudeCode => call_claude(
3115            binary,
3116            ENTITY_TYPE_VALIDATE_PROMPT,
3117            ENTITY_TYPE_VALIDATE_SCHEMA,
3118            &input_text,
3119            model,
3120            timeout,
3121        )?,
3122        EnrichMode::Codex => call_codex(
3123            binary,
3124            ENTITY_TYPE_VALIDATE_PROMPT,
3125            ENTITY_TYPE_VALIDATE_SCHEMA,
3126            &input_text,
3127            model,
3128            timeout,
3129        )?,
3130    };
3131    let validated_type = value
3132        .get("validated_type")
3133        .and_then(|v| v.as_str())
3134        .unwrap_or(&ent_type);
3135    let was_correct = value
3136        .get("was_correct")
3137        .and_then(|v| v.as_bool())
3138        .unwrap_or(true);
3139    if !was_correct {
3140        conn.execute(
3141            "UPDATE entities SET type = ?1 WHERE id = ?2",
3142            rusqlite::params![validated_type, ent_id],
3143        )?;
3144    }
3145    Ok(EnrichItemResult::Done {
3146        memory_id: None,
3147        entity_id: Some(ent_id),
3148        entities: 1,
3149        rels: 0,
3150        chars_before: None,
3151        chars_after: None,
3152        cost,
3153        is_oauth,
3154    })
3155}
3156
3157/// G27 P2: Enrich generic memory description via LLM.
3158fn call_description_enrich(
3159    conn: &Connection,
3160    _namespace: &str,
3161    item_key: &str,
3162    binary: &Path,
3163    model: Option<&str>,
3164    timeout: u64,
3165    mode: &EnrichMode,
3166) -> Result<EnrichItemResult, AppError> {
3167    let (mem_id, body, old_desc): (i64, String, String) = conn
3168        .query_row(
3169            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3170            rusqlite::params![item_key],
3171            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3172        )
3173        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3174    let snippet: String = body.chars().take(500).collect();
3175    let input_text = format!(
3176        "Memory name: {item_key}\nCurrent description: {old_desc}\nBody preview: {snippet}"
3177    );
3178    let (value, cost, is_oauth) = match mode {
3179        EnrichMode::ClaudeCode => call_claude(
3180            binary,
3181            DESCRIPTION_ENRICH_PROMPT,
3182            DESCRIPTION_ENRICH_SCHEMA,
3183            &input_text,
3184            model,
3185            timeout,
3186        )?,
3187        EnrichMode::Codex => call_codex(
3188            binary,
3189            DESCRIPTION_ENRICH_PROMPT,
3190            DESCRIPTION_ENRICH_SCHEMA,
3191            &input_text,
3192            model,
3193            timeout,
3194        )?,
3195    };
3196    let new_desc = value
3197        .get("description")
3198        .and_then(|v| v.as_str())
3199        .unwrap_or(&old_desc);
3200    conn.execute(
3201        "UPDATE memories SET description = ?1 WHERE id = ?2",
3202        rusqlite::params![new_desc, mem_id],
3203    )?;
3204    Ok(EnrichItemResult::Done {
3205        memory_id: Some(mem_id),
3206        entity_id: None,
3207        entities: 0,
3208        rels: 0,
3209        chars_before: Some(old_desc.len()),
3210        chars_after: Some(new_desc.len()),
3211        cost,
3212        is_oauth,
3213    })
3214}
3215
3216/// G27 P2: Classify memory into domain category via LLM.
3217fn call_domain_classify(
3218    conn: &Connection,
3219    _namespace: &str,
3220    item_key: &str,
3221    binary: &Path,
3222    model: Option<&str>,
3223    timeout: u64,
3224    mode: &EnrichMode,
3225) -> Result<EnrichItemResult, AppError> {
3226    let (mem_id, body, desc): (i64, String, String) = conn
3227        .query_row(
3228            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3229            rusqlite::params![item_key],
3230            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3231        )
3232        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3233    let snippet: String = body.chars().take(500).collect();
3234    let input_text = format!("Memory: {item_key}\nDescription: {desc}\nBody preview: {snippet}");
3235    let (value, cost, is_oauth) = match mode {
3236        EnrichMode::ClaudeCode => call_claude(
3237            binary,
3238            DOMAIN_CLASSIFY_PROMPT,
3239            DOMAIN_CLASSIFY_SCHEMA,
3240            &input_text,
3241            model,
3242            timeout,
3243        )?,
3244        EnrichMode::Codex => call_codex(
3245            binary,
3246            DOMAIN_CLASSIFY_PROMPT,
3247            DOMAIN_CLASSIFY_SCHEMA,
3248            &input_text,
3249            model,
3250            timeout,
3251        )?,
3252    };
3253    let domain = value
3254        .get("domain")
3255        .and_then(|v| v.as_str())
3256        .unwrap_or("uncategorized");
3257    let metadata = format!(r#"{{"domain":"{}"}}"#, domain.replace('"', "\\\""));
3258    conn.execute(
3259        "UPDATE memories SET metadata = ?1 WHERE id = ?2",
3260        rusqlite::params![metadata, mem_id],
3261    )?;
3262    Ok(EnrichItemResult::Done {
3263        memory_id: Some(mem_id),
3264        entity_id: None,
3265        entities: 0,
3266        rels: 0,
3267        chars_before: None,
3268        chars_after: None,
3269        cost,
3270        is_oauth,
3271    })
3272}
3273
3274/// G27 P2: Audit memory graph quality via LLM.
3275fn call_graph_audit(
3276    conn: &Connection,
3277    _namespace: &str,
3278    item_key: &str,
3279    binary: &Path,
3280    model: Option<&str>,
3281    timeout: u64,
3282    mode: &EnrichMode,
3283) -> Result<EnrichItemResult, AppError> {
3284    let (mem_id, body, desc): (i64, String, String) = conn
3285        .query_row(
3286            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3287            rusqlite::params![item_key],
3288            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3289        )
3290        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3291    let snippet: String = body.chars().take(500).collect();
3292    let ent_count: i64 = conn
3293        .query_row(
3294            "SELECT COUNT(*) FROM memory_entities WHERE memory_id = ?1",
3295            rusqlite::params![mem_id],
3296            |r| r.get(0),
3297        )
3298        .unwrap_or(0);
3299    let input_text = format!("Memory: {item_key}\nDescription: {desc}\nEntity bindings: {ent_count}\nBody preview: {snippet}");
3300    let (value, cost, is_oauth) = match mode {
3301        EnrichMode::ClaudeCode => call_claude(
3302            binary,
3303            GRAPH_AUDIT_PROMPT,
3304            GRAPH_AUDIT_SCHEMA,
3305            &input_text,
3306            model,
3307            timeout,
3308        )?,
3309        EnrichMode::Codex => call_codex(
3310            binary,
3311            GRAPH_AUDIT_PROMPT,
3312            GRAPH_AUDIT_SCHEMA,
3313            &input_text,
3314            model,
3315            timeout,
3316        )?,
3317    };
3318    let issues = value
3319        .get("issues")
3320        .and_then(|v| v.as_array())
3321        .map(|a| a.len())
3322        .unwrap_or(0);
3323    Ok(EnrichItemResult::Done {
3324        memory_id: Some(mem_id),
3325        entity_id: None,
3326        entities: 0,
3327        rels: issues,
3328        chars_before: None,
3329        chars_after: None,
3330        cost,
3331        is_oauth,
3332    })
3333}
3334
3335/// G27 P2: Synthesize research findings into graph entities/relationships via LLM.
3336fn call_deep_research_synth(
3337    conn: &Connection,
3338    namespace: &str,
3339    item_key: &str,
3340    binary: &Path,
3341    model: Option<&str>,
3342    timeout: u64,
3343    mode: &EnrichMode,
3344) -> Result<EnrichItemResult, AppError> {
3345    let (mem_id, body): (i64, String) = conn
3346        .query_row(
3347            "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3348            rusqlite::params![item_key],
3349            |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
3350        )
3351        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3352    let snippet: String = body.chars().take(2000).collect();
3353    let input_text = format!("Memory: {item_key}\nBody:\n{snippet}");
3354    let (value, cost, is_oauth) = match mode {
3355        EnrichMode::ClaudeCode => call_claude(
3356            binary,
3357            DEEP_RESEARCH_SYNTH_PROMPT,
3358            DEEP_RESEARCH_SYNTH_SCHEMA,
3359            &input_text,
3360            model,
3361            timeout,
3362        )?,
3363        EnrichMode::Codex => call_codex(
3364            binary,
3365            DEEP_RESEARCH_SYNTH_PROMPT,
3366            DEEP_RESEARCH_SYNTH_SCHEMA,
3367            &input_text,
3368            model,
3369            timeout,
3370        )?,
3371    };
3372    let mut ent_count = 0usize;
3373    let mut rel_count = 0usize;
3374    if let Some(ents) = value.get("entities").and_then(|v| v.as_array()) {
3375        for e in ents {
3376            let name = e.get("name").and_then(|v| v.as_str()).unwrap_or_default();
3377            let etype_str = e
3378                .get("entity_type")
3379                .and_then(|v| v.as_str())
3380                .unwrap_or("concept");
3381            let etype: EntityType = etype_str.parse().unwrap_or(EntityType::Concept);
3382            if name.len() >= 2 {
3383                let ne = NewEntity {
3384                    name: name.to_string(),
3385                    entity_type: etype,
3386                    description: None,
3387                };
3388                let _ = entities::upsert_entity(conn, namespace, &ne);
3389                ent_count += 1;
3390            }
3391        }
3392    }
3393    if let Some(rels) = value.get("relationships").and_then(|v| v.as_array()) {
3394        for r in rels {
3395            let src = r.get("source").and_then(|v| v.as_str()).unwrap_or_default();
3396            let tgt = r.get("target").and_then(|v| v.as_str()).unwrap_or_default();
3397            if src.is_empty() || tgt.is_empty() {
3398                continue;
3399            }
3400            let rel = r
3401                .get("relation")
3402                .and_then(|v| v.as_str())
3403                .unwrap_or("related");
3404            let str_ = r.get("strength").and_then(|v| v.as_f64()).unwrap_or(0.5);
3405            if let (Some(sid), Some(tid)) = (
3406                entities::find_entity_id(conn, namespace, src)?,
3407                entities::find_entity_id(conn, namespace, tgt)?,
3408            ) {
3409                let _ = entities::create_or_fetch_relationship(
3410                    conn, namespace, sid, tid, rel, str_, None,
3411                );
3412                rel_count += 1;
3413            }
3414        }
3415    }
3416    Ok(EnrichItemResult::Done {
3417        memory_id: Some(mem_id),
3418        entity_id: None,
3419        entities: ent_count,
3420        rels: rel_count,
3421        chars_before: None,
3422        chars_after: None,
3423        cost,
3424        is_oauth,
3425    })
3426}
3427
3428/// G27 P2: Extract structured body from unstructured text via LLM.
3429fn call_body_extract(
3430    conn: &Connection,
3431    _namespace: &str,
3432    item_key: &str,
3433    binary: &Path,
3434    model: Option<&str>,
3435    timeout: u64,
3436    mode: &EnrichMode,
3437) -> Result<EnrichItemResult, AppError> {
3438    let (mem_id, body): (i64, String) = conn
3439        .query_row(
3440            "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3441            rusqlite::params![item_key],
3442            |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
3443        )
3444        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3445    let input_text = format!("Memory: {item_key}\nBody:\n{body}");
3446    let (value, cost, is_oauth) = match mode {
3447        EnrichMode::ClaudeCode => call_claude(
3448            binary,
3449            BODY_EXTRACT_PROMPT,
3450            BODY_EXTRACT_SCHEMA,
3451            &input_text,
3452            model,
3453            timeout,
3454        )?,
3455        EnrichMode::Codex => call_codex(
3456            binary,
3457            BODY_EXTRACT_PROMPT,
3458            BODY_EXTRACT_SCHEMA,
3459            &input_text,
3460            model,
3461            timeout,
3462        )?,
3463    };
3464    let restructured = value
3465        .get("restructured_body")
3466        .and_then(|v| v.as_str())
3467        .unwrap_or(&body);
3468    let chars_before = body.len();
3469    let chars_after = restructured.len();
3470    let new_hash = blake3::hash(restructured.as_bytes()).to_hex().to_string();
3471    conn.execute(
3472        "UPDATE memories SET body = ?1, body_hash = ?2, updated_at = unixepoch() WHERE id = ?3",
3473        rusqlite::params![restructured, new_hash, mem_id],
3474    )?;
3475    Ok(EnrichItemResult::Done {
3476        memory_id: Some(mem_id),
3477        entity_id: None,
3478        entities: 0,
3479        rels: 0,
3480        chars_before: Some(chars_before),
3481        chars_after: Some(chars_after),
3482        cost,
3483        is_oauth,
3484    })
3485}
3486
3487/// Scan for pairs of entities that share no direct relationship.
3488#[allow(clippy::type_complexity)]
3489fn scan_isolated_entity_pairs(
3490    conn: &Connection,
3491    namespace: &str,
3492    limit: Option<usize>,
3493) -> Result<Vec<(i64, String, i64, String)>, AppError> {
3494    let limit_val = limit.unwrap_or(50) as i64;
3495    let mut stmt = conn.prepare_cached(
3496        "SELECT e1.id, e1.name, e2.id, e2.name FROM entities e1, entities e2 \
3497         WHERE e1.namespace = ?1 AND e2.namespace = ?1 AND e1.id < e2.id \
3498         AND NOT EXISTS (SELECT 1 FROM relationships r WHERE \
3499           (r.source_id = e1.id AND r.target_id = e2.id) OR \
3500           (r.source_id = e2.id AND r.target_id = e1.id)) \
3501         LIMIT ?2",
3502    )?;
3503    let rows = stmt
3504        .query_map(rusqlite::params![namespace, limit_val], |r| {
3505            Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?))
3506        })?
3507        .collect::<Result<Vec<_>, _>>()?;
3508    Ok(rows)
3509}
3510
3511/// Scan for entities with non-validated types (all entities for type audit).
3512fn scan_entities_for_type_validation(
3513    conn: &Connection,
3514    namespace: &str,
3515    limit: Option<usize>,
3516) -> Result<Vec<(i64, String, String)>, AppError> {
3517    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3518    let sql = format!(
3519        "SELECT id, name, type FROM entities WHERE namespace = ?1 ORDER BY id {limit_clause}"
3520    );
3521    let mut stmt = conn.prepare(&sql)?;
3522    let rows = stmt
3523        .query_map(rusqlite::params![namespace], |r| {
3524            Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3525        })?
3526        .collect::<Result<Vec<_>, _>>()?;
3527    Ok(rows)
3528}
3529
3530/// Scan for memories with generic descriptions (ingested, imported, etc).
3531fn scan_generic_descriptions(
3532    conn: &Connection,
3533    namespace: &str,
3534    limit: Option<usize>,
3535) -> Result<Vec<(i64, String, String)>, AppError> {
3536    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3537    let sql = format!(
3538        "SELECT id, name, description FROM memories WHERE namespace = ?1 AND deleted_at IS NULL \
3539         AND (description LIKE '%ingested%' OR description LIKE '%imported%' OR description LIKE '%added%' OR length(description) < 30) \
3540         ORDER BY id {limit_clause}"
3541    );
3542    let mut stmt = conn.prepare(&sql)?;
3543    let rows = stmt
3544        .query_map(rusqlite::params![namespace], |r| {
3545            Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3546        })?
3547        .collect::<Result<Vec<_>, _>>()?;
3548    Ok(rows)
3549}
3550
3551/// Calls the Codex CLI for a single enrichment item.
3552///
3553/// Follows the same contract as `call_claude`: returns `(value, cost_usd, is_oauth=false)`.
3554fn call_codex(
3555    binary: &Path,
3556    prompt: &str,
3557    json_schema: &str,
3558    input_text: &str,
3559    model: Option<&str>,
3560    timeout_secs: u64,
3561) -> Result<(serde_json::Value, f64, bool), AppError> {
3562    use wait_timeout::ChildExt;
3563
3564    // G31+G32+G33 (v1.0.69): validate the model BEFORE spawn, write the
3565    // schema to a trusted cache path (not /tmp), and reuse the
3566    // consolidated JSONL parser. See `codex_spawn.rs` for the canonical
3567    // hardening rationale.
3568    super::codex_spawn::validate_codex_model(model)?;
3569    let schema_file = super::codex_spawn::trusted_schema_path()?;
3570
3571    let args = super::codex_spawn::CodexSpawnArgs {
3572        binary,
3573        prompt,
3574        json_schema,
3575        input_text,
3576        model,
3577        timeout_secs,
3578        schema_path: schema_file.clone(),
3579    };
3580    let mut cmd = super::codex_spawn::build_codex_command(&args);
3581
3582    let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
3583        AppError::Io(std::io::Error::new(
3584            e.kind(),
3585            format!("failed to spawn codex: {e}"),
3586        ))
3587    })?;
3588
3589    let full_prompt = format!("{prompt}\n\n{input_text}");
3590    let stdin_bytes = full_prompt.into_bytes();
3591    let mut child_stdin = child
3592        .stdin
3593        .take()
3594        .ok_or_else(|| AppError::Validation("failed to open codex stdin".into()))?;
3595    let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
3596        child_stdin.write_all(&stdin_bytes)?;
3597        drop(child_stdin);
3598        Ok(())
3599    });
3600
3601    let start = std::time::Instant::now();
3602    let timeout = std::time::Duration::from_secs(timeout_secs);
3603    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
3604    let _ = std::fs::remove_file(&schema_file);
3605
3606    match status {
3607        Some(exit_status) => {
3608            stdin_thread
3609                .join()
3610                .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
3611                .map_err(AppError::Io)?;
3612
3613            tracing::debug!(
3614                target: "process",
3615                exit_code = ?exit_status.code(),
3616                elapsed_ms = start.elapsed().as_millis() as u64,
3617                "external process completed"
3618            );
3619
3620            let mut stdout_buf = Vec::new();
3621            if let Some(mut out) = child.stdout.take() {
3622                std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
3623            }
3624            if !exit_status.success() {
3625                let mut stderr_buf = Vec::new();
3626                if let Some(mut err) = child.stderr.take() {
3627                    std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
3628                }
3629                let stderr_str = String::from_utf8_lossy(&stderr_buf);
3630                tracing::warn!(
3631                    target: "enrich",
3632                    exit_code = ?exit_status.code(),
3633                    stderr = %stderr_str.trim(),
3634                    "codex process failed"
3635                );
3636                return Err(AppError::Validation(format!(
3637                    "codex exited with code {:?}: {}",
3638                    exit_status.code(),
3639                    stderr_str.trim()
3640                )));
3641            }
3642            let stdout_str = String::from_utf8(stdout_buf)
3643                .map_err(|_| AppError::Validation("codex stdout is not valid UTF-8".into()))?;
3644            // G32: use the JSONL parser, NOT serde_json::from_str on the
3645            // entire stdout (codex emits one event per line).
3646            let result = super::codex_spawn::parse_codex_jsonl(&stdout_str)?;
3647            // Return the raw agent_message text parsed as JSON. Different
3648            // operations (memory-bindings, body-enrich) use different
3649            // output schemas, so we let the caller pick which fields to
3650            // extract. The previous implementation hardcoded
3651            // `{entities, urls}` which broke body-enrich.
3652            let value: serde_json::Value =
3653                serde_json::from_str(&result.last_agent_text).map_err(|e| {
3654                    AppError::Validation(format!(
3655                        "codex agent_message is not valid JSON: {e}; raw={}",
3656                        result.last_agent_text
3657                    ))
3658                })?;
3659            Ok((value, 0.0, false))
3660        }
3661        None => {
3662            let _ = child.kill();
3663            let _ = child.wait();
3664            let _ = stdin_thread.join();
3665            Err(AppError::Validation(format!(
3666                "codex timed out after {timeout_secs} seconds"
3667            )))
3668        }
3669    }
3670}
3671
3672// ---------------------------------------------------------------------------
3673// Tests
3674// ---------------------------------------------------------------------------
3675
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::Connection;
    #[cfg(unix)]
    use std::os::unix::fs::PermissionsExt;

    /// Opens an in-memory SQLite database with a minimal schema for unit tests.
    ///
    /// Creates the `memories`, `entities`, `memory_entities`, `relationships` and
    /// `memory_embeddings` tables; this is a test fixture, not the production
    /// migration set.
    fn open_test_db() -> Connection {
        let conn = Connection::open_in_memory().expect("in-memory db");
        conn.execute_batch(
            "CREATE TABLE memories (
                id          INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace   TEXT NOT NULL DEFAULT 'global',
                name        TEXT NOT NULL,
                type        TEXT NOT NULL DEFAULT 'note',
                description TEXT NOT NULL DEFAULT '',
                body        TEXT NOT NULL DEFAULT '',
                body_hash   TEXT NOT NULL DEFAULT '',
                session_id  TEXT,
                source      TEXT NOT NULL DEFAULT 'agent',
                metadata    TEXT NOT NULL DEFAULT '{}',
                created_at  INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at  INTEGER NOT NULL DEFAULT (unixepoch()),
                deleted_at  INTEGER,
                UNIQUE(namespace, name)
            );
            CREATE TABLE entities (
                id          INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace   TEXT NOT NULL DEFAULT 'global',
                name        TEXT NOT NULL,
                type        TEXT NOT NULL DEFAULT 'concept',
                description TEXT,
                degree      INTEGER NOT NULL DEFAULT 0,
                created_at  INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at  INTEGER NOT NULL DEFAULT (unixepoch()),
                UNIQUE(namespace, name)
            );
            CREATE TABLE memory_entities (
                memory_id  INTEGER NOT NULL,
                entity_id  INTEGER NOT NULL,
                PRIMARY KEY (memory_id, entity_id)
            );
            CREATE TABLE relationships (
                id         INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace  TEXT NOT NULL DEFAULT 'global',
                source_id  INTEGER NOT NULL,
                target_id  INTEGER NOT NULL,
                relation   TEXT NOT NULL,
                weight     REAL NOT NULL DEFAULT 0.5,
                description TEXT,
                UNIQUE(source_id, target_id, relation)
            );
            CREATE TABLE memory_embeddings (
                memory_id   INTEGER PRIMARY KEY,
                namespace   TEXT NOT NULL,
                embedding   BLOB NOT NULL,
                source      TEXT NOT NULL,
                model       TEXT NOT NULL DEFAULT '',
                dim         INTEGER NOT NULL DEFAULT 384,
                created_at  INTEGER NOT NULL DEFAULT (unixepoch())
            );",
        )
        .expect("schema creation must succeed");
        conn
    }

    /// Inserts a memory into the `global` namespace and returns its row id.
    ///
    /// All other columns take their schema defaults.
    fn insert_memory(conn: &Connection, name: &str, body: &str) -> i64 {
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', ?1, ?2)",
            rusqlite::params![name, body],
        )
        .expect("memory insert must succeed");
        conn.last_insert_rowid()
    }

    /// Inserts an entity into the `global` namespace and returns its row id.
    ///
    /// `description: None` stores SQL NULL, which is also what the column holds
    /// when it is omitted from the INSERT.
    fn insert_entity(
        conn: &Connection,
        name: &str,
        entity_type: &str,
        description: Option<&str>,
    ) -> i64 {
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', ?1, ?2, ?3)",
            rusqlite::params![name, entity_type, description],
        )
        .expect("entity insert must succeed");
        conn.last_insert_rowid()
    }

    /// A memory with no `memory_entities` row is reported by the unbound scan.
    #[test]
    fn scan_unbound_memories_finds_memories_without_bindings() {
        let conn = open_test_db();
        insert_memory(&conn, "test-mem", "some body content");

        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "test-mem");
    }

    /// A memory that already has a `memory_entities` row is skipped by the unbound scan.
    #[test]
    fn scan_unbound_memories_excludes_bound_memories() {
        let conn = open_test_db();
        let mem_id = insert_memory(&conn, "bound-mem", "body");
        // "concept" is the column default, matching an INSERT that omits `type`.
        let ent_id = insert_entity(&conn, "some-entity", "concept", None);
        conn.execute(
            "INSERT INTO memory_entities (memory_id, entity_id) VALUES (?1, ?2)",
            rusqlite::params![mem_id, ent_id],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
        assert!(results.is_empty(), "bound memory must not appear in scan");
    }

    /// An entity whose description is NULL is reported by the description scan.
    #[test]
    fn scan_entities_without_description_finds_null_description() {
        let conn = open_test_db();
        insert_entity(&conn, "my-tool", "tool", None);

        let results = scan_entities_without_description(&conn, "global", None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "my-tool");
    }

    /// An entity that already has a description is skipped by the description scan.
    #[test]
    fn scan_entities_without_description_excludes_entities_with_description() {
        let conn = open_test_db();
        insert_entity(
            &conn,
            "described-tool",
            "tool",
            Some("Has a description already"),
        );

        let results = scan_entities_without_description(&conn, "global", None).unwrap();
        assert!(
            results.is_empty(),
            "entity with description must not appear"
        );
    }

    /// A body shorter than the threshold is reported by the short-body scan.
    #[test]
    fn scan_short_body_memories_finds_short_bodies() {
        let conn = open_test_db();
        insert_memory(&conn, "short-mem", "hi");

        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "short-mem");
    }

    /// A body longer than the threshold is skipped by the short-body scan.
    #[test]
    fn scan_short_body_memories_excludes_long_bodies() {
        let conn = open_test_db();
        insert_memory(&conn, "long-mem", &"a".repeat(1000));

        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
        assert!(results.is_empty(), "long memory must not appear in scan");
    }

    /// `Some(limit)` caps the number of rows the scan returns.
    #[test]
    fn scan_respects_limit() {
        let conn = open_test_db();
        for i in 0..5 {
            insert_memory(&conn, &format!("mem-{i}"), "short");
        }

        let results = scan_short_body_memories(&conn, "global", 1000, Some(3)).unwrap();
        assert_eq!(results.len(), 3, "limit must be respected");
    }

    /// Only memories lacking a vector row are reported by the re-embed scan.
    #[test]
    fn scan_memories_without_embeddings_finds_only_missing_rows() {
        let conn = open_test_db();
        insert_memory(&conn, "missing-vec", "body one");
        let memory_id = insert_memory(&conn, "has-vec", "body two");
        let embedding = vec![0.0_f32; crate::constants::embedding_dim()];
        memories::upsert_vec(
            &conn, memory_id, "global", "note", &embedding, "has-vec", "body two",
        )
        .unwrap();

        let results = scan_memories_without_embeddings(&conn, "global", None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "missing-vec");
    }

    /// A non-empty name filter restricts the re-embed scan to the listed names.
    #[test]
    fn scan_memories_without_embeddings_respects_name_filter() {
        let conn = open_test_db();
        insert_memory(&conn, "match-me", "body one");
        insert_memory(&conn, "skip-me", "body two");

        let results =
            scan_memories_without_embeddings(&conn, "global", None, &["match-me".to_string()])
                .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "match-me");
    }

    /// Opening a fresh queue DB creates an empty `queue` table.
    #[test]
    fn queue_db_schema_creates_correctly() {
        // A private temp dir (instead of a fixed `/tmp/...` path) works on every
        // platform and is removed on drop even if an assertion below panics.
        // `conn` is declared after `tmp`, so it is dropped (closed) first.
        let tmp = tempfile::tempdir().expect("tempdir");
        let queue_path = tmp
            .path()
            .join("test-enrich-queue.sqlite")
            .to_str()
            .expect("temp path must be valid UTF-8")
            .to_owned();
        let conn = open_queue_db(&queue_path).expect("queue db must open");
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 0);
    }

    /// A successful `result` event yields its `structured_output`, cost and non-OAuth flag.
    #[test]
    fn parse_claude_output_valid_bindings() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.01,
             "structured_output":{"entities":[{"name":"rust-lang","entity_type":"tool"}],"relationships":[]}}
        ]"#;
        let result = crate::commands::claude_runner::parse_claude_output(output)
            .expect("must parse successfully");
        assert!(result.value.get("entities").is_some());
        assert!((result.cost_usd - 0.01).abs() < f64::EPSILON);
        assert!(!result.is_oauth);
    }

    /// `apiKeySource: "none"` in the init event is reported as OAuth.
    #[test]
    fn parse_claude_output_detects_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"none"},
            {"type":"result","is_error":false,"total_cost_usd":0.0,
             "structured_output":{"entities":[],"relationships":[]}}
        ]"#;
        let result = crate::commands::claude_runner::parse_claude_output(output).unwrap();
        assert!(result.is_oauth);
    }

    /// A rate-limit error result maps to `AppError::RateLimited`.
    #[test]
    fn parse_claude_output_rate_limit_returns_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;
        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
        assert!(matches!(err, AppError::RateLimited { .. }));
    }

    /// Other error results surface the original error text.
    #[test]
    fn parse_claude_output_auth_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;
        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("authentication failed"));
    }

    /// `call_codex` returns the agent message as raw JSON, so body-enrich payloads
    /// (which have no `entities`/`urls` keys) are accepted.
    #[cfg(unix)]
    #[test]
    fn call_codex_returns_raw_json_for_body_enrich_schema() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let binary = tmp.path().join("codex-mock");
        std::fs::write(
            &binary,
            r#"#!/usr/bin/env bash
set -euo pipefail
cat <<'JSONL'
{"type":"thread.started","thread_id":"mock-thread-0"}
{"type":"item.completed","item":{"type":"agent_message","text":"{\"enriched_body\":\"expanded body\"}"}}
{"type":"turn.completed","usage":{"input_tokens":1,"output_tokens":1}}
JSONL
"#,
        )
        .expect("mock codex write");
        let mut perms = std::fs::metadata(&binary).expect("metadata").permissions();
        perms.set_mode(0o755);
        std::fs::set_permissions(&binary, perms).expect("chmod");

        let (value, cost, is_oauth) =
            call_codex(&binary, "prompt", BODY_ENRICH_SCHEMA, "body", None, 5)
                .expect("call_codex must accept body-enrich payload");

        assert_eq!(value["enriched_body"], "expanded body");
        assert_eq!(cost, 0.0);
        assert!(!is_oauth);
    }

    #[test]
    fn dry_run_emits_preview_without_calling_llm() {
        // This test validates the dry-run NDJSON contract without spawning any process.
        // The scan_operation function requires a DB; we build one in-memory but cannot
        // call run() directly because it needs AppPaths (disk). Instead we test the
        // lower-level helpers that the dry-run path relies on.
        let conn = open_test_db();
        insert_memory(&conn, "dry-mem", "tiny");

        let results = scan_short_body_memories(&conn, "global", 1000, None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "dry-mem");
        // If scan finds the item and dry_run is set, no LLM would be called.
        // The NDJSON emission is tested via integration tests with a fake binary.
    }

    /// `persist_entity_description` writes the given text into `entities.description`.
    #[test]
    fn persist_entity_description_updates_db() {
        let conn = open_test_db();
        let eid = insert_entity(&conn, "tokio-runtime", "tool", None);

        persist_entity_description(&conn, eid, "Async runtime for Rust applications").unwrap();

        let desc: String = conn
            .query_row(
                "SELECT description FROM entities WHERE id=?1",
                rusqlite::params![eid],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(desc, "Async runtime for Rust applications");
    }

    #[test]
    fn bindings_schema_is_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(BINDINGS_SCHEMA).expect("BINDINGS_SCHEMA must be valid JSON");
    }

    #[test]
    fn entity_description_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(ENTITY_DESCRIPTION_SCHEMA)
            .expect("ENTITY_DESCRIPTION_SCHEMA must be valid JSON");
    }

    #[test]
    fn body_enrich_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(BODY_ENRICH_SCHEMA)
            .expect("BODY_ENRICH_SCHEMA must be valid JSON");
    }
}