Skip to main content

sqlite_graphrag/commands/
enrich.rs

1//! Handler for the `enrich` CLI subcommand (GAP-14 + GAP-18).
2//!
3//! Enriches the knowledge graph by running LLM-powered analysis over memories
4//! and entities that are missing key structural data. Operations are:
5//!
6//! - `memory-bindings`: memories without `memory_entities` rows get entity extraction
7//! - `entity-descriptions`: entities with NULL/empty descriptions get LLM descriptions
8//! - `body-enrich`: memories with short bodies get expanded by the LLM (GAP-18)
9//! - all others: scan + structured NDJSON output (not-yet-implemented dispatch)
10//!
11//! Architecture mirrors `ingest_claude.rs`: SCAN → JUDGE (LLM) → PERSIST, with a
12//! SQLite queue DB (`.enrich-queue.sqlite`) for resume/retry support.
13// Workload: Subprocess I/O-bound (claude/codex API calls with network wait)
14//!
15//! # DRY opportunity
16//!
17//! `extract_with_claude`, `parse_claude_output`, `emit_json`, and the `open_queue_db`
18//! queue schema in `ingest_claude.rs` are private functions that duplicate patterns used
19//! here verbatim. A future refactoring could extract them into a shared
20//! `src/commands/llm_runner.rs` module (or `src/llm_runner.rs`) without changing any
21//! public APIs. That extraction requires editing `ingest_claude.rs`, which is outside
22//! this stream's boundary — flagged here for the Integration stream to evaluate.
23
24use crate::commands::ingest_claude::find_claude_binary;
25use crate::constants::MAX_MEMORY_BODY_LEN;
26use crate::entity_type::EntityType;
27use crate::errors::AppError;
28use crate::paths::AppPaths;
29use crate::storage::connection::{ensure_db_ready, open_rw};
30use crate::storage::entities::{self, NewEntity, NewRelationship};
31use crate::storage::memories;
32
33use rusqlite::Connection;
34use serde::{Deserialize, Serialize};
35use std::io::Write;
36use std::path::{Path, PathBuf};
37use std::time::Instant;
38
39// ---------------------------------------------------------------------------
40// Constants
41// ---------------------------------------------------------------------------
42
43const DEFAULT_QUEUE_DB: &str = ".enrich-queue.sqlite";
44const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
45const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;
46const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2000;
47
48// ---------------------------------------------------------------------------
49// JSON schema used for memory-bindings and body-enrich extraction
50// ---------------------------------------------------------------------------
51
52const BINDINGS_SCHEMA: &str = r#"{
53  "type": "object",
54  "properties": {
55    "entities": {
56      "type": "array",
57      "items": {
58        "type": "object",
59        "properties": {
60          "name": { "type": "string" },
61          "entity_type": {
62            "type": "string",
63            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
64          }
65        },
66        "required": ["name", "entity_type"],
67        "additionalProperties": false
68      }
69    },
70    "relationships": {
71      "type": "array",
72      "items": {
73        "type": "object",
74        "properties": {
75          "source": { "type": "string" },
76          "target": { "type": "string" },
77          "relation": {
78            "type": "string",
79            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
80          },
81          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
82        },
83        "required": ["source","target","relation","strength"],
84        "additionalProperties": false
85      }
86    }
87  },
88  "required": ["entities","relationships"],
89  "additionalProperties": false
90}"#;
91
92const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
93  "type": "object",
94  "properties": {
95    "description": { "type": "string" }
96  },
97  "required": ["description"],
98  "additionalProperties": false
99}"#;
100
101const BODY_ENRICH_SCHEMA: &str = r#"{
102  "type": "object",
103  "properties": {
104    "enriched_body": { "type": "string" }
105  },
106  "required": ["enriched_body"],
107  "additionalProperties": false
108}"#;
109
110// G27 P1: weight-calibrate
111const WEIGHT_CALIBRATE_PROMPT: &str = "You are a knowledge graph quality auditor. Evaluate whether this relationship weight is correctly calibrated.\n\n\
112Scale:\n\
113- 0.9 = vital hard dependency (A cannot function without B)\n\
114- 0.7 = important design relationship (A strongly supports/enables B)\n\
115- 0.5 = useful contextual link (A and B share relevant context)\n\
116- 0.3 = weak reference (A mentions B without strong coupling)\n\n\
117Respond with the calibrated weight and brief reasoning.";
118
119const WEIGHT_CALIBRATE_SCHEMA: &str = r#"{
120  "type": "object",
121  "properties": {
122    "calibrated_weight": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
123    "reasoning": { "type": "string" }
124  },
125  "required": ["calibrated_weight", "reasoning"],
126  "additionalProperties": false
127}"#;
128
129// G27 P1: relation-reclassify
130const RELATION_RECLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. The relationship between these entities uses a generic type. Determine the REAL semantic relationship.\n\n\
131Valid canonical relations (pick exactly one):\n\
132- depends-on: A cannot function without B\n\
133- uses: A utilizes B but could substitute it\n\
134- supports: A reinforces or enables B\n\
135- causes: A triggers or produces B\n\
136- fixes: A resolves a problem in B\n\
137- contradicts: A conflicts with or invalidates B\n\
138- applies-to: A is relevant to or scoped within B\n\
139- follows: A comes after B in sequence\n\
140- replaces: A substitutes B\n\
141- tracked-in: A is monitored in B\n\
142- related: A and B share context (use sparingly)\n\n\
143Respond with the correct relation, strength, and reasoning.";
144
145const RELATION_RECLASSIFY_SCHEMA: &str = r#"{
146  "type": "object",
147  "properties": {
148    "relation": { "type": "string" },
149    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
150    "reasoning": { "type": "string" }
151  },
152  "required": ["relation", "strength", "reasoning"],
153  "additionalProperties": false
154}"#;
155
156// G27 P2: entity-connect — suggest relationships between isolated entities
157const ENTITY_CONNECT_PROMPT: &str = "You are a knowledge graph quality auditor. Two entities exist in the same graph but have no relationship between them. Determine if a meaningful relationship exists.\n\n\
158Valid canonical relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, replaces, tracked-in, related.\n\n\
159If NO meaningful relationship exists, set relation to \"none\".\n\
160Respond with the relation (or \"none\"), strength, and reasoning.";
161
162const ENTITY_CONNECT_SCHEMA: &str = r#"{
163  "type": "object",
164  "properties": {
165    "relation": { "type": "string" },
166    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
167    "reasoning": { "type": "string" }
168  },
169  "required": ["relation", "strength", "reasoning"],
170  "additionalProperties": false
171}"#;
172
173// G27 P2: entity-type-validate — verify entity type assignments
174const ENTITY_TYPE_VALIDATE_PROMPT: &str = "You are a knowledge graph quality auditor. Verify whether this entity's type is correct.\n\n\
175Valid entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\n\
176If the current type is correct, keep it. If wrong, suggest the correct type.\n\
177Respond with the validated type and reasoning.";
178
179const ENTITY_TYPE_VALIDATE_SCHEMA: &str = r#"{
180  "type": "object",
181  "properties": {
182    "validated_type": { "type": "string" },
183    "was_correct": { "type": "boolean" },
184    "reasoning": { "type": "string" }
185  },
186  "required": ["validated_type", "was_correct", "reasoning"],
187  "additionalProperties": false
188}"#;
189
190// G27 P2: description-enrich — improve generic memory descriptions
191const DESCRIPTION_ENRICH_PROMPT: &str = "You are a knowledge graph quality auditor. This memory has a generic or auto-generated description. Write a concise, semantic description (10-20 words) that captures WHAT this memory is about and WHY it matters.\n\n\
192BAD: 'ingested from docs/auth.md'\n\
193GOOD: 'JWT token rotation strategy with 15-min expiry and refresh flow'\n\n\
194Respond with the improved description and reasoning.";
195
196const DESCRIPTION_ENRICH_SCHEMA: &str = r#"{
197  "type": "object",
198  "properties": {
199    "description": { "type": "string" },
200    "reasoning": { "type": "string" }
201  },
202  "required": ["description", "reasoning"],
203  "additionalProperties": false
204}"#;
205
206// G27 P2: domain-classify — classify memory into domain category
207const DOMAIN_CLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. Classify this memory into its primary domain category.\n\n\
208Respond with the domain name (kebab-case, 2-4 words) and reasoning.";
209
210const DOMAIN_CLASSIFY_SCHEMA: &str = r#"{
211  "type": "object",
212  "properties": {
213    "domain": { "type": "string" },
214    "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
215    "reasoning": { "type": "string" }
216  },
217  "required": ["domain", "confidence", "reasoning"],
218  "additionalProperties": false
219}"#;
220
221// G27 P2: graph-audit — audit graph for quality issues
222const GRAPH_AUDIT_PROMPT: &str = "You are a knowledge graph quality auditor. Analyze this memory and its entity bindings for quality issues.\n\n\
223Check for: missing entities, wrong entity types, redundant relationships, orphaned entities, generic descriptions, low-signal relationships.\n\n\
224Respond with a list of issues found (or empty if none) and an overall quality score.";
225
226const GRAPH_AUDIT_SCHEMA: &str = r#"{
227  "type": "object",
228  "properties": {
229    "quality_score": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
230    "issues": { "type": "array", "items": { "type": "object", "properties": { "kind": { "type": "string" }, "detail": { "type": "string" } }, "required": ["kind", "detail"] } },
231    "reasoning": { "type": "string" }
232  },
233  "required": ["quality_score", "issues", "reasoning"],
234  "additionalProperties": false
235}"#;
236
237// G27 P2: deep-research-synth — synthesize research findings into graph
238const DEEP_RESEARCH_SYNTH_PROMPT: &str = "You are a knowledge graph synthesizer. Given this memory body, extract key findings and synthesize them into structured entities and relationships.\n\n\
239Entity names: lowercase kebab-case, domain-specific.\n\
240Relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, related, replaces, tracked-in.\n\n\
241Respond with extracted entities, relationships, and a synthesis summary.";
242
243const DEEP_RESEARCH_SYNTH_SCHEMA: &str = r#"{
244  "type": "object",
245  "properties": {
246    "entities": { "type": "array", "items": { "type": "object", "properties": { "name": { "type": "string" }, "entity_type": { "type": "string" } }, "required": ["name", "entity_type"] } },
247    "relationships": { "type": "array", "items": { "type": "object", "properties": { "source": { "type": "string" }, "target": { "type": "string" }, "relation": { "type": "string" }, "strength": { "type": "number" } }, "required": ["source", "target", "relation", "strength"] } },
248    "summary": { "type": "string" }
249  },
250  "required": ["entities", "relationships", "summary"],
251  "additionalProperties": false
252}"#;
253
254// G27 P2: body-extract — extract structured content from unstructured text
255const BODY_EXTRACT_PROMPT: &str = "You are a structured data extractor. Given this memory body (which may be unstructured text, raw notes, or a transcript), extract and restructure the content into a clean, well-organized markdown body.\n\n\
256Preserve all factual content. Remove noise, fix formatting, add section headers where appropriate.\n\
257Respond with the restructured body and a brief summary of changes.";
258
259const BODY_EXTRACT_SCHEMA: &str = r#"{
260  "type": "object",
261  "properties": {
262    "restructured_body": { "type": "string" },
263    "changes_summary": { "type": "string" }
264  },
265  "required": ["restructured_body", "changes_summary"],
266  "additionalProperties": false
267}"#;
268
269// ---------------------------------------------------------------------------
270// Prompts
271// ---------------------------------------------------------------------------
272
273const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. Given a memory body, extract:\n\
2741. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
2752. Typed relationships between entities with strength scores\n\n\
276Rules:\n\
277- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
278- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
279- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
280- NEVER use 'mentions' as relationship type\n\
281- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
282- Prefer fewer high-quality entities over many low-quality ones";
283
284const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. Given an entity name and type, write a concise one-sentence description (10-20 words) that explains what this entity IS and WHY it matters in the context of software/system design.\n\nEntity name: ";
285
286const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. Given a short or sparse memory body, expand it into a richer, more complete and useful description. Preserve all existing facts. Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\nConstraints:\n- Output only the enriched body text (no metadata, no headers)\n- Preserve the original meaning exactly\n- Target length is provided in the system context\n\nMemory body to enrich:\n\n";
287
288// ---------------------------------------------------------------------------
289// CLI args
290// ---------------------------------------------------------------------------
291
292/// Operation to perform in the `enrich` command.
293#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
294#[serde(rename_all = "kebab-case")]
295pub enum EnrichOperation {
296    /// Add missing entity/relationship bindings to memories (fully implemented).
297    MemoryBindings,
298    /// Fill NULL/empty entity descriptions with LLM-generated summaries (fully implemented).
299    EntityDescriptions,
300    /// Expand short memory bodies into richer content (fully implemented, GAP-18).
301    BodyEnrich,
302    /// Calibrate relationship weights using LLM analysis (scan only).
303    WeightCalibrate,
304    /// Reclassify relationship types using LLM judgment (scan only).
305    RelationReclassify,
306    /// Connect isolated entities by suggesting new relationships (scan only).
307    EntityConnect,
308    /// Validate entity type assignments using LLM judgment (scan only).
309    EntityTypeValidate,
310    /// Enrich memory descriptions that are generic/auto-generated (scan only).
311    DescriptionEnrich,
312    /// Identify cross-domain bridges between disconnected subgraphs (scan only).
313    CrossDomainBridges,
314    /// Classify memories into domain categories (scan only).
315    DomainClassify,
316    /// Audit the graph for quality issues (scan only).
317    GraphAudit,
318    /// Synthesize deep-research findings into graph memories (scan only).
319    DeepResearchSynth,
320    /// Extract structured body from unstructured text (scan only).
321    BodyExtract,
322}
323
324/// LLM provider for enrichment.
325#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
326pub enum EnrichMode {
327    /// Use locally installed Claude Code CLI (OAuth-first).
328    ClaudeCode,
329    /// Use locally installed OpenAI Codex CLI.
330    Codex,
331}
332
333impl std::fmt::Display for EnrichMode {
334    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
335        match self {
336            EnrichMode::ClaudeCode => write!(f, "claude-code"),
337            EnrichMode::Codex => write!(f, "codex"),
338        }
339    }
340}
341
342/// Arguments for the `enrich` subcommand.
343#[derive(clap::Args)]
344#[command(
345    about = "Enrich graph memories and entities using an LLM provider",
346    after_long_help = "EXAMPLES:\n  \
347    # Add missing entity bindings to all unbound memories\n  \
348    sqlite-graphrag enrich --operation memory-bindings --mode claude-code\n\n  \
349    # Fill entity descriptions (dry-run preview, no tokens spent)\n  \
350    sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n  \
351    # Expand short memory bodies (GAP-18)\n  \
352    sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n  \
353    # Resume an interrupted body-enrich run\n  \
354    sqlite-graphrag enrich --operation body-enrich --resume --json\n\n  \
355    # Retry only failed items from a previous run\n  \
356    sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n\
357    EXIT CODES:\n  \
358    0  success\n  \
359    1  validation error (bad args, binary not found)\n  \
360    14 I/O error"
361)]
362pub struct EnrichArgs {
363    /// Enrichment operation to run.
364    #[arg(long, short = 'o', value_enum, value_name = "OPERATION")]
365    pub operation: EnrichOperation,
366
367    /// LLM provider to use. Default: claude-code (OAuth-first).
368    #[arg(long, value_enum, default_value = "claude-code")]
369    pub mode: EnrichMode,
370
371    /// Maximum number of items to process in this run. Omit for all.
372    #[arg(long, value_name = "N")]
373    pub limit: Option<usize>,
374
375    /// Preview items without calling the LLM (zero tokens consumed).
376    #[arg(long)]
377    pub dry_run: bool,
378
379    /// Namespace to operate on. Default: global.
380    #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
381    pub namespace: Option<String>,
382
383    // -- Provider flags (Claude) --
384    /// Path to the Claude Code binary. Default: auto-detect from PATH.
385    #[arg(long, value_name = "PATH")]
386    pub claude_binary: Option<PathBuf>,
387
388    /// Claude model to use (e.g. claude-sonnet-4-6).
389    #[arg(long, value_name = "MODEL")]
390    pub claude_model: Option<String>,
391
392    /// Timeout per item in seconds when using Claude Code. Default: 300.
393    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
394    pub claude_timeout: u64,
395
396    // -- Provider flags (Codex) --
397    /// Path to the Codex CLI binary. Default: auto-detect from PATH.
398    #[arg(long, value_name = "PATH")]
399    pub codex_binary: Option<PathBuf>,
400
401    /// Codex model to use (e.g. o4-mini).
402    #[arg(long, value_name = "MODEL")]
403    pub codex_model: Option<String>,
404
405    /// Timeout per item in seconds when using Codex. Default: 300.
406    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
407    pub codex_timeout: u64,
408
409    // -- Cost controls --
410    /// Abort when cumulative cost exceeds this USD budget (API key only; ignored for OAuth).
411    #[arg(long, value_name = "USD")]
412    pub max_cost_usd: Option<f64>,
413
414    // -- Queue controls --
415    /// Resume a previously interrupted run (skip already-done items).
416    #[arg(long)]
417    pub resume: bool,
418
419    /// Retry only items that failed in a previous run.
420    #[arg(long)]
421    pub retry_failed: bool,
422
423    // -- body-enrich specific flags (GAP-18) --
424    /// Minimum output character count for body-enrich. Default: 500.
425    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
426    pub min_output_chars: usize,
427
428    /// Maximum output character count for body-enrich. Default: 2000.
429    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
430    pub max_output_chars: usize,
431
432    /// Check that enriched body preserves all facts from the original (LLM judge). Default: true.
433    #[arg(long, default_value_t = true)]
434    pub preserve_check: bool,
435
436    /// Path to a custom prompt template file for body-enrich.
437    #[arg(long, value_name = "PATH")]
438    pub prompt_template: Option<PathBuf>,
439
440    /// Number of parallel LLM workers (default 1 = serial).
441    /// Each worker claims items atomically from the queue DB via UPDATE...RETURNING.
442    /// Range: 1–32. For 2321 entities, --llm-parallelism 4 reduces wall time ~4×.
443    #[arg(long, default_value_t = 1, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=32))]
444    pub llm_parallelism: u32,
445
446    // -- Output / infra --
447    /// Emit NDJSON output. Always true; flag accepted for compatibility.
448    #[arg(long)]
449    pub json: bool,
450
451    /// Database path override.
452    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
453    pub db: Option<String>,
454
455    /// G30: poll for the job singleton every second for up to N seconds
456    /// when another invocation holds the lock. Default: 0 (fail fast).
457    #[arg(long, value_name = "SECONDS")]
458    pub wait_job_singleton: Option<u64>,
459
460    /// G30: force acquisition of the singleton lock by removing a stale
461    /// lock file from a previously crashed invocation. Use only when you
462    /// are certain no other `enrich`/`ingest` is running.
463    #[arg(long, default_value_t = false)]
464    pub force_job_singleton: bool,
465
466    /// G37: select a specific subset of memory names to enrich instead of
467    /// the full candidate set. Comma-separated, e.g. `--names a,b,c`.
468    /// Empty when omitted (processes all candidates).
469    #[arg(long, value_name = "NAMES", value_delimiter = ',')]
470    pub names: Vec<String>,
471
472    /// G37: read the subset of memory names from a file (one per line).
473    /// Lines starting with `#` and empty lines are ignored. Combined with
474    /// `--names` (union) when both are set.
475    #[arg(long, value_name = "PATH")]
476    pub names_file: Option<PathBuf>,
477
478    /// G35: probe the LLM provider with a 1-turn ping before processing
479    /// the batch. Aborts with a clear error if the rate-limit window is
480    /// closed (avoids burning N turns only to fail on item 1).
481    #[arg(long, default_value_t = false)]
482    pub preflight_check: bool,
483
484    /// G35: if a preflight probe or in-flight call hits the Claude rate
485    /// limit, fall back to `--fallback-mode` (typically `codex`) instead
486    /// of failing the batch. Ignored when `--mode` is already `codex`.
487    #[arg(long, value_enum)]
488    pub fallback_mode: Option<EnrichMode>,
489
490    /// G35: number of seconds before the OAuth rate-limit reset at which
491    /// the preflight probe should refuse to start. Default 300 (5 min).
492    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
493    pub rate_limit_buffer: u64,
494
495    /// G28-D: refuse to start when the 1-minute load average exceeds
496    /// `2 × ncpus` (or `SQLITE_GRAPHRAG_MAX_LOAD_PER_NCPU` if set).
497    /// Set to false to skip the check on contended CI runners.
498    #[arg(long, default_value_t = true)]
499    pub max_load_check: bool,
500
501    /// G28-D: when the system is saturated, abort the job after this
502    /// many consecutive HardFailure outcomes. Default 5.
503    #[arg(long, value_name = "N", default_value_t = 5)]
504    pub circuit_breaker_threshold: u32,
505
506    /// G29 Passo 4: minimum trigram-Jaccard similarity between the
507    /// original body and the LLM-rewritten body for the rewrite to be
508    /// accepted. Scores below the threshold are rejected and emitted as
509    /// `EnrichItemResult::PreservationFailed`. Default 0.7 (per the G29
510    /// gap specification). Ignored when `--operation` is not
511    /// `body-enrich`.
512    #[arg(long, value_name = "FLOAT", default_value_t = 0.7)]
513    pub preserve_threshold: f64,
514
515    /// G33 Passo 3: when set, validate `--codex-model` against the
516    /// ChatGPT Pro OAuth accepted-model list and abort with a
517    /// suggestion when the value is unknown. Default true (fail fast
518    /// to avoid burning OAuth turns). Set to false to opt out.
519    #[arg(long, default_value_t = true)]
520    pub codex_model_validate: bool,
521
522    /// G33 Passo 3: when set together with an invalid `--codex-model`,
523    /// automatically substitute the supplied default (e.g. `gpt-5.5`)
524    /// instead of aborting. The substitution is recorded in the NDJSON
525    /// stream as `provider_substituted: true` for traceability.
526    #[arg(long, value_name = "MODEL")]
527    pub codex_model_fallback: Option<String>,
528}
529
530// ---------------------------------------------------------------------------
531// Internal types — raw LLM output structs
532// ---------------------------------------------------------------------------
533
534// ---------------------------------------------------------------------------
535// NDJSON event types emitted to stdout
536// ---------------------------------------------------------------------------
537
538#[derive(Debug, Serialize)]
539struct PhaseEvent<'a> {
540    phase: &'a str,
541    #[serde(skip_serializing_if = "Option::is_none")]
542    binary_path: Option<&'a str>,
543    #[serde(skip_serializing_if = "Option::is_none")]
544    version: Option<&'a str>,
545    #[serde(skip_serializing_if = "Option::is_none")]
546    items_total: Option<usize>,
547    #[serde(skip_serializing_if = "Option::is_none")]
548    items_pending: Option<usize>,
549    /// Active parallel LLM worker count (1 = serial). Present only on the "scan" phase event.
550    #[serde(skip_serializing_if = "Option::is_none")]
551    llm_parallelism: Option<u32>,
552}
553
554#[derive(Debug, Serialize)]
555struct ItemEvent<'a> {
556    /// Item identifier (memory name or entity name).
557    item: &'a str,
558    status: &'a str,
559    #[serde(skip_serializing_if = "Option::is_none")]
560    memory_id: Option<i64>,
561    #[serde(skip_serializing_if = "Option::is_none")]
562    entity_id: Option<i64>,
563    #[serde(skip_serializing_if = "Option::is_none")]
564    entities: Option<usize>,
565    #[serde(skip_serializing_if = "Option::is_none")]
566    rels: Option<usize>,
567    #[serde(skip_serializing_if = "Option::is_none")]
568    chars_before: Option<usize>,
569    #[serde(skip_serializing_if = "Option::is_none")]
570    chars_after: Option<usize>,
571    #[serde(skip_serializing_if = "Option::is_none")]
572    cost_usd: Option<f64>,
573    #[serde(skip_serializing_if = "Option::is_none")]
574    elapsed_ms: Option<u64>,
575    #[serde(skip_serializing_if = "Option::is_none")]
576    error: Option<String>,
577    index: usize,
578    total: usize,
579}
580
581#[derive(Debug, Serialize)]
582struct EnrichSummary {
583    summary: bool,
584    operation: String,
585    items_total: usize,
586    completed: usize,
587    failed: usize,
588    skipped: usize,
589    cost_usd: f64,
590    elapsed_ms: u64,
591}
592
593use crate::output::emit_json_line as emit_json;
594
595// ---------------------------------------------------------------------------
596// Queue DB
597// ---------------------------------------------------------------------------
598
599/// Opens or creates the enrichment queue database.
600///
601/// The queue schema mirrors `ingest_claude` for resume/retry parity.
602/// Uses a different filename (`.enrich-queue.sqlite`) to avoid collision.
603///
604/// # DRY note
605///
606/// This is a near-verbatim copy of `open_queue_db` in `ingest_claude.rs`.
607/// Both should be unified in a shared `llm_runner.rs` module by the
608/// Integration stream.
609fn open_queue_db(path: &str) -> Result<Connection, AppError> {
610    let conn = Connection::open(path)?;
611    conn.pragma_update(None, "journal_mode", "wal")?;
612    conn.execute_batch(
613        "CREATE TABLE IF NOT EXISTS queue (
614            id          INTEGER PRIMARY KEY AUTOINCREMENT,
615            item_key    TEXT NOT NULL UNIQUE,
616            item_type   TEXT NOT NULL DEFAULT 'memory',
617            status      TEXT NOT NULL DEFAULT 'pending',
618            memory_id   INTEGER,
619            entity_id   INTEGER,
620            entities    INTEGER DEFAULT 0,
621            rels        INTEGER DEFAULT 0,
622            error       TEXT,
623            cost_usd    REAL DEFAULT 0.0,
624            attempt     INTEGER DEFAULT 0,
625            elapsed_ms  INTEGER,
626            created_at  TEXT DEFAULT (datetime('now')),
627            done_at     TEXT
628        );
629        CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
630    )?;
631    Ok(conn)
632}
633
634// ---------------------------------------------------------------------------
635// LLM invocation — Claude Code
636// ---------------------------------------------------------------------------
637
638/// Calls `claude -p` via the shared `claude_runner` module (G02).
639///
640/// Returns `(output_value, cost_usd, is_oauth)`.
641fn call_claude(
642    binary: &Path,
643    prompt: &str,
644    json_schema: &str,
645    input_text: &str,
646    model: Option<&str>,
647    timeout_secs: u64,
648) -> Result<(serde_json::Value, f64, bool), AppError> {
649    let result = crate::commands::claude_runner::run_claude(
650        binary,
651        prompt,
652        json_schema,
653        input_text,
654        model,
655        timeout_secs,
656        7,
657    )?;
658    Ok((result.value, result.cost_usd, result.is_oauth))
659}
660
661// ---------------------------------------------------------------------------
662// Preflight probe (G35) — single-turn ping to verify the LLM provider
663// ---------------------------------------------------------------------------
664
665/// Result of a single preflight ping (G35).
666enum PreflightOutcome {
667    /// The provider accepted the ping without rate-limit or other errors.
668    Healthy,
669    /// The provider rejected the ping due to OAuth rate limit. The
670    /// `suggestion` field is a human hint that callers can embed in the
671    /// user-facing error.
672    RateLimited {
673        reason: String,
674        suggestion: &'static str,
675    },
676    /// Any other provider error (binary missing, auth failure, etc.).
677    Error(AppError),
678}
679
680/// Probes the configured LLM provider with a 1-turn ping.
681///
682/// - Claude: `claude -p "ping" --max-turns 1 --strict-mcp-config --mcp-config '{}'`
683/// - Codex:  `codex exec -c mcp_servers='{}' "ping" --json`
684///
685/// The probe intentionally avoids spawning any MCP server children (G28-A)
686/// to keep its own process footprint at the minimum.
687fn run_preflight_probe(args: &EnrichArgs) -> PreflightOutcome {
688    let timeout = std::time::Duration::from_secs(args.rate_limit_buffer.max(60));
689
690    match args.mode {
691        EnrichMode::ClaudeCode => {
692            let bin = match find_claude_binary(args.claude_binary.as_deref()) {
693                Ok(b) => b,
694                Err(e) => return PreflightOutcome::Error(e),
695            };
696            let mut cmd = std::process::Command::new(&bin);
697            cmd.env_clear();
698            for var in &["PATH", "HOME", "USER"] {
699                if let Ok(val) = std::env::var(var) {
700                    cmd.env(var, val);
701                }
702            }
703            cmd.arg("-p")
704                .arg("ping")
705                .arg("--max-turns")
706                .arg("1")
707                .arg("--strict-mcp-config")
708                .arg("--mcp-config")
709                .arg("{}")
710                .arg("--dangerously-skip-permissions")
711                .arg("--settings")
712                .arg("{\"hooks\":{}}")
713                .arg("--output-format")
714                .arg("json")
715                .stdin(std::process::Stdio::null())
716                .stdout(std::process::Stdio::piped())
717                .stderr(std::process::Stdio::piped());
718
719            let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
720                Ok(c) => c,
721                Err(e) => {
722                    return PreflightOutcome::Error(AppError::Io(e));
723                }
724            };
725            let output = match wait_with_timeout(child, timeout) {
726                Ok(out) => out,
727                Err(e) => return PreflightOutcome::Error(e),
728            };
729            if !output.status.success() {
730                let stderr = String::from_utf8_lossy(&output.stderr);
731                if stderr.contains("hit your session limit")
732                    || stderr.contains("rate_limit")
733                    || stderr.contains("429")
734                {
735                    return PreflightOutcome::RateLimited {
736                        reason: stderr.trim().to_string(),
737                        suggestion:
738                            "wait for the OAuth window to reset or use --fallback-mode codex",
739                    };
740                }
741                return PreflightOutcome::Error(AppError::Validation(format!(
742                    "preflight probe failed: {stderr}",
743                    stderr = stderr.trim()
744                )));
745            }
746            PreflightOutcome::Healthy
747        }
748        EnrichMode::Codex => {
749            let bin = match find_codex_binary(args.codex_binary.as_deref()) {
750                Ok(b) => b,
751                Err(e) => return PreflightOutcome::Error(e),
752            };
753            super::codex_spawn::validate_codex_model(args.codex_model.as_deref())
754                .map_err(PreflightOutcome::Error)
755                .ok();
756            let schema = "{}";
757            let schema_path = match super::codex_spawn::trusted_schema_path() {
758                Ok(p) => p,
759                Err(e) => return PreflightOutcome::Error(e),
760            };
761            let spawn_args = super::codex_spawn::CodexSpawnArgs {
762                binary: &bin,
763                prompt: "ping",
764                json_schema: schema,
765                input_text: "",
766                model: args.codex_model.as_deref(),
767                timeout_secs: args.rate_limit_buffer.max(60),
768                schema_path: schema_path.clone(),
769            };
770            let mut cmd = super::codex_spawn::build_codex_command(&spawn_args);
771            let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
772                Ok(c) => c,
773                Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
774            };
775            let output = match wait_with_timeout(child, timeout) {
776                Ok(out) => out,
777                Err(e) => return PreflightOutcome::Error(e),
778            };
779            let _ = std::fs::remove_file(&schema_path);
780            if !output.status.success() {
781                let stderr = String::from_utf8_lossy(&output.stderr);
782                if stderr.contains("rate_limit")
783                    || stderr.contains("429")
784                    || stderr.contains("Too Many Requests")
785                {
786                    return PreflightOutcome::RateLimited {
787                        reason: stderr.trim().to_string(),
788                        suggestion: "wait for the rate-limit window to reset",
789                    };
790                }
791                return PreflightOutcome::Error(AppError::Validation(format!(
792                    "preflight probe failed: {stderr}",
793                    stderr = stderr.trim()
794                )));
795            }
796            PreflightOutcome::Healthy
797        }
798    }
799}
800
801/// Cross-platform wait with timeout (no extra crate dependency).
802fn wait_with_timeout(
803    mut child: std::process::Child,
804    timeout: std::time::Duration,
805) -> Result<std::process::Output, AppError> {
806    use wait_timeout::ChildExt;
807    let start = std::time::Instant::now();
808    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
809    if status.is_none() {
810        let _ = child.kill();
811        let _ = child.wait();
812        return Err(AppError::Validation(format!(
813            "preflight probe timed out after {}s",
814            start.elapsed().as_secs()
815        )));
816    }
817    let mut stdout = Vec::new();
818    if let Some(mut out) = child.stdout.take() {
819        std::io::Read::read_to_end(&mut out, &mut stdout).map_err(AppError::Io)?;
820    }
821    let mut stderr = Vec::new();
822    if let Some(mut err) = child.stderr.take() {
823        std::io::Read::read_to_end(&mut err, &mut stderr).map_err(AppError::Io)?;
824    }
825    let exit = status.unwrap();
826    Ok(std::process::Output {
827        status: exit,
828        stdout,
829        stderr,
830    })
831}
832
833// ---------------------------------------------------------------------------
834// SCAN helpers — SQL queries that find items needing enrichment
835// ---------------------------------------------------------------------------
836
837/// Returns memories without any `memory_entities` binding.
838///
839/// These are the targets for `memory-bindings` enrichment. When `name_filter`
840/// is non-empty, restricts the scan to the given names (G37); unknown names
841/// are silently skipped (the caller can detect them by comparing
842/// requested vs. returned).
843fn scan_unbound_memories(
844    conn: &Connection,
845    namespace: &str,
846    limit: Option<usize>,
847    name_filter: &[String],
848) -> Result<Vec<(i64, String, String)>, AppError> {
849    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
850
851    if name_filter.is_empty() {
852        let sql = format!(
853            "SELECT m.id, m.name, m.body
854             FROM memories m
855             WHERE m.namespace = ?1
856               AND m.deleted_at IS NULL
857               AND NOT EXISTS (
858                   SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
859               )
860             ORDER BY m.id
861             {limit_clause}"
862        );
863        let mut stmt = conn.prepare(&sql)?;
864        let rows = stmt
865            .query_map(rusqlite::params![namespace], |r| {
866                Ok((
867                    r.get::<_, i64>(0)?,
868                    r.get::<_, String>(1)?,
869                    r.get::<_, String>(2)?,
870                ))
871            })?
872            .collect::<Result<Vec<_>, _>>()?;
873        Ok(rows)
874    } else {
875        // Build a parameterised IN clause: ?2, ?3, ..., ?{1+n}
876        let placeholders: Vec<String> = (2..=name_filter.len() + 1)
877            .map(|i| format!("?{i}"))
878            .collect();
879        let in_clause = placeholders.join(", ");
880        let sql = format!(
881            "SELECT m.id, m.name, m.body
882             FROM memories m
883             WHERE m.namespace = ?1
884               AND m.deleted_at IS NULL
885               AND m.name IN ({in_clause})
886               AND NOT EXISTS (
887                   SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
888               )
889             ORDER BY m.id
890             {limit_clause}"
891        );
892        let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
893        params_vec.push(&namespace);
894        for n in name_filter {
895            params_vec.push(n);
896        }
897        let mut stmt = conn.prepare(&sql)?;
898        let rows = stmt
899            .query_map(
900                rusqlite::params_from_iter(params_vec.iter().copied()),
901                |r| {
902                    Ok((
903                        r.get::<_, i64>(0)?,
904                        r.get::<_, String>(1)?,
905                        r.get::<_, String>(2)?,
906                    ))
907                },
908            )?
909            .collect::<Result<Vec<_>, _>>()?;
910        Ok(rows)
911    }
912}
913
914/// Reads a list of memory names from a UTF-8 text file (G37).
915///
916/// Empty lines and lines beginning with `#` are skipped. Returns a
917/// de-duplicated, order-preserving list of trimmed names.
918fn read_names_file(path: &Path) -> Result<Vec<String>, AppError> {
919    let content = std::fs::read_to_string(path).map_err(|e| {
920        AppError::Validation(format!("failed to read names file {}: {e}", path.display()))
921    })?;
922    let mut seen = std::collections::HashSet::new();
923    let mut out = Vec::new();
924    for line in content.lines() {
925        let trimmed = line.trim();
926        if trimmed.is_empty() || trimmed.starts_with('#') {
927            continue;
928        }
929        if seen.insert(trimmed.to_string()) {
930            out.push(trimmed.to_string());
931        }
932    }
933    Ok(out)
934}
935
936/// Resolves the union of `--names` and `--names-file` (G37).
937fn resolve_name_filter(args: &EnrichArgs) -> Result<Vec<String>, AppError> {
938    let mut combined: Vec<String> = args.names.clone();
939    if let Some(p) = &args.names_file {
940        let from_file = read_names_file(p)?;
941        for n in from_file {
942            if !combined.contains(&n) {
943                combined.push(n);
944            }
945        }
946    }
947    Ok(combined)
948}
949
950/// Returns entities with NULL or empty description.
951///
952/// These are the targets for `entity-descriptions` enrichment.
953fn scan_entities_without_description(
954    conn: &Connection,
955    namespace: &str,
956    limit: Option<usize>,
957) -> Result<Vec<(i64, String, String)>, AppError> {
958    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
959    let sql = format!(
960        "SELECT id, name, type
961         FROM entities
962         WHERE namespace = ?1
963           AND (description IS NULL OR description = '')
964         ORDER BY id
965         {limit_clause}"
966    );
967    let mut stmt = conn.prepare(&sql)?;
968    let rows = stmt
969        .query_map(rusqlite::params![namespace], |r| {
970            Ok((
971                r.get::<_, i64>(0)?,
972                r.get::<_, String>(1)?,
973                r.get::<_, String>(2)?,
974            ))
975        })?
976        .collect::<Result<Vec<_>, _>>()?;
977    Ok(rows)
978}
979
980/// Returns memories whose body length is below the configured minimum.
981///
982/// These are the targets for `body-enrich` (GAP-18).
983fn scan_short_body_memories(
984    conn: &Connection,
985    namespace: &str,
986    min_chars: usize,
987    limit: Option<usize>,
988) -> Result<Vec<(i64, String, String)>, AppError> {
989    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
990    let sql = format!(
991        "SELECT m.id, m.name, m.body
992         FROM memories m
993         WHERE m.namespace = ?1
994           AND m.deleted_at IS NULL
995           AND LENGTH(COALESCE(m.body,'')) < ?2
996         ORDER BY m.id
997         {limit_clause}"
998    );
999    let mut stmt = conn.prepare(&sql)?;
1000    let rows = stmt
1001        .query_map(rusqlite::params![namespace, min_chars as i64], |r| {
1002            Ok((
1003                r.get::<_, i64>(0)?,
1004                r.get::<_, String>(1)?,
1005                r.get::<_, String>(2)?,
1006            ))
1007        })?
1008        .collect::<Result<Vec<_>, _>>()?;
1009    Ok(rows)
1010}
1011
1012/// G27: Returns relationships with weight >= 0.7 that may need recalibration.
1013#[allow(clippy::type_complexity)]
1014fn scan_weight_candidates(
1015    conn: &Connection,
1016    namespace: &str,
1017    limit: Option<usize>,
1018) -> Result<Vec<(i64, String, String, String, f64)>, AppError> {
1019    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1020    let sql = format!(
1021        "SELECT r.id, e1.name, e2.name, r.relation, r.weight \
1022         FROM relationships r \
1023         JOIN entities e1 ON e1.id = r.source_id \
1024         JOIN entities e2 ON e2.id = r.target_id \
1025         WHERE r.weight >= 0.7 AND e1.namespace = ?1 \
1026         ORDER BY r.weight DESC {limit_clause}"
1027    );
1028    let mut stmt = conn.prepare(&sql)?;
1029    let rows = stmt
1030        .query_map(rusqlite::params![namespace], |r| {
1031            Ok((
1032                r.get::<_, i64>(0)?,
1033                r.get::<_, String>(1)?,
1034                r.get::<_, String>(2)?,
1035                r.get::<_, String>(3)?,
1036                r.get::<_, f64>(4)?,
1037            ))
1038        })?
1039        .collect::<Result<Vec<_>, _>>()?;
1040    Ok(rows)
1041}
1042
1043/// G27: Returns relationships with generic relation types (applies_to).
1044fn scan_generic_relations(
1045    conn: &Connection,
1046    namespace: &str,
1047    limit: Option<usize>,
1048) -> Result<Vec<(i64, String, String, String)>, AppError> {
1049    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1050    let sql = format!(
1051        "SELECT r.id, e1.name, e2.name, r.relation \
1052         FROM relationships r \
1053         JOIN entities e1 ON e1.id = r.source_id \
1054         JOIN entities e2 ON e2.id = r.target_id \
1055         WHERE r.relation = 'applies_to' AND e1.namespace = ?1 \
1056         ORDER BY r.id {limit_clause}"
1057    );
1058    let mut stmt = conn.prepare(&sql)?;
1059    let rows = stmt
1060        .query_map(rusqlite::params![namespace], |r| {
1061            Ok((
1062                r.get::<_, i64>(0)?,
1063                r.get::<_, String>(1)?,
1064                r.get::<_, String>(2)?,
1065                r.get::<_, String>(3)?,
1066            ))
1067        })?
1068        .collect::<Result<Vec<_>, _>>()?;
1069    Ok(rows)
1070}
1071
1072// ---------------------------------------------------------------------------
1073// PERSIST helpers for fully-implemented operations
1074// ---------------------------------------------------------------------------
1075
1076/// Persists entity bindings extracted by the LLM for a memory.
1077///
1078/// Creates entities via `upsert_entity`, links them to the memory via
1079/// `link_memory_entity`, and upserts relationships found between entities.
1080fn persist_memory_bindings(
1081    conn: &Connection,
1082    namespace: &str,
1083    memory_id: i64,
1084    entities_json: &serde_json::Value,
1085    rels_json: &serde_json::Value,
1086) -> Result<(usize, usize), AppError> {
1087    #[derive(Deserialize)]
1088    struct EntityItem {
1089        name: String,
1090        entity_type: String,
1091    }
1092    #[derive(Deserialize)]
1093    struct RelItem {
1094        source: String,
1095        target: String,
1096        relation: String,
1097        strength: f64,
1098    }
1099
1100    let extracted_entities: Vec<EntityItem> = serde_json::from_value(entities_json.clone())
1101        .map_err(|e| AppError::Validation(format!("failed to parse entities array: {e}")))?;
1102
1103    let extracted_rels: Vec<RelItem> = serde_json::from_value(rels_json.clone())
1104        .map_err(|e| AppError::Validation(format!("failed to parse relationships array: {e}")))?;
1105
1106    let mut ent_count = 0usize;
1107    let mut rel_count = 0usize;
1108
1109    for item in &extracted_entities {
1110        let entity_type = match item.entity_type.parse::<EntityType>() {
1111            Ok(et) => et,
1112            Err(_) => {
1113                tracing::warn!(
1114                    target: "enrich",
1115                    entity = %item.name,
1116                    entity_type = %item.entity_type,
1117                    "entity type not recognized, skipping"
1118                );
1119                continue;
1120            }
1121        };
1122        match entities::upsert_entity(
1123            conn,
1124            namespace,
1125            &NewEntity {
1126                name: item.name.clone(),
1127                entity_type,
1128                description: None,
1129            },
1130        ) {
1131            Ok(eid) => {
1132                let _ = entities::link_memory_entity(conn, memory_id, eid);
1133                ent_count += 1;
1134            }
1135            Err(e) => {
1136                tracing::warn!(
1137                    target: "enrich",
1138                    entity = %item.name,
1139                    error = %e,
1140                    "entity upsert skipped"
1141                );
1142            }
1143        }
1144    }
1145
1146    for rel in &extracted_rels {
1147        let normalized = crate::parsers::normalize_relation(&rel.relation);
1148        crate::parsers::warn_if_non_canonical(&normalized);
1149
1150        // Normalize entity names before lookup: upsert_entity normalizes on write,
1151        // so the lookup must use the same normalized form to find the row.
1152        let src_name = crate::parsers::normalize_entity_name(&rel.source);
1153        let tgt_name = crate::parsers::normalize_entity_name(&rel.target);
1154        let src_id = entities::find_entity_id(conn, namespace, &src_name);
1155        let tgt_id = entities::find_entity_id(conn, namespace, &tgt_name);
1156        if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1157            let new_rel = NewRelationship {
1158                source: rel.source.clone(),
1159                target: rel.target.clone(),
1160                relation: normalized,
1161                strength: rel.strength,
1162                description: None,
1163            };
1164            if entities::upsert_relationship(conn, namespace, sid, tid, &new_rel).is_ok() {
1165                rel_count += 1;
1166            }
1167        }
1168    }
1169
1170    Ok((ent_count, rel_count))
1171}
1172
1173/// Updates an entity's description directly in the `entities` table.
1174fn persist_entity_description(
1175    conn: &Connection,
1176    entity_id: i64,
1177    description: &str,
1178) -> Result<(), AppError> {
1179    conn.execute(
1180        "UPDATE entities SET description = ?1, updated_at = unixepoch() WHERE id = ?2",
1181        rusqlite::params![description, entity_id],
1182    )?;
1183    Ok(())
1184}
1185
1186/// Persists an enriched memory body (body-enrich, GAP-18).
1187///
1188/// Uses `memories::update` to set the new body and `sync_fts_after_update`
1189/// to keep FTS5 in sync. Also re-embeds the memory for recall accuracy.
1190fn persist_enriched_body(
1191    conn: &Connection,
1192    namespace: &str,
1193    memory_id: i64,
1194    memory_name: &str,
1195    new_body: &str,
1196    paths: &crate::paths::AppPaths,
1197) -> Result<(), AppError> {
1198    // Read current values for FTS sync
1199    let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
1200        "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
1201        rusqlite::params![memory_id],
1202        |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1203    )?;
1204
1205    let memory_type: String = conn.query_row(
1206        "SELECT type FROM memories WHERE id=?1",
1207        rusqlite::params![memory_id],
1208        |r| r.get(0),
1209    )?;
1210
1211    let description: String = conn.query_row(
1212        "SELECT COALESCE(description,'') FROM memories WHERE id=?1",
1213        rusqlite::params![memory_id],
1214        |r| r.get(0),
1215    )?;
1216
1217    let body_hash = blake3::hash(new_body.as_bytes()).to_hex().to_string();
1218
1219    let new_memory = memories::NewMemory {
1220        namespace: namespace.to_string(),
1221        name: memory_name.to_string(),
1222        memory_type: memory_type.clone(),
1223        description: description.clone(),
1224        body: new_body.to_string(),
1225        body_hash,
1226        session_id: None,
1227        source: "agent".to_string(),
1228        metadata: serde_json::json!({
1229            "operation": "body-enrich",
1230            "orig_chars": old_body.chars().count(),
1231            "new_chars": new_body.chars().count(),
1232        }),
1233    };
1234
1235    // G29 audit: insert a new immutable version BEFORE the update so the
1236    // enriched body is reachable through `history --name <X>` and
1237    // `restore --version N` can roll back to the pre-enrich state.
1238    let next_version = crate::storage::versions::next_version(conn, memory_id)?;
1239    let version_metadata = serde_json::json!({
1240        "operation": "body-enrich",
1241        "orig_chars": old_body.chars().count(),
1242        "new_chars": new_body.chars().count(),
1243    })
1244    .to_string();
1245    crate::storage::versions::insert_version(
1246        conn,
1247        memory_id,
1248        next_version,
1249        memory_name,
1250        &memory_type,
1251        &description,
1252        new_body,
1253        &version_metadata,
1254        Some("enrich"),
1255        "edit",
1256    )?;
1257
1258    memories::update(conn, memory_id, &new_memory, None)?;
1259    memories::sync_fts_after_update(
1260        conn,
1261        memory_id,
1262        &old_name,
1263        &old_desc,
1264        &old_body,
1265        &new_memory.name,
1266        &new_memory.description,
1267        &new_memory.body,
1268    )?;
1269
1270    // Re-embed for recall accuracy
1271    let snippet: String = new_body.chars().take(200).collect();
1272    let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
1273    let chunks_info = crate::chunking::split_into_chunks_hierarchical(new_body, tokenizer);
1274    let embedding_result = if chunks_info.len() <= 1 {
1275        crate::daemon::embed_passage_or_local(&paths.models, new_body)
1276    } else {
1277        let mut chunk_embeddings: Vec<Vec<f32>> = Vec::with_capacity(chunks_info.len());
1278        let mut ok = true;
1279        for chunk in &chunks_info {
1280            let text = crate::chunking::chunk_text(new_body, chunk);
1281            match crate::daemon::embed_passage_or_local(&paths.models, text) {
1282                Ok(emb) => chunk_embeddings.push(emb),
1283                Err(e) => {
1284                    tracing::warn!(target: "enrich", error = %e, "chunk embedding failed");
1285                    ok = false;
1286                    break;
1287                }
1288            }
1289        }
1290        if ok {
1291            Ok(crate::chunking::aggregate_embeddings(&chunk_embeddings))
1292        } else {
1293            crate::daemon::embed_passage_or_local(&paths.models, new_body)
1294        }
1295    };
1296
1297    if let Ok(embedding) = embedding_result {
1298        if let Err(e) = memories::upsert_vec(
1299            conn,
1300            memory_id,
1301            namespace,
1302            &memory_type,
1303            &embedding,
1304            memory_name,
1305            &snippet,
1306        ) {
1307            tracing::warn!(target: "enrich", memory = %memory_name, error = %e, "vec upsert failed after body-enrich");
1308        }
1309    }
1310
1311    Ok(())
1312}
1313
1314// ---------------------------------------------------------------------------
1315// Main entry point
1316// ---------------------------------------------------------------------------
1317
1318/// Main entry point for the `enrich` command.
1319pub fn run(args: &EnrichArgs) -> Result<(), AppError> {
1320    // TODO(G20): add mode-conditional flag validation before DB access.
1321    // Flags that are silently discarded when the wrong mode is active:
1322    //   --mode claude-code: codex_binary, codex_model, codex_timeout
1323    //   --mode codex:       claude_binary, claude_model, claude_timeout,
1324    //                       max_cost_usd, rate_limit_wait
1325    // Approach: check each non-default flag value early and return
1326    // Err(AppError::Validation(...)) for incompatible mode+flag combinations.
1327    let started = Instant::now();
1328
1329    let paths = AppPaths::resolve(args.db.as_deref())?;
1330    ensure_db_ready(&paths)?;
1331    let conn = open_rw(&paths.db)?;
1332    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1333
1334    // G28-B (v1.0.68) + G30 (v1.0.69): enforce singleton per
1335    // (job_type, namespace, db_hash) so two parallel `enrich` invocations
1336    // on the same DB cannot co-exist, but concurrent enrich on different
1337    // databases works as expected. The force flag (--force) breaks a
1338    // stale lock from a previously crashed invocation.
1339    let wait_secs = args.wait_job_singleton;
1340    let force_flag = args.force_job_singleton;
1341    let _singleton = crate::lock::acquire_job_singleton(
1342        crate::lock::JobType::Enrich,
1343        &namespace,
1344        &paths.db,
1345        wait_secs,
1346        force_flag,
1347    )?;
1348
1349    // Validate provider binary upfront
1350    let _effective_mode: EnrichMode = args.mode.clone();
1351    let provider_binary = match args.mode {
1352        EnrichMode::ClaudeCode => {
1353            let bin = find_claude_binary(args.claude_binary.as_deref())?;
1354            let version = super::claude_runner::validate_claude_version(&bin)?;
1355            tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
1356            emit_json(&PhaseEvent {
1357                phase: "validate",
1358                binary_path: bin.to_str(),
1359                version: Some(&version),
1360                items_total: None,
1361                items_pending: None,
1362                llm_parallelism: None,
1363            });
1364            bin
1365        }
1366        EnrichMode::Codex => {
1367            // Codex provider: locate binary using env or PATH
1368            let bin = find_codex_binary(args.codex_binary.as_deref())?;
1369            emit_json(&PhaseEvent {
1370                phase: "validate",
1371                binary_path: bin.to_str(),
1372                version: None,
1373                items_total: None,
1374                items_pending: None,
1375                llm_parallelism: None,
1376            });
1377            bin
1378        }
1379    };
1380
1381    // G28-D: refuse to start when the system is saturated. This check
1382    // is BEFORE preflight so we never spend an OAuth turn on a host
1383    // that is already at the limit.
1384    if args.max_load_check && !args.dry_run && crate::system_load::is_system_saturated() {
1385        let load = crate::system_load::load_average_one();
1386        let n = crate::system_load::ncpus();
1387        return Err(AppError::Validation(format!(
1388            "system load average {load:.2} exceeds 2x ncpus ({n}); \
1389             pass --no-max-load-check to override (not recommended)"
1390        )));
1391    }
1392
1393    // G35: preflight probe — issue a single ping turn to verify the
1394    // provider is healthy before scanning N candidates. If the probe
1395    // fails with a rate-limit error, optionally fall back to a
1396    // different mode (typically codex) instead of failing the entire
1397    // batch. The probe itself consumes 1 OAuth turn, so it stays
1398    // opt-in (default off) to keep --dry-run and CI flows zero-cost.
1399    if args.preflight_check && !args.dry_run {
1400        let preflight_result = run_preflight_probe(args);
1401        match preflight_result {
1402            PreflightOutcome::Healthy => {
1403                tracing::info!(target: "enrich", mode = ?args.mode, "preflight probe healthy");
1404            }
1405            PreflightOutcome::RateLimited { reason, suggestion } => {
1406                if let Some(fallback) = args.fallback_mode.clone() {
1407                    if fallback != args.mode {
1408                        // G35 (v1.0.69): the mid-batch mode switch is
1409                        // intentionally NOT applied because it would
1410                        // desynchronise the per-item rate-limit wait
1411                        // state (rate-limited items in the worker are
1412                        // timed against the original provider). Instead
1413                        // we abort cleanly so the operator can re-invoke
1414                        // with `--mode {fallback:?}`. This guarantees no
1415                        // OAuth window is wasted and no partial state
1416                        // is left in the queue.
1417                        return Err(AppError::Validation(format!(
1418                            "preflight detected rate limit on {mode:?}: {reason}; \
1419                             re-invoke with `--mode {fallback:?}` to use the fallback provider",
1420                            mode = args.mode
1421                        )));
1422                    }
1423                    return Err(AppError::Validation(format!(
1424                        "preflight detected rate limit on {mode:?}: {reason}; \
1425                         --fallback-mode matches --mode, no recovery possible",
1426                        mode = args.mode
1427                    )));
1428                }
1429                return Err(AppError::Validation(format!(
1430                    "preflight detected rate limit on {mode:?}: {reason}; \
1431                     {suggestion}; pass --fallback-mode codex to recover",
1432                    mode = args.mode
1433                )));
1434            }
1435            PreflightOutcome::Error(e) => {
1436                return Err(e);
1437            }
1438        }
1439    }
1440
1441    // SCAN phase
1442    let scan_result = scan_operation(&conn, &namespace, args)?;
1443    let total = scan_result.len();
1444
1445    emit_json(&PhaseEvent {
1446        phase: "scan",
1447        binary_path: None,
1448        version: None,
1449        items_total: Some(total),
1450        items_pending: Some(total),
1451        llm_parallelism: Some(args.llm_parallelism),
1452    });
1453
1454    // Dry-run: emit preview events and summary without calling LLM
1455    if args.dry_run {
1456        for (idx, key) in scan_result.iter().enumerate() {
1457            emit_json(&ItemEvent {
1458                item: key,
1459                status: "preview",
1460                memory_id: None,
1461                entity_id: None,
1462                entities: None,
1463                rels: None,
1464                chars_before: None,
1465                chars_after: None,
1466                cost_usd: None,
1467                elapsed_ms: None,
1468                error: None,
1469                index: idx,
1470                total,
1471            });
1472        }
1473        emit_json(&EnrichSummary {
1474            summary: true,
1475            operation: format!("{:?}", args.operation),
1476            items_total: total,
1477            completed: 0,
1478            failed: 0,
1479            skipped: 0,
1480            cost_usd: 0.0,
1481            elapsed_ms: started.elapsed().as_millis() as u64,
1482        });
1483        return Ok(());
1484    }
1485
1486    // All 13 operations are now implemented (G27 complete).
1487
1488    // Queue setup for resume/retry
1489    let queue_conn = open_queue_db(DEFAULT_QUEUE_DB)?;
1490
1491    if args.resume {
1492        let reset = queue_conn
1493            .execute(
1494                "UPDATE queue SET status='pending' WHERE status='processing'",
1495                [],
1496            )
1497            .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
1498        if reset > 0 {
1499            tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
1500        }
1501    }
1502
1503    if args.retry_failed {
1504        let count = queue_conn
1505            .execute(
1506                "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
1507                [],
1508            )
1509            .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
1510        tracing::info!(target: "enrich", count, "retrying failed items");
1511    }
1512
1513    if !args.resume && !args.retry_failed {
1514        queue_conn
1515            .execute("DELETE FROM queue", [])
1516            .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
1517    }
1518
1519    // Populate queue
1520    for (idx, key) in scan_result.iter().enumerate() {
1521        let item_type = match args.operation {
1522            EnrichOperation::EntityDescriptions => "entity",
1523            _ => "memory",
1524        };
1525        if let Err(e) = queue_conn.execute(
1526            "INSERT OR IGNORE INTO queue (item_key, item_type, status) VALUES (?1, ?2, 'pending')",
1527            rusqlite::params![key, item_type],
1528        ) {
1529            tracing::warn!(target: "enrich", error = %e, "queue insert failed");
1530        }
1531        let _ = idx; // suppress unused warning
1532    }
1533
1534    // G19: parallel LLM processing via std::thread::scope when parallelism > 1.
1535    // Clamp enforces the range even if the caller bypasses clap validation.
1536    let parallelism = args.llm_parallelism.clamp(1, 32) as usize;
1537    if parallelism > 1 {
1538        tracing::info!(
1539            target: "enrich",
1540            llm_parallelism = parallelism,
1541            "parallel LLM processing with bounded thread pool"
1542        );
1543    }
1544    // G28-D (v1.0.68) + G34 (v1.0.69): warn above the recommended parallelism
1545    // ceiling. The threshold and message depend on the LLM mode because
1546    // Claude Code spawns MCP children (G28-A) while Codex does not.
1547    if parallelism > 4 {
1548        match args.mode {
1549            EnrichMode::ClaudeCode => {
1550                tracing::warn!(
1551                    target: "enrich",
1552                    llm_parallelism = parallelism,
1553                    recommended_max = 4,
1554                    mode = "claude-code",
1555                    "llm_parallelism above 4 multiplies Claude Code subprocess fan-out; \
1556                     consider combining with SQLITE_GRAPHRAG_CLAUDE_EMPTY_CONFIG_DIR \
1557                     to cut MCP children (G28-A)"
1558                );
1559            }
1560            EnrichMode::Codex if parallelism > 16 => {
1561                tracing::warn!(
1562                    target: "enrich",
1563                    llm_parallelism = parallelism,
1564                    recommended_max = 16,
1565                    mode = "codex",
1566                    "llm_parallelism above 16 risks OAuth rate-limit on Codex; \
1567                     consider --llm-parallelism 8 for safer concurrency"
1568                );
1569            }
1570            EnrichMode::Codex => {
1571                // No warning: codex does not spawn MCP children and was
1572                // validated at parallelism 8 in production (1161 items,
1573                // 0 failures) per the 2026-06-04 session audit.
1574            }
1575        }
1576    }
1577
1578    let mut completed = 0usize;
1579    let mut failed = 0usize;
1580    let mut skipped = 0usize;
1581    let mut cost_total = 0.0f64;
1582    let mut oauth_detected = false;
1583    let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1584    let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1585    let enrich_started = std::time::Instant::now();
1586
1587    let provider_timeout = match args.mode {
1588        EnrichMode::ClaudeCode => args.claude_timeout,
1589        EnrichMode::Codex => args.codex_timeout,
1590    };
1591
1592    let provider_model: Option<&str> = match args.mode {
1593        EnrichMode::ClaudeCode => args.claude_model.as_deref(),
1594        EnrichMode::Codex => args.codex_model.as_deref(),
1595    };
1596
1597    // G19: when parallelism > 1, spawn bounded worker threads.
1598    // Each worker opens its own DB connections (WAL supports concurrent readers + serialized writers).
1599    // The queue DB claim is atomic via UPDATE...RETURNING — no external lock needed.
1600    if parallelism > 1 {
1601        let stdout_mu = parking_lot::Mutex::new(());
1602        let budget = args.max_cost_usd;
1603        let operation = args.operation.clone();
1604        let mode = args.mode.clone();
1605        let min_oc = args.min_output_chars;
1606        let max_oc = args.max_output_chars;
1607        let prompt_tpl = args.prompt_template.as_deref().map(|p| p.to_path_buf());
1608
1609        struct WorkerResult {
1610            completed: usize,
1611            failed: usize,
1612            skipped: usize,
1613            cost: f64,
1614            oauth: bool,
1615        }
1616
1617        let results: Vec<WorkerResult> = std::thread::scope(|s| {
1618            let handles: Vec<_> = (0..parallelism)
1619                .map(|worker_id| {
1620                    let stdout_mu = &stdout_mu;
1621                    let paths = &paths;
1622                    let namespace = &namespace;
1623                    let provider_binary = &provider_binary;
1624                    let operation = &operation;
1625                    let mode = &mode;
1626                    let prompt_tpl = prompt_tpl.as_deref();
1627                    s.spawn(move || {
1628                        let w_conn = match open_rw(&paths.db) {
1629                            Ok(c) => c,
1630                            Err(e) => {
1631                                tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open DB");
1632                                return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1633                            }
1634                        };
1635                        let w_queue = match open_queue_db(DEFAULT_QUEUE_DB) {
1636                            Ok(c) => c,
1637                            Err(e) => {
1638                                tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open queue DB");
1639                                return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1640                            }
1641                        };
1642                        let mut w_completed = 0usize;
1643                        let mut w_failed = 0usize;
1644                        let mut w_skipped = 0usize;
1645                        let mut w_cost = 0.0f64;
1646                        let mut w_oauth = false;
1647                        let mut w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1648                        let w_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1649                        // G28-D: per-worker circuit breaker that aborts the
1650                        // loop after `circuit_breaker_threshold` consecutive
1651                        // HardFailure outcomes (transient/rate-limited errors
1652                        // do NOT count, so a recovering provider is not
1653                        // penalised).
1654                        let mut w_breaker = crate::retry::CircuitBreaker::new(
1655                            args.circuit_breaker_threshold.max(1),
1656                            std::time::Duration::from_secs(60),
1657                        );
1658
1659                        loop {
1660                            if crate::shutdown_requested() {
1661                                tracing::info!(target: "enrich", "shutdown requested, worker stopping");
1662                                break;
1663                            }
1664                            if let Some(b) = budget {
1665                                if !w_oauth && w_cost >= b {
1666                                    break;
1667                                }
1668                            }
1669                            let pending: Option<(i64, String, String)> = w_queue
1670                                .query_row(
1671                                    "UPDATE queue SET status='processing', attempt=attempt+1 \
1672                                     WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
1673                                     RETURNING id, item_key, item_type",
1674                                    [],
1675                                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1676                                )
1677                                .ok();
1678                            let (queue_id, item_key, _item_type) = match pending {
1679                                Some(p) => p,
1680                                None => break,
1681                            };
1682                            let item_started = Instant::now();
1683                            let current_index = w_completed + w_failed + w_skipped;
1684
1685                            let call_result = match operation {
1686                                EnrichOperation::MemoryBindings => call_memory_bindings(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1687                                EnrichOperation::EntityDescriptions => call_entity_description(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1688                                EnrichOperation::BodyEnrich => call_body_enrich(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode, min_oc, max_oc, prompt_tpl, args.preserve_threshold, paths),
1689                                EnrichOperation::WeightCalibrate => call_weight_calibrate(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1690                                EnrichOperation::RelationReclassify => call_relation_reclassify(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1691                                EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => call_entity_connect(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1692                                EnrichOperation::EntityTypeValidate => call_entity_type_validate(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1693                                EnrichOperation::DescriptionEnrich => call_description_enrich(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1694                                EnrichOperation::DomainClassify => call_domain_classify(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1695                                EnrichOperation::GraphAudit => call_graph_audit(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1696                                EnrichOperation::DeepResearchSynth => call_deep_research_synth(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1697                                EnrichOperation::BodyExtract => call_body_extract(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1698                            };
1699
1700                            match call_result {
1701                                Ok(EnrichItemResult::Done { cost, is_oauth, memory_id, entity_id, entities, rels, chars_before, chars_after }) => {
1702                                    if is_oauth { w_oauth = true; }
1703                                    w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1704                                    let _ = w_queue.execute(
1705                                        "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1706                                        rusqlite::params![memory_id, entity_id, entities as i64, rels as i64, cost, item_started.elapsed().as_millis() as i64, queue_id],
1707                                    );
1708                                    w_completed += 1;
1709                                    if !is_oauth { w_cost += cost; }
1710                                    // G28-D: count success; resets breaker.
1711                                    let _ = w_breaker
1712                                        .record(crate::retry::AttemptOutcome::Success);
1713                                    let _guard = stdout_mu.lock();
1714                                    emit_json(&ItemEvent { item: &item_key, status: "done", memory_id, entity_id, entities: Some(entities), rels: Some(rels), chars_before, chars_after, cost_usd: if is_oauth { None } else { Some(cost) }, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
1715                                }
1716                                Ok(EnrichItemResult::Skipped { reason }) => {
1717                                    w_skipped += 1;
1718                                    let _ = w_queue.execute("UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![reason, queue_id]);
1719                                    let _guard = stdout_mu.lock();
1720                                    emit_json(&ItemEvent { item: &item_key, status: "skipped", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
1721                                }
1722                                Ok(EnrichItemResult::PreservationFailed { score, threshold, chars_before, chars_after }) => {
1723                                    // G29 Passo 4: worker mirror of the
1724                                    // serial path. Counted as a soft
1725                                    // skip so the queue surface shows
1726                                    // a quality issue rather than a
1727                                    // transport failure.
1728                                    w_skipped += 1;
1729                                    let reason = format!(
1730                                        "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
1731                                    );
1732                                    let _ = w_queue.execute(
1733                                        "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
1734                                        rusqlite::params![reason, queue_id],
1735                                    );
1736                                    let _guard = stdout_mu.lock();
1737                                    emit_json(&ItemEvent {
1738                                        item: &item_key,
1739                                        status: "preservation_failed",
1740                                        memory_id: None,
1741                                        entity_id: None,
1742                                        entities: None,
1743                                        rels: None,
1744                                        chars_before: Some(chars_before),
1745                                        chars_after: Some(chars_after),
1746                                        cost_usd: None,
1747                                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1748                                        error: Some(reason),
1749                                        index: current_index,
1750                                        total,
1751                                    });
1752                                }
1753                                Err(e) => {
1754                                    let err_str = format!("{e}");
1755                                    if matches!(e, AppError::RateLimited { .. }) {
1756                                        if crate::retry::is_kill_switch_active() {
1757                                            tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
1758                                        } else if std::time::Instant::now() >= w_deadline {
1759                                            tracing::error!(target: "enrich", "rate-limit retry deadline (1h) exhausted in worker");
1760                                        } else {
1761                                            let half = w_backoff / 2;
1762                                            let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
1763                                            let actual_wait = half + jitter;
1764                                            tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited in worker, backing off");
1765                                            let _ = w_queue.execute("UPDATE queue SET status='pending' WHERE id=?1", rusqlite::params![queue_id]);
1766                                            std::thread::sleep(std::time::Duration::from_secs(actual_wait));
1767                                            w_backoff = (w_backoff * 2).min(900);
1768                                            continue;
1769                                        }
1770                                    }
1771                                    w_failed += 1;
1772                                    let _ = w_queue.execute("UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![err_str, queue_id]);
1773                                    let _guard = stdout_mu.lock();
1774                                    emit_json(&ItemEvent { item: &item_key, status: "failed", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: Some(err_str), index: current_index, total });
1775                                    // G28-D: count hard failure against breaker.
1776                                    let breaker_opened = w_breaker
1777                                        .record(crate::retry::AttemptOutcome::HardFailure);
1778                                    if breaker_opened {
1779                                        tracing::error!(target: "enrich",
1780                                            consecutive_failures = w_breaker.consecutive_failures(),
1781                                            "circuit breaker opened — aborting worker"
1782                                        );
1783                                        break;
1784                                    }
1785                                }
1786                            }
1787                        }
1788                        WorkerResult { completed: w_completed, failed: w_failed, skipped: w_skipped, cost: w_cost, oauth: w_oauth }
1789                    })
1790                })
1791                .collect();
1792            handles
1793                .into_iter()
1794                .map(|h| {
1795                    h.join().unwrap_or(WorkerResult {
1796                        completed: 0,
1797                        failed: 0,
1798                        skipped: 0,
1799                        cost: 0.0,
1800                        oauth: false,
1801                    })
1802                })
1803                .collect()
1804        });
1805
1806        for r in &results {
1807            completed += r.completed;
1808            failed += r.failed;
1809            skipped += r.skipped;
1810            cost_total += r.cost;
1811            oauth_detected |= r.oauth;
1812        }
1813    } else {
1814        // Serial path (parallelism == 1) — original loop
1815        loop {
1816            if crate::shutdown_requested() {
1817                tracing::info!(target: "enrich", "shutdown requested, stopping enrichment");
1818                break;
1819            }
1820
1821            // Budget check
1822            if let Some(budget) = args.max_cost_usd {
1823                if !oauth_detected && cost_total >= budget {
1824                    tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
1825                    break;
1826                }
1827            }
1828
1829            // Dequeue next pending item
1830            let pending: Option<(i64, String, String)> = queue_conn
1831                .query_row(
1832                    "UPDATE queue SET status='processing', attempt=attempt+1 \
1833                 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
1834                 RETURNING id, item_key, item_type",
1835                    [],
1836                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1837                )
1838                .ok();
1839
1840            let (queue_id, item_key, item_type) = match pending {
1841                Some(p) => p,
1842                None => break,
1843            };
1844
1845            let item_started = Instant::now();
1846            let current_index = completed + failed + skipped;
1847
1848            let call_result = match args.operation {
1849                EnrichOperation::MemoryBindings => call_memory_bindings(
1850                    &conn,
1851                    &namespace,
1852                    &item_key,
1853                    &provider_binary,
1854                    provider_model,
1855                    provider_timeout,
1856                    &args.mode,
1857                ),
1858                EnrichOperation::EntityDescriptions => call_entity_description(
1859                    &conn,
1860                    &namespace,
1861                    &item_key,
1862                    &provider_binary,
1863                    provider_model,
1864                    provider_timeout,
1865                    &args.mode,
1866                ),
1867                EnrichOperation::BodyEnrich => call_body_enrich(
1868                    &conn,
1869                    &namespace,
1870                    &item_key,
1871                    &provider_binary,
1872                    provider_model,
1873                    provider_timeout,
1874                    &args.mode,
1875                    args.min_output_chars,
1876                    args.max_output_chars,
1877                    args.prompt_template.as_deref(),
1878                    args.preserve_threshold,
1879                    &paths,
1880                ),
1881                EnrichOperation::WeightCalibrate => call_weight_calibrate(
1882                    &conn,
1883                    &namespace,
1884                    &item_key,
1885                    &provider_binary,
1886                    provider_model,
1887                    provider_timeout,
1888                    &args.mode,
1889                ),
1890                EnrichOperation::RelationReclassify => call_relation_reclassify(
1891                    &conn,
1892                    &namespace,
1893                    &item_key,
1894                    &provider_binary,
1895                    provider_model,
1896                    provider_timeout,
1897                    &args.mode,
1898                ),
1899                EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
1900                    call_entity_connect(
1901                        &conn,
1902                        &namespace,
1903                        &item_key,
1904                        &provider_binary,
1905                        provider_model,
1906                        provider_timeout,
1907                        &args.mode,
1908                    )
1909                }
1910                EnrichOperation::EntityTypeValidate => call_entity_type_validate(
1911                    &conn,
1912                    &namespace,
1913                    &item_key,
1914                    &provider_binary,
1915                    provider_model,
1916                    provider_timeout,
1917                    &args.mode,
1918                ),
1919                EnrichOperation::DescriptionEnrich => call_description_enrich(
1920                    &conn,
1921                    &namespace,
1922                    &item_key,
1923                    &provider_binary,
1924                    provider_model,
1925                    provider_timeout,
1926                    &args.mode,
1927                ),
1928                EnrichOperation::DomainClassify => call_domain_classify(
1929                    &conn,
1930                    &namespace,
1931                    &item_key,
1932                    &provider_binary,
1933                    provider_model,
1934                    provider_timeout,
1935                    &args.mode,
1936                ),
1937                EnrichOperation::GraphAudit => call_graph_audit(
1938                    &conn,
1939                    &namespace,
1940                    &item_key,
1941                    &provider_binary,
1942                    provider_model,
1943                    provider_timeout,
1944                    &args.mode,
1945                ),
1946                EnrichOperation::DeepResearchSynth => call_deep_research_synth(
1947                    &conn,
1948                    &namespace,
1949                    &item_key,
1950                    &provider_binary,
1951                    provider_model,
1952                    provider_timeout,
1953                    &args.mode,
1954                ),
1955                EnrichOperation::BodyExtract => call_body_extract(
1956                    &conn,
1957                    &namespace,
1958                    &item_key,
1959                    &provider_binary,
1960                    provider_model,
1961                    provider_timeout,
1962                    &args.mode,
1963                ),
1964            };
1965
1966            match call_result {
1967                Ok(EnrichItemResult::Done {
1968                    memory_id,
1969                    entity_id,
1970                    entities,
1971                    rels,
1972                    chars_before,
1973                    chars_after,
1974                    cost,
1975                    is_oauth,
1976                }) => {
1977                    if is_oauth && !oauth_detected {
1978                        oauth_detected = true;
1979                        tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
1980                    }
1981                    backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1982
1983                    // Persist depends on the operation
1984                    let persist_err: Option<String> = match args.operation {
1985                        EnrichOperation::MemoryBindings => {
1986                            // Bindings already persisted inside call_memory_bindings
1987                            None
1988                        }
1989                        EnrichOperation::EntityDescriptions => {
1990                            // Description already persisted inside call_entity_description
1991                            None
1992                        }
1993                        EnrichOperation::BodyEnrich => {
1994                            // Body already persisted inside call_body_enrich
1995                            None
1996                        }
1997                        _ => {
1998                            // All G27 operations persist inside their call_* function
1999                            None
2000                        }
2001                    };
2002
2003                    if let Err(e) = queue_conn.execute(
2004                    "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2005                    rusqlite::params![
2006                        memory_id,
2007                        entity_id,
2008                        entities as i64,
2009                        rels as i64,
2010                        cost,
2011                        item_started.elapsed().as_millis() as i64,
2012                        queue_id
2013                    ],
2014                ) {
2015                        tracing::warn!(target: "enrich", error = %e, "queue done update failed");
2016                    }
2017
2018                    if persist_err.is_none() {
2019                        completed += 1;
2020                        if !is_oauth {
2021                            cost_total += cost;
2022                        }
2023                        emit_json(&ItemEvent {
2024                            item: &item_key,
2025                            status: "done",
2026                            memory_id,
2027                            entity_id,
2028                            entities: Some(entities),
2029                            rels: Some(rels),
2030                            chars_before,
2031                            chars_after,
2032                            cost_usd: if is_oauth { None } else { Some(cost) },
2033                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2034                            error: None,
2035                            index: current_index,
2036                            total,
2037                        });
2038                    } else {
2039                        failed += 1;
2040                        emit_json(&ItemEvent {
2041                            item: &item_key,
2042                            status: "failed",
2043                            memory_id: None,
2044                            entity_id: None,
2045                            entities: None,
2046                            rels: None,
2047                            chars_before: None,
2048                            chars_after: None,
2049                            cost_usd: None,
2050                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2051                            error: persist_err,
2052                            index: current_index,
2053                            total,
2054                        });
2055                    }
2056                }
2057                Ok(EnrichItemResult::Skipped { reason }) => {
2058                    skipped += 1;
2059                    if let Err(e) = queue_conn.execute(
2060                    "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2061                    rusqlite::params![reason, queue_id],
2062                ) {
2063                        tracing::warn!(target: "enrich", error = %e, "queue skipped update failed");
2064                    }
2065                    emit_json(&ItemEvent {
2066                        item: &item_key,
2067                        status: "skipped",
2068                        memory_id: None,
2069                        entity_id: None,
2070                        entities: None,
2071                        rels: None,
2072                        chars_before: None,
2073                        chars_after: None,
2074                        cost_usd: None,
2075                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2076                        error: None,
2077                        index: current_index,
2078                        total,
2079                    });
2080                }
2081                Ok(EnrichItemResult::PreservationFailed {
2082                    score,
2083                    threshold,
2084                    chars_before,
2085                    chars_after,
2086                }) => {
2087                    // G29 Passo 4: the LLM rewrite diverged too far from
2088                    // the original body. Count as a soft failure (not
2089                    // `failed`) so the queue surfaces it as a quality
2090                    // issue, not a transport error. The reason is
2091                    // structured so the operator can audit why a body
2092                    // was rejected.
2093                    skipped += 1;
2094                    let reason = format!(
2095                        "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2096                    );
2097                    if let Err(qe) = queue_conn.execute(
2098                        "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2099                        rusqlite::params![reason, queue_id],
2100                    ) {
2101                        tracing::warn!(target: "enrich", error = %qe, "queue preservation_failed update failed");
2102                    }
2103                    emit_json(&ItemEvent {
2104                        item: &item_key,
2105                        status: "preservation_failed",
2106                        memory_id: None,
2107                        entity_id: None,
2108                        entities: None,
2109                        rels: None,
2110                        chars_before: Some(chars_before),
2111                        chars_after: Some(chars_after),
2112                        cost_usd: None,
2113                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2114                        error: Some(reason),
2115                        index: current_index,
2116                        total,
2117                    });
2118                }
2119                Err(e) => {
2120                    let err_str = format!("{e}");
2121                    if matches!(e, AppError::RateLimited { .. }) {
2122                        if crate::retry::is_kill_switch_active() {
2123                            tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2124                        } else if std::time::Instant::now() >= rate_limit_deadline {
2125                            tracing::error!(target: "enrich", total_elapsed_secs = enrich_started.elapsed().as_secs(), "rate-limit retry deadline (1h) exhausted");
2126                        } else {
2127                            let half = backoff_secs / 2;
2128                            let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2129                            let actual_wait = half + jitter;
2130                            tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
2131                            if let Err(qe) = queue_conn.execute(
2132                                "UPDATE queue SET status='pending' WHERE id=?1",
2133                                rusqlite::params![queue_id],
2134                            ) {
2135                                tracing::warn!(target: "enrich", error = %qe, "queue pending update failed");
2136                            }
2137                            std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2138                            backoff_secs = (backoff_secs * 2).min(900);
2139                            continue;
2140                        }
2141                    }
2142
2143                    failed += 1;
2144                    if let Err(qe) = queue_conn.execute(
2145                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
2146                    rusqlite::params![err_str, queue_id],
2147                ) {
2148                        tracing::warn!(target: "enrich", error = %qe, "queue failed update failed");
2149                    }
2150                    emit_json(&ItemEvent {
2151                        item: &item_key,
2152                        status: "failed",
2153                        memory_id: None,
2154                        entity_id: None,
2155                        entities: None,
2156                        rels: None,
2157                        chars_before: None,
2158                        chars_after: None,
2159                        cost_usd: None,
2160                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2161                        error: Some(err_str),
2162                        index: current_index,
2163                        total,
2164                    });
2165                }
2166            }
2167
2168            let _ = item_type; // used via queue schema only
2169        }
2170    } // end else (serial path)
2171
2172    let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2173    let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2174
2175    emit_json(&EnrichSummary {
2176        summary: true,
2177        operation: format!("{:?}", args.operation),
2178        items_total: total,
2179        completed,
2180        failed,
2181        skipped,
2182        cost_usd: cost_total,
2183        elapsed_ms: started.elapsed().as_millis() as u64,
2184    });
2185
2186    if failed == 0 {
2187        let _ = std::fs::remove_file(DEFAULT_QUEUE_DB);
2188    }
2189
2190    Ok(())
2191}
2192
2193// ---------------------------------------------------------------------------
2194// Internal result type for a single item call
2195// ---------------------------------------------------------------------------
2196
2197enum EnrichItemResult {
2198    Done {
2199        memory_id: Option<i64>,
2200        entity_id: Option<i64>,
2201        entities: usize,
2202        rels: usize,
2203        chars_before: Option<usize>,
2204        chars_after: Option<usize>,
2205        cost: f64,
2206        is_oauth: bool,
2207    },
2208    Skipped {
2209        reason: String,
2210    },
2211    /// G29 Passo 4 (v1.0.69): the LLM rewrite diverged from the original
2212    /// body beyond the configured `--preserve-threshold` and was rejected
2213    /// before persistence. The trigram-Jaccard score and threshold are
2214    /// emitted in the NDJSON stream for operator audit.
2215    PreservationFailed {
2216        score: f64,
2217        threshold: f64,
2218        chars_before: usize,
2219        chars_after: usize,
2220    },
2221}
2222
2223// ---------------------------------------------------------------------------
2224// Per-operation call helpers (SCAN + JUDGE + PERSIST in one unit)
2225// ---------------------------------------------------------------------------
2226
2227fn call_memory_bindings(
2228    conn: &Connection,
2229    namespace: &str,
2230    memory_name: &str,
2231    binary: &Path,
2232    model: Option<&str>,
2233    timeout: u64,
2234    mode: &EnrichMode,
2235) -> Result<EnrichItemResult, AppError> {
2236    // Look up the memory
2237    let (memory_id, body): (i64, String) = conn.query_row(
2238        "SELECT id, COALESCE(body,'') FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2239        rusqlite::params![namespace, memory_name],
2240        |r| Ok((r.get(0)?, r.get(1)?)),
2241    ).map_err(|e| match e {
2242        rusqlite::Error::QueryReturnedNoRows => AppError::NotFound(format!("memory '{memory_name}' not found")),
2243        other => AppError::Database(other),
2244    })?;
2245
2246    if body.trim().is_empty() {
2247        return Ok(EnrichItemResult::Skipped {
2248            reason: "body is empty".to_string(),
2249        });
2250    }
2251
2252    let (value, cost, is_oauth) = match mode {
2253        EnrichMode::ClaudeCode => call_claude(
2254            binary,
2255            BINDINGS_PROMPT,
2256            BINDINGS_SCHEMA,
2257            &body,
2258            model,
2259            timeout,
2260        )?,
2261        EnrichMode::Codex => call_codex(
2262            binary,
2263            BINDINGS_PROMPT,
2264            BINDINGS_SCHEMA,
2265            &body,
2266            model,
2267            timeout,
2268        )?,
2269    };
2270
2271    let empty_arr = serde_json::Value::Array(vec![]);
2272    let entities_val = value.get("entities").unwrap_or(&empty_arr);
2273    let rels_val = value.get("relationships").unwrap_or(&empty_arr);
2274
2275    let (ent_count, rel_count) =
2276        persist_memory_bindings(conn, namespace, memory_id, entities_val, rels_val)?;
2277
2278    Ok(EnrichItemResult::Done {
2279        memory_id: Some(memory_id),
2280        entity_id: None,
2281        entities: ent_count,
2282        rels: rel_count,
2283        chars_before: None,
2284        chars_after: None,
2285        cost,
2286        is_oauth,
2287    })
2288}
2289
2290fn call_entity_description(
2291    conn: &Connection,
2292    namespace: &str,
2293    entity_name: &str,
2294    binary: &Path,
2295    model: Option<&str>,
2296    timeout: u64,
2297    mode: &EnrichMode,
2298) -> Result<EnrichItemResult, AppError> {
2299    let (entity_id, entity_type): (i64, String) = conn
2300        .query_row(
2301            "SELECT id, type FROM entities WHERE namespace=?1 AND name=?2",
2302            rusqlite::params![namespace, entity_name],
2303            |r| Ok((r.get(0)?, r.get(1)?)),
2304        )
2305        .map_err(|e| match e {
2306            rusqlite::Error::QueryReturnedNoRows => {
2307                AppError::NotFound(format!("entity '{entity_name}' not found"))
2308            }
2309            other => AppError::Database(other),
2310        })?;
2311
2312    let prompt = format!(
2313        "{ENTITY_DESCRIPTION_PROMPT_PREFIX}{entity_name}\nEntity type: {entity_type}\n\nGenerate a description:"
2314    );
2315
2316    let (value, cost, is_oauth) = match mode {
2317        EnrichMode::ClaudeCode => call_claude(
2318            binary,
2319            &prompt,
2320            ENTITY_DESCRIPTION_SCHEMA,
2321            "",
2322            model,
2323            timeout,
2324        )?,
2325        EnrichMode::Codex => call_codex(
2326            binary,
2327            &prompt,
2328            ENTITY_DESCRIPTION_SCHEMA,
2329            "",
2330            model,
2331            timeout,
2332        )?,
2333    };
2334
2335    let description = value
2336        .get("description")
2337        .and_then(|v| v.as_str())
2338        .ok_or_else(|| AppError::Validation("LLM result missing 'description' field".into()))?;
2339
2340    persist_entity_description(conn, entity_id, description)?;
2341
2342    Ok(EnrichItemResult::Done {
2343        memory_id: None,
2344        entity_id: Some(entity_id),
2345        entities: 0,
2346        rels: 0,
2347        chars_before: None,
2348        chars_after: None,
2349        cost,
2350        is_oauth,
2351    })
2352}
2353
2354#[allow(clippy::too_many_arguments)]
2355fn call_body_enrich(
2356    conn: &Connection,
2357    namespace: &str,
2358    memory_name: &str,
2359    binary: &Path,
2360    model: Option<&str>,
2361    timeout: u64,
2362    mode: &EnrichMode,
2363    min_output_chars: usize,
2364    max_output_chars: usize,
2365    prompt_template: Option<&Path>,
2366    preserve_threshold: f64,
2367    paths: &crate::paths::AppPaths,
2368) -> Result<EnrichItemResult, AppError> {
2369    let (memory_id, body, description, memory_type): (i64, String, String, String) = conn
2370        .query_row(
2371            "SELECT id, COALESCE(body,''), COALESCE(description,''), COALESCE(type,'note') \
2372         FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2373            rusqlite::params![namespace, memory_name],
2374            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2375        )
2376        .map_err(|e| match e {
2377            rusqlite::Error::QueryReturnedNoRows => {
2378                AppError::NotFound(format!("memory '{memory_name}' not found"))
2379            }
2380            other => AppError::Database(other),
2381        })?;
2382
2383    let chars_before = body.chars().count();
2384
2385    // G26: gather graph context for contextualized enrichment
2386    let linked_entities: Vec<String> = {
2387        let mut stmt = conn.prepare_cached(
2388            "SELECT e.name FROM memory_entities me \
2389             JOIN entities e ON e.id = me.entity_id \
2390             WHERE me.memory_id = ?1 LIMIT 10",
2391        )?;
2392        let result: Vec<String> = stmt
2393            .query_map(rusqlite::params![memory_id], |r| r.get::<_, String>(0))?
2394            .filter_map(|r| r.ok())
2395            .collect();
2396        drop(stmt);
2397        result
2398    };
2399
2400    // Load custom prompt template if provided
2401    let prompt_prefix = if let Some(tmpl_path) = prompt_template {
2402        let file_size = std::fs::metadata(tmpl_path)
2403            .map_err(|e| {
2404                AppError::Io(std::io::Error::new(
2405                    e.kind(),
2406                    format!("failed to stat prompt template: {e}"),
2407                ))
2408            })?
2409            .len();
2410        if file_size > MAX_MEMORY_BODY_LEN as u64 {
2411            return Err(AppError::LimitExceeded(
2412                crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
2413            ));
2414        }
2415        std::fs::read_to_string(tmpl_path).map_err(|e| {
2416            AppError::Io(std::io::Error::new(
2417                e.kind(),
2418                format!("failed to read prompt template: {e}"),
2419            ))
2420        })?
2421    } else {
2422        BODY_ENRICH_PROMPT_PREFIX.to_string()
2423    };
2424
2425    // G26: build contextualized prompt with graph data
2426    let context_section = if !linked_entities.is_empty() || !description.is_empty() {
2427        let mut ctx = String::new();
2428        ctx.push_str(&format!(
2429            "\nContext:\n- Memory name: {memory_name}\n- Type: {memory_type}\n"
2430        ));
2431        if !description.is_empty() {
2432            ctx.push_str(&format!("- Description: {description}\n"));
2433        }
2434        ctx.push_str(&format!("- Domain: {namespace}\n"));
2435        if !linked_entities.is_empty() {
2436            ctx.push_str(&format!(
2437                "- Linked entities: {}\n",
2438                linked_entities.join(", ")
2439            ));
2440        }
2441        ctx
2442    } else {
2443        String::new()
2444    };
2445
2446    let prompt = format!(
2447        "{prompt_prefix}{context_section}\nTarget minimum length: {min_output_chars} characters. Maximum: {max_output_chars} characters."
2448    );
2449
2450    // The body schema uses a free-form enriched_body field
2451    let (value, cost, is_oauth) = match mode {
2452        EnrichMode::ClaudeCode => {
2453            call_claude(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2454        }
2455        EnrichMode::Codex => {
2456            call_codex(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2457        }
2458    };
2459
2460    let enriched_body = value
2461        .get("enriched_body")
2462        .and_then(|v| v.as_str())
2463        .ok_or_else(|| AppError::Validation("LLM result missing 'enriched_body' field".into()))?;
2464
2465    let chars_after = enriched_body.chars().count();
2466
2467    // G29 Passo 4 (v1.0.69): preservation check. Before persisting, run
2468    // a trigram-Jaccard similarity between the original body and the
2469    // LLM-rewritten body. When the score falls below
2470    // `args.preserve_threshold` (default 0.7 per the G29 gap), reject the
2471    // rewrite as a likely hallucination. The result is recorded in the
2472    // NDJSON stream so operators can audit what the LLM tried to do.
2473    let threshold = preserve_threshold;
2474    let verdict =
2475        crate::preservation::PreservationVerdict::evaluate(&body, enriched_body, threshold);
2476    if !verdict.is_accepted() {
2477        return Ok(EnrichItemResult::PreservationFailed {
2478            score: match verdict {
2479                crate::preservation::PreservationVerdict::Preserved { score, .. } => score,
2480                crate::preservation::PreservationVerdict::Rejected { score, .. } => score,
2481                crate::preservation::PreservationVerdict::Unchanged { .. } => 1.0,
2482            },
2483            threshold,
2484            chars_before,
2485            chars_after,
2486        });
2487    }
2488
2489    // G29 Passo 5 (v1.0.69): idempotency via blake3 hash. Before persisting,
2490    // compare the hash of the original body against the hash of the enriched
2491    // body. Identical hashes mean the LLM produced a byte-for-byte identical
2492    // body (rare but possible) — treat as `Skipped` so re-running the batch
2493    // is safe and the queue does not get re-persisted entries.
2494    let old_hash = blake3::hash(body.as_bytes()).to_hex().to_string();
2495    let new_hash = blake3::hash(enriched_body.as_bytes()).to_hex().to_string();
2496    if old_hash == new_hash {
2497        return Ok(EnrichItemResult::Skipped {
2498            reason: format!(
2499                "enriched body hash matches original (blake3:{old_hash}); idempotency skip"
2500            ),
2501        });
2502    }
2503
2504    // Only persist if the enriched body is genuinely longer
2505    if chars_after <= chars_before {
2506        return Ok(EnrichItemResult::Skipped {
2507            reason: format!(
2508                "enriched body ({chars_after} chars) not longer than original ({chars_before} chars)"
2509            ),
2510        });
2511    }
2512
2513    persist_enriched_body(
2514        conn,
2515        namespace,
2516        memory_id,
2517        memory_name,
2518        enriched_body,
2519        paths,
2520    )?;
2521
2522    Ok(EnrichItemResult::Done {
2523        memory_id: Some(memory_id),
2524        entity_id: None,
2525        entities: 0,
2526        rels: 0,
2527        chars_before: Some(chars_before),
2528        chars_after: Some(chars_after),
2529        cost,
2530        is_oauth,
2531    })
2532}
2533
2534// ---------------------------------------------------------------------------
2535// Scan dispatcher — maps operation to scan query result (item keys)
2536// ---------------------------------------------------------------------------
2537
2538fn scan_operation(
2539    conn: &Connection,
2540    namespace: &str,
2541    args: &EnrichArgs,
2542) -> Result<Vec<String>, AppError> {
2543    // G37: resolve --names + --names-file once and apply to every scan path.
2544    let name_filter = resolve_name_filter(args)?;
2545    match args.operation {
2546        EnrichOperation::MemoryBindings => {
2547            let rows = scan_unbound_memories(conn, namespace, args.limit, &name_filter)?;
2548            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2549        }
2550        EnrichOperation::EntityDescriptions => {
2551            let rows = scan_entities_without_description(conn, namespace, args.limit)?;
2552            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2553        }
2554        EnrichOperation::BodyEnrich => {
2555            let rows =
2556                scan_short_body_memories(conn, namespace, args.min_output_chars, args.limit)?;
2557            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2558        }
2559        EnrichOperation::WeightCalibrate => {
2560            let rows = scan_weight_candidates(conn, namespace, args.limit)?;
2561            Ok(rows
2562                .into_iter()
2563                .map(|(id, _, _, _, _)| id.to_string())
2564                .collect())
2565        }
2566        EnrichOperation::RelationReclassify => {
2567            let rows = scan_generic_relations(conn, namespace, args.limit)?;
2568            Ok(rows
2569                .into_iter()
2570                .map(|(id, _, _, _)| id.to_string())
2571                .collect())
2572        }
2573        EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2574            let pairs = scan_isolated_entity_pairs(conn, namespace, args.limit)?;
2575            Ok(pairs.into_iter().map(|(_, name, _, _)| name).collect())
2576        }
2577        EnrichOperation::EntityTypeValidate => {
2578            let rows = scan_entities_for_type_validation(conn, namespace, args.limit)?;
2579            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2580        }
2581        EnrichOperation::DescriptionEnrich => {
2582            let rows = scan_generic_descriptions(conn, namespace, args.limit)?;
2583            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2584        }
2585        EnrichOperation::DomainClassify
2586        | EnrichOperation::GraphAudit
2587        | EnrichOperation::DeepResearchSynth
2588        | EnrichOperation::BodyExtract => {
2589            let limit_clause = args.limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
2590            let sql = format!(
2591                "SELECT name FROM memories WHERE namespace=?1 AND deleted_at IS NULL ORDER BY id {limit_clause}"
2592            );
2593            let mut stmt = conn.prepare(&sql)?;
2594            let names = stmt
2595                .query_map(rusqlite::params![namespace], |r| r.get::<_, String>(0))?
2596                .collect::<Result<Vec<_>, _>>()?;
2597            Ok(names)
2598        }
2599    }
2600}
2601
2602// ---------------------------------------------------------------------------
2603// Codex stub provider
2604// ---------------------------------------------------------------------------
2605
2606/// Locates the Codex CLI binary.
2607fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
2608    if let Some(p) = explicit {
2609        if p.exists() {
2610            return Ok(p.to_path_buf());
2611        }
2612        return Err(AppError::Validation(format!(
2613            "Codex binary not found at explicit path: {}",
2614            p.display()
2615        )));
2616    }
2617
2618    if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
2619        let p = PathBuf::from(&env_path);
2620        if p.exists() {
2621            return Ok(p);
2622        }
2623    }
2624
2625    let name = if cfg!(windows) { "codex.exe" } else { "codex" };
2626    if let Some(path_var) = std::env::var_os("PATH") {
2627        for dir in std::env::split_paths(&path_var) {
2628            let candidate = dir.join(name);
2629            if candidate.exists() {
2630                return Ok(candidate);
2631            }
2632        }
2633    }
2634
2635    Err(AppError::Validation(
2636        "Codex CLI binary not found in PATH. Install it or specify --codex-binary".to_string(),
2637    ))
2638}
2639
2640/// G27: Calibrate weight of a single relationship via LLM.
2641fn call_weight_calibrate(
2642    conn: &Connection,
2643    _namespace: &str,
2644    item_key: &str,
2645    binary: &Path,
2646    model: Option<&str>,
2647    timeout: u64,
2648    mode: &EnrichMode,
2649) -> Result<EnrichItemResult, AppError> {
2650    let rel_id: i64 = item_key
2651        .parse()
2652        .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
2653    let (source_name, target_name, relation, current_weight): (String, String, String, f64) = conn
2654        .query_row(
2655            "SELECT e1.name, e2.name, r.relation, r.weight \
2656             FROM relationships r \
2657             JOIN entities e1 ON e1.id = r.source_id \
2658             JOIN entities e2 ON e2.id = r.target_id \
2659             WHERE r.id = ?1",
2660            rusqlite::params![rel_id],
2661            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2662        )
2663        .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
2664
2665    let input_text = format!(
2666        "Source: {source_name}\nTarget: {target_name}\nRelation: {relation}\nCurrent weight: {current_weight}"
2667    );
2668    let (value, cost, is_oauth) = match mode {
2669        EnrichMode::ClaudeCode => call_claude(
2670            binary,
2671            WEIGHT_CALIBRATE_PROMPT,
2672            WEIGHT_CALIBRATE_SCHEMA,
2673            &input_text,
2674            model,
2675            timeout,
2676        )?,
2677        EnrichMode::Codex => call_codex(
2678            binary,
2679            WEIGHT_CALIBRATE_PROMPT,
2680            WEIGHT_CALIBRATE_SCHEMA,
2681            &input_text,
2682            model,
2683            timeout,
2684        )?,
2685    };
2686
2687    let calibrated = value
2688        .get("calibrated_weight")
2689        .and_then(|v| v.as_f64())
2690        .ok_or_else(|| AppError::Validation("LLM result missing 'calibrated_weight'".into()))?;
2691
2692    conn.execute(
2693        "UPDATE relationships SET weight = ?1 WHERE id = ?2",
2694        rusqlite::params![calibrated, rel_id],
2695    )?;
2696
2697    Ok(EnrichItemResult::Done {
2698        memory_id: None,
2699        entity_id: None,
2700        entities: 0,
2701        rels: 1,
2702        chars_before: None,
2703        chars_after: None,
2704        cost,
2705        is_oauth,
2706    })
2707}
2708
2709/// G27: Reclassify a generic relationship type via LLM.
2710fn call_relation_reclassify(
2711    conn: &Connection,
2712    _namespace: &str,
2713    item_key: &str,
2714    binary: &Path,
2715    model: Option<&str>,
2716    timeout: u64,
2717    mode: &EnrichMode,
2718) -> Result<EnrichItemResult, AppError> {
2719    let rel_id: i64 = item_key
2720        .parse()
2721        .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
2722    let (source_name, target_name, current_relation): (String, String, String) = conn
2723        .query_row(
2724            "SELECT e1.name, e2.name, r.relation \
2725             FROM relationships r \
2726             JOIN entities e1 ON e1.id = r.source_id \
2727             JOIN entities e2 ON e2.id = r.target_id \
2728             WHERE r.id = ?1",
2729            rusqlite::params![rel_id],
2730            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2731        )
2732        .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
2733
2734    let input_text = format!(
2735        "Source entity: {source_name}\nTarget entity: {target_name}\nCurrent relation: {current_relation}"
2736    );
2737    let (value, cost, is_oauth) = match mode {
2738        EnrichMode::ClaudeCode => call_claude(
2739            binary,
2740            RELATION_RECLASSIFY_PROMPT,
2741            RELATION_RECLASSIFY_SCHEMA,
2742            &input_text,
2743            model,
2744            timeout,
2745        )?,
2746        EnrichMode::Codex => call_codex(
2747            binary,
2748            RELATION_RECLASSIFY_PROMPT,
2749            RELATION_RECLASSIFY_SCHEMA,
2750            &input_text,
2751            model,
2752            timeout,
2753        )?,
2754    };
2755
2756    let new_relation = value
2757        .get("relation")
2758        .and_then(|v| v.as_str())
2759        .ok_or_else(|| AppError::Validation("LLM result missing 'relation'".into()))?;
2760    let new_strength = value
2761        .get("strength")
2762        .and_then(|v| v.as_f64())
2763        .unwrap_or(0.5);
2764
2765    conn.execute(
2766        "UPDATE relationships SET relation = ?1, weight = ?2 WHERE id = ?3",
2767        rusqlite::params![new_relation, new_strength, rel_id],
2768    )?;
2769
2770    Ok(EnrichItemResult::Done {
2771        memory_id: None,
2772        entity_id: None,
2773        entities: 0,
2774        rels: 1,
2775        chars_before: None,
2776        chars_after: None,
2777        cost,
2778        is_oauth,
2779    })
2780}
2781
2782/// G27 P2: Connect isolated entities via LLM-suggested relationship.
2783fn call_entity_connect(
2784    conn: &Connection,
2785    namespace: &str,
2786    item_key: &str,
2787    binary: &Path,
2788    model: Option<&str>,
2789    timeout: u64,
2790    mode: &EnrichMode,
2791) -> Result<EnrichItemResult, AppError> {
2792    let pairs = scan_isolated_entity_pairs(conn, namespace, Some(1))?;
2793    let (e1_id, e1_name, e2_id, e2_name) =
2794        match pairs.into_iter().find(|(_, n, _, _)| n == item_key) {
2795            Some(p) => p,
2796            None => {
2797                return Ok(EnrichItemResult::Skipped {
2798                    reason: "pair no longer isolated".into(),
2799                })
2800            }
2801        };
2802    let input_text = format!("Entity A: {e1_name}\nEntity B: {e2_name}");
2803    let (value, cost, is_oauth) = match mode {
2804        EnrichMode::ClaudeCode => call_claude(
2805            binary,
2806            ENTITY_CONNECT_PROMPT,
2807            ENTITY_CONNECT_SCHEMA,
2808            &input_text,
2809            model,
2810            timeout,
2811        )?,
2812        EnrichMode::Codex => call_codex(
2813            binary,
2814            ENTITY_CONNECT_PROMPT,
2815            ENTITY_CONNECT_SCHEMA,
2816            &input_text,
2817            model,
2818            timeout,
2819        )?,
2820    };
2821    let relation = value
2822        .get("relation")
2823        .and_then(|v| v.as_str())
2824        .unwrap_or("none");
2825    if relation == "none" {
2826        return Ok(EnrichItemResult::Skipped {
2827            reason: "LLM determined no relationship".into(),
2828        });
2829    }
2830    let strength = value
2831        .get("strength")
2832        .and_then(|v| v.as_f64())
2833        .unwrap_or(0.5);
2834    conn.execute(
2835        "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
2836        rusqlite::params![namespace, e1_id, e2_id, relation, strength],
2837    )?;
2838    Ok(EnrichItemResult::Done {
2839        memory_id: None,
2840        entity_id: None,
2841        entities: 0,
2842        rels: 1,
2843        chars_before: None,
2844        chars_after: None,
2845        cost,
2846        is_oauth,
2847    })
2848}
2849
2850/// G27 P2: Validate entity type assignment via LLM.
2851fn call_entity_type_validate(
2852    conn: &Connection,
2853    _namespace: &str,
2854    item_key: &str,
2855    binary: &Path,
2856    model: Option<&str>,
2857    timeout: u64,
2858    mode: &EnrichMode,
2859) -> Result<EnrichItemResult, AppError> {
2860    let (ent_id, ent_name, ent_type): (i64, String, String) = conn
2861        .query_row(
2862            "SELECT id, name, type FROM entities WHERE name = ?1",
2863            rusqlite::params![item_key],
2864            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2865        )
2866        .map_err(|_| AppError::NotFound(format!("entity '{item_key}' not found")))?;
2867    let input_text = format!("Entity: {ent_name}\nCurrent type: {ent_type}");
2868    let (value, cost, is_oauth) = match mode {
2869        EnrichMode::ClaudeCode => call_claude(
2870            binary,
2871            ENTITY_TYPE_VALIDATE_PROMPT,
2872            ENTITY_TYPE_VALIDATE_SCHEMA,
2873            &input_text,
2874            model,
2875            timeout,
2876        )?,
2877        EnrichMode::Codex => call_codex(
2878            binary,
2879            ENTITY_TYPE_VALIDATE_PROMPT,
2880            ENTITY_TYPE_VALIDATE_SCHEMA,
2881            &input_text,
2882            model,
2883            timeout,
2884        )?,
2885    };
2886    let validated_type = value
2887        .get("validated_type")
2888        .and_then(|v| v.as_str())
2889        .unwrap_or(&ent_type);
2890    let was_correct = value
2891        .get("was_correct")
2892        .and_then(|v| v.as_bool())
2893        .unwrap_or(true);
2894    if !was_correct {
2895        conn.execute(
2896            "UPDATE entities SET type = ?1 WHERE id = ?2",
2897            rusqlite::params![validated_type, ent_id],
2898        )?;
2899    }
2900    Ok(EnrichItemResult::Done {
2901        memory_id: None,
2902        entity_id: Some(ent_id),
2903        entities: 1,
2904        rels: 0,
2905        chars_before: None,
2906        chars_after: None,
2907        cost,
2908        is_oauth,
2909    })
2910}
2911
2912/// G27 P2: Enrich generic memory description via LLM.
2913fn call_description_enrich(
2914    conn: &Connection,
2915    _namespace: &str,
2916    item_key: &str,
2917    binary: &Path,
2918    model: Option<&str>,
2919    timeout: u64,
2920    mode: &EnrichMode,
2921) -> Result<EnrichItemResult, AppError> {
2922    let (mem_id, body, old_desc): (i64, String, String) = conn
2923        .query_row(
2924            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
2925            rusqlite::params![item_key],
2926            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
2927        )
2928        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
2929    let snippet: String = body.chars().take(500).collect();
2930    let input_text = format!(
2931        "Memory name: {item_key}\nCurrent description: {old_desc}\nBody preview: {snippet}"
2932    );
2933    let (value, cost, is_oauth) = match mode {
2934        EnrichMode::ClaudeCode => call_claude(
2935            binary,
2936            DESCRIPTION_ENRICH_PROMPT,
2937            DESCRIPTION_ENRICH_SCHEMA,
2938            &input_text,
2939            model,
2940            timeout,
2941        )?,
2942        EnrichMode::Codex => call_codex(
2943            binary,
2944            DESCRIPTION_ENRICH_PROMPT,
2945            DESCRIPTION_ENRICH_SCHEMA,
2946            &input_text,
2947            model,
2948            timeout,
2949        )?,
2950    };
2951    let new_desc = value
2952        .get("description")
2953        .and_then(|v| v.as_str())
2954        .unwrap_or(&old_desc);
2955    conn.execute(
2956        "UPDATE memories SET description = ?1 WHERE id = ?2",
2957        rusqlite::params![new_desc, mem_id],
2958    )?;
2959    Ok(EnrichItemResult::Done {
2960        memory_id: Some(mem_id),
2961        entity_id: None,
2962        entities: 0,
2963        rels: 0,
2964        chars_before: Some(old_desc.len()),
2965        chars_after: Some(new_desc.len()),
2966        cost,
2967        is_oauth,
2968    })
2969}
2970
2971/// G27 P2: Classify memory into domain category via LLM.
2972fn call_domain_classify(
2973    conn: &Connection,
2974    _namespace: &str,
2975    item_key: &str,
2976    binary: &Path,
2977    model: Option<&str>,
2978    timeout: u64,
2979    mode: &EnrichMode,
2980) -> Result<EnrichItemResult, AppError> {
2981    let (mem_id, body, desc): (i64, String, String) = conn
2982        .query_row(
2983            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
2984            rusqlite::params![item_key],
2985            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
2986        )
2987        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
2988    let snippet: String = body.chars().take(500).collect();
2989    let input_text = format!("Memory: {item_key}\nDescription: {desc}\nBody preview: {snippet}");
2990    let (value, cost, is_oauth) = match mode {
2991        EnrichMode::ClaudeCode => call_claude(
2992            binary,
2993            DOMAIN_CLASSIFY_PROMPT,
2994            DOMAIN_CLASSIFY_SCHEMA,
2995            &input_text,
2996            model,
2997            timeout,
2998        )?,
2999        EnrichMode::Codex => call_codex(
3000            binary,
3001            DOMAIN_CLASSIFY_PROMPT,
3002            DOMAIN_CLASSIFY_SCHEMA,
3003            &input_text,
3004            model,
3005            timeout,
3006        )?,
3007    };
3008    let domain = value
3009        .get("domain")
3010        .and_then(|v| v.as_str())
3011        .unwrap_or("uncategorized");
3012    let metadata = format!(r#"{{"domain":"{}"}}"#, domain.replace('"', "\\\""));
3013    conn.execute(
3014        "UPDATE memories SET metadata = ?1 WHERE id = ?2",
3015        rusqlite::params![metadata, mem_id],
3016    )?;
3017    Ok(EnrichItemResult::Done {
3018        memory_id: Some(mem_id),
3019        entity_id: None,
3020        entities: 0,
3021        rels: 0,
3022        chars_before: None,
3023        chars_after: None,
3024        cost,
3025        is_oauth,
3026    })
3027}
3028
3029/// G27 P2: Audit memory graph quality via LLM.
3030fn call_graph_audit(
3031    conn: &Connection,
3032    _namespace: &str,
3033    item_key: &str,
3034    binary: &Path,
3035    model: Option<&str>,
3036    timeout: u64,
3037    mode: &EnrichMode,
3038) -> Result<EnrichItemResult, AppError> {
3039    let (mem_id, body, desc): (i64, String, String) = conn
3040        .query_row(
3041            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3042            rusqlite::params![item_key],
3043            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3044        )
3045        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3046    let snippet: String = body.chars().take(500).collect();
3047    let ent_count: i64 = conn
3048        .query_row(
3049            "SELECT COUNT(*) FROM memory_entities WHERE memory_id = ?1",
3050            rusqlite::params![mem_id],
3051            |r| r.get(0),
3052        )
3053        .unwrap_or(0);
3054    let input_text = format!("Memory: {item_key}\nDescription: {desc}\nEntity bindings: {ent_count}\nBody preview: {snippet}");
3055    let (value, cost, is_oauth) = match mode {
3056        EnrichMode::ClaudeCode => call_claude(
3057            binary,
3058            GRAPH_AUDIT_PROMPT,
3059            GRAPH_AUDIT_SCHEMA,
3060            &input_text,
3061            model,
3062            timeout,
3063        )?,
3064        EnrichMode::Codex => call_codex(
3065            binary,
3066            GRAPH_AUDIT_PROMPT,
3067            GRAPH_AUDIT_SCHEMA,
3068            &input_text,
3069            model,
3070            timeout,
3071        )?,
3072    };
3073    let issues = value
3074        .get("issues")
3075        .and_then(|v| v.as_array())
3076        .map(|a| a.len())
3077        .unwrap_or(0);
3078    Ok(EnrichItemResult::Done {
3079        memory_id: Some(mem_id),
3080        entity_id: None,
3081        entities: 0,
3082        rels: issues,
3083        chars_before: None,
3084        chars_after: None,
3085        cost,
3086        is_oauth,
3087    })
3088}
3089
3090/// G27 P2: Synthesize research findings into graph entities/relationships via LLM.
3091fn call_deep_research_synth(
3092    conn: &Connection,
3093    namespace: &str,
3094    item_key: &str,
3095    binary: &Path,
3096    model: Option<&str>,
3097    timeout: u64,
3098    mode: &EnrichMode,
3099) -> Result<EnrichItemResult, AppError> {
3100    let (mem_id, body): (i64, String) = conn
3101        .query_row(
3102            "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3103            rusqlite::params![item_key],
3104            |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
3105        )
3106        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3107    let snippet: String = body.chars().take(2000).collect();
3108    let input_text = format!("Memory: {item_key}\nBody:\n{snippet}");
3109    let (value, cost, is_oauth) = match mode {
3110        EnrichMode::ClaudeCode => call_claude(
3111            binary,
3112            DEEP_RESEARCH_SYNTH_PROMPT,
3113            DEEP_RESEARCH_SYNTH_SCHEMA,
3114            &input_text,
3115            model,
3116            timeout,
3117        )?,
3118        EnrichMode::Codex => call_codex(
3119            binary,
3120            DEEP_RESEARCH_SYNTH_PROMPT,
3121            DEEP_RESEARCH_SYNTH_SCHEMA,
3122            &input_text,
3123            model,
3124            timeout,
3125        )?,
3126    };
3127    let mut ent_count = 0usize;
3128    let mut rel_count = 0usize;
3129    if let Some(ents) = value.get("entities").and_then(|v| v.as_array()) {
3130        for e in ents {
3131            let name = e.get("name").and_then(|v| v.as_str()).unwrap_or_default();
3132            let etype_str = e
3133                .get("entity_type")
3134                .and_then(|v| v.as_str())
3135                .unwrap_or("concept");
3136            let etype: EntityType = etype_str.parse().unwrap_or(EntityType::Concept);
3137            if name.len() >= 2 {
3138                let ne = NewEntity {
3139                    name: name.to_string(),
3140                    entity_type: etype,
3141                    description: None,
3142                };
3143                let _ = entities::upsert_entity(conn, namespace, &ne);
3144                ent_count += 1;
3145            }
3146        }
3147    }
3148    if let Some(rels) = value.get("relationships").and_then(|v| v.as_array()) {
3149        for r in rels {
3150            let src = r.get("source").and_then(|v| v.as_str()).unwrap_or_default();
3151            let tgt = r.get("target").and_then(|v| v.as_str()).unwrap_or_default();
3152            if src.is_empty() || tgt.is_empty() {
3153                continue;
3154            }
3155            let rel = r
3156                .get("relation")
3157                .and_then(|v| v.as_str())
3158                .unwrap_or("related");
3159            let str_ = r.get("strength").and_then(|v| v.as_f64()).unwrap_or(0.5);
3160            if let (Some(sid), Some(tid)) = (
3161                entities::find_entity_id(conn, namespace, src)?,
3162                entities::find_entity_id(conn, namespace, tgt)?,
3163            ) {
3164                let _ = entities::create_or_fetch_relationship(
3165                    conn, namespace, sid, tid, rel, str_, None,
3166                );
3167                rel_count += 1;
3168            }
3169        }
3170    }
3171    Ok(EnrichItemResult::Done {
3172        memory_id: Some(mem_id),
3173        entity_id: None,
3174        entities: ent_count,
3175        rels: rel_count,
3176        chars_before: None,
3177        chars_after: None,
3178        cost,
3179        is_oauth,
3180    })
3181}
3182
3183/// G27 P2: Extract structured body from unstructured text via LLM.
3184fn call_body_extract(
3185    conn: &Connection,
3186    _namespace: &str,
3187    item_key: &str,
3188    binary: &Path,
3189    model: Option<&str>,
3190    timeout: u64,
3191    mode: &EnrichMode,
3192) -> Result<EnrichItemResult, AppError> {
3193    let (mem_id, body): (i64, String) = conn
3194        .query_row(
3195            "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3196            rusqlite::params![item_key],
3197            |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
3198        )
3199        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3200    let input_text = format!("Memory: {item_key}\nBody:\n{body}");
3201    let (value, cost, is_oauth) = match mode {
3202        EnrichMode::ClaudeCode => call_claude(
3203            binary,
3204            BODY_EXTRACT_PROMPT,
3205            BODY_EXTRACT_SCHEMA,
3206            &input_text,
3207            model,
3208            timeout,
3209        )?,
3210        EnrichMode::Codex => call_codex(
3211            binary,
3212            BODY_EXTRACT_PROMPT,
3213            BODY_EXTRACT_SCHEMA,
3214            &input_text,
3215            model,
3216            timeout,
3217        )?,
3218    };
3219    let restructured = value
3220        .get("restructured_body")
3221        .and_then(|v| v.as_str())
3222        .unwrap_or(&body);
3223    let chars_before = body.len();
3224    let chars_after = restructured.len();
3225    let new_hash = blake3::hash(restructured.as_bytes()).to_hex().to_string();
3226    conn.execute(
3227        "UPDATE memories SET body = ?1, body_hash = ?2, updated_at = unixepoch() WHERE id = ?3",
3228        rusqlite::params![restructured, new_hash, mem_id],
3229    )?;
3230    Ok(EnrichItemResult::Done {
3231        memory_id: Some(mem_id),
3232        entity_id: None,
3233        entities: 0,
3234        rels: 0,
3235        chars_before: Some(chars_before),
3236        chars_after: Some(chars_after),
3237        cost,
3238        is_oauth,
3239    })
3240}
3241
3242/// Scan for pairs of entities that share no direct relationship.
3243#[allow(clippy::type_complexity)]
3244fn scan_isolated_entity_pairs(
3245    conn: &Connection,
3246    namespace: &str,
3247    limit: Option<usize>,
3248) -> Result<Vec<(i64, String, i64, String)>, AppError> {
3249    let limit_val = limit.unwrap_or(50) as i64;
3250    let mut stmt = conn.prepare_cached(
3251        "SELECT e1.id, e1.name, e2.id, e2.name FROM entities e1, entities e2 \
3252         WHERE e1.namespace = ?1 AND e2.namespace = ?1 AND e1.id < e2.id \
3253         AND NOT EXISTS (SELECT 1 FROM relationships r WHERE \
3254           (r.source_id = e1.id AND r.target_id = e2.id) OR \
3255           (r.source_id = e2.id AND r.target_id = e1.id)) \
3256         LIMIT ?2",
3257    )?;
3258    let rows = stmt
3259        .query_map(rusqlite::params![namespace, limit_val], |r| {
3260            Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?))
3261        })?
3262        .collect::<Result<Vec<_>, _>>()?;
3263    Ok(rows)
3264}
3265
3266/// Scan for entities with non-validated types (all entities for type audit).
3267fn scan_entities_for_type_validation(
3268    conn: &Connection,
3269    namespace: &str,
3270    limit: Option<usize>,
3271) -> Result<Vec<(i64, String, String)>, AppError> {
3272    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3273    let sql = format!(
3274        "SELECT id, name, type FROM entities WHERE namespace = ?1 ORDER BY id {limit_clause}"
3275    );
3276    let mut stmt = conn.prepare(&sql)?;
3277    let rows = stmt
3278        .query_map(rusqlite::params![namespace], |r| {
3279            Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3280        })?
3281        .collect::<Result<Vec<_>, _>>()?;
3282    Ok(rows)
3283}
3284
3285/// Scan for memories with generic descriptions (ingested, imported, etc).
3286fn scan_generic_descriptions(
3287    conn: &Connection,
3288    namespace: &str,
3289    limit: Option<usize>,
3290) -> Result<Vec<(i64, String, String)>, AppError> {
3291    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3292    let sql = format!(
3293        "SELECT id, name, description FROM memories WHERE namespace = ?1 AND deleted_at IS NULL \
3294         AND (description LIKE '%ingested%' OR description LIKE '%imported%' OR description LIKE '%added%' OR length(description) < 30) \
3295         ORDER BY id {limit_clause}"
3296    );
3297    let mut stmt = conn.prepare(&sql)?;
3298    let rows = stmt
3299        .query_map(rusqlite::params![namespace], |r| {
3300            Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3301        })?
3302        .collect::<Result<Vec<_>, _>>()?;
3303    Ok(rows)
3304}
3305
3306/// Calls the Codex CLI for a single enrichment item.
3307///
3308/// Follows the same contract as `call_claude`: returns `(value, cost_usd, is_oauth=false)`.
3309fn call_codex(
3310    binary: &Path,
3311    prompt: &str,
3312    json_schema: &str,
3313    input_text: &str,
3314    model: Option<&str>,
3315    timeout_secs: u64,
3316) -> Result<(serde_json::Value, f64, bool), AppError> {
3317    use wait_timeout::ChildExt;
3318
3319    // G31+G32+G33 (v1.0.69): validate the model BEFORE spawn, write the
3320    // schema to a trusted cache path (not /tmp), and reuse the
3321    // consolidated JSONL parser. See `codex_spawn.rs` for the canonical
3322    // hardening rationale.
3323    super::codex_spawn::validate_codex_model(model)?;
3324    let schema_file = super::codex_spawn::trusted_schema_path()?;
3325
3326    let args = super::codex_spawn::CodexSpawnArgs {
3327        binary,
3328        prompt,
3329        json_schema,
3330        input_text,
3331        model,
3332        timeout_secs,
3333        schema_path: schema_file.clone(),
3334    };
3335    let mut cmd = super::codex_spawn::build_codex_command(&args);
3336
3337    let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
3338        AppError::Io(std::io::Error::new(
3339            e.kind(),
3340            format!("failed to spawn codex: {e}"),
3341        ))
3342    })?;
3343
3344    let full_prompt = format!("{prompt}\n\n{input_text}");
3345    let stdin_bytes = full_prompt.into_bytes();
3346    let mut child_stdin = child
3347        .stdin
3348        .take()
3349        .ok_or_else(|| AppError::Validation("failed to open codex stdin".into()))?;
3350    let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
3351        child_stdin.write_all(&stdin_bytes)?;
3352        drop(child_stdin);
3353        Ok(())
3354    });
3355
3356    let start = std::time::Instant::now();
3357    let timeout = std::time::Duration::from_secs(timeout_secs);
3358    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
3359    let _ = std::fs::remove_file(&schema_file);
3360
3361    match status {
3362        Some(exit_status) => {
3363            stdin_thread
3364                .join()
3365                .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
3366                .map_err(AppError::Io)?;
3367
3368            tracing::debug!(
3369                target: "process",
3370                exit_code = ?exit_status.code(),
3371                elapsed_ms = start.elapsed().as_millis() as u64,
3372                "external process completed"
3373            );
3374
3375            let mut stdout_buf = Vec::new();
3376            if let Some(mut out) = child.stdout.take() {
3377                std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
3378            }
3379            if !exit_status.success() {
3380                let mut stderr_buf = Vec::new();
3381                if let Some(mut err) = child.stderr.take() {
3382                    std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
3383                }
3384                let stderr_str = String::from_utf8_lossy(&stderr_buf);
3385                tracing::warn!(
3386                    target: "enrich",
3387                    exit_code = ?exit_status.code(),
3388                    stderr = %stderr_str.trim(),
3389                    "codex process failed"
3390                );
3391                return Err(AppError::Validation(format!(
3392                    "codex exited with code {:?}: {}",
3393                    exit_status.code(),
3394                    stderr_str.trim()
3395                )));
3396            }
3397            let stdout_str = String::from_utf8(stdout_buf)
3398                .map_err(|_| AppError::Validation("codex stdout is not valid UTF-8".into()))?;
3399            // G32: use the JSONL parser, NOT serde_json::from_str on the
3400            // entire stdout (codex emits one event per line).
3401            let result = super::codex_spawn::parse_codex_jsonl(&stdout_str)?;
3402            // Wrap the extraction as a JSON object so downstream code
3403            // (which expects a single `serde_json::Value`) keeps working.
3404            // `ExtractedUrl` lacks `Serialize` so we project to a
3405            // serde-friendly vector.
3406            let urls: Vec<serde_json::Value> = result
3407                .extraction
3408                .urls
3409                .iter()
3410                .map(|u| serde_json::json!({"url": u.url, "offset": u.offset}))
3411                .collect();
3412            let value = serde_json::json!({
3413                "entities": result.extraction.entities,
3414                "relationships": result.extraction.relationships,
3415                "urls": urls,
3416                "extraction_method": result.extraction.extraction_method,
3417            });
3418            Ok((value, 0.0, false))
3419        }
3420        None => {
3421            let _ = child.kill();
3422            let _ = child.wait();
3423            let _ = stdin_thread.join();
3424            Err(AppError::Validation(format!(
3425                "codex timed out after {timeout_secs} seconds"
3426            )))
3427        }
3428    }
3429}
3430
3431// ---------------------------------------------------------------------------
3432// Tests
3433// ---------------------------------------------------------------------------
3434
3435#[cfg(test)]
3436mod tests {
3437    use super::*;
3438    use rusqlite::Connection;
3439
3440    /// Opens an in-memory SQLite database with a minimal schema for unit tests.
3441    fn open_test_db() -> Connection {
3442        let conn = Connection::open_in_memory().expect("in-memory db");
3443        conn.execute_batch(
3444            "CREATE TABLE memories (
3445                id          INTEGER PRIMARY KEY AUTOINCREMENT,
3446                namespace   TEXT NOT NULL DEFAULT 'global',
3447                name        TEXT NOT NULL,
3448                type        TEXT NOT NULL DEFAULT 'note',
3449                description TEXT NOT NULL DEFAULT '',
3450                body        TEXT NOT NULL DEFAULT '',
3451                body_hash   TEXT NOT NULL DEFAULT '',
3452                session_id  TEXT,
3453                source      TEXT NOT NULL DEFAULT 'agent',
3454                metadata    TEXT NOT NULL DEFAULT '{}',
3455                created_at  INTEGER NOT NULL DEFAULT (unixepoch()),
3456                updated_at  INTEGER NOT NULL DEFAULT (unixepoch()),
3457                deleted_at  INTEGER,
3458                UNIQUE(namespace, name)
3459            );
3460            CREATE TABLE entities (
3461                id          INTEGER PRIMARY KEY AUTOINCREMENT,
3462                namespace   TEXT NOT NULL DEFAULT 'global',
3463                name        TEXT NOT NULL,
3464                type        TEXT NOT NULL DEFAULT 'concept',
3465                description TEXT,
3466                degree      INTEGER NOT NULL DEFAULT 0,
3467                created_at  INTEGER NOT NULL DEFAULT (unixepoch()),
3468                updated_at  INTEGER NOT NULL DEFAULT (unixepoch()),
3469                UNIQUE(namespace, name)
3470            );
3471            CREATE TABLE memory_entities (
3472                memory_id  INTEGER NOT NULL,
3473                entity_id  INTEGER NOT NULL,
3474                PRIMARY KEY (memory_id, entity_id)
3475            );
3476            CREATE TABLE relationships (
3477                id         INTEGER PRIMARY KEY AUTOINCREMENT,
3478                namespace  TEXT NOT NULL DEFAULT 'global',
3479                source_id  INTEGER NOT NULL,
3480                target_id  INTEGER NOT NULL,
3481                relation   TEXT NOT NULL,
3482                weight     REAL NOT NULL DEFAULT 0.5,
3483                description TEXT,
3484                UNIQUE(source_id, target_id, relation)
3485            );",
3486        )
3487        .expect("schema creation must succeed");
3488        conn
3489    }
3490
3491    #[test]
3492    fn scan_unbound_memories_finds_memories_without_bindings() {
3493        let conn = open_test_db();
3494        conn.execute(
3495            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'test-mem', 'some body content')",
3496            [],
3497        )
3498        .unwrap();
3499
3500        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
3501        assert_eq!(results.len(), 1);
3502        assert_eq!(results[0].1, "test-mem");
3503    }
3504
3505    #[test]
3506    fn scan_unbound_memories_excludes_bound_memories() {
3507        let conn = open_test_db();
3508        conn.execute(
3509            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'bound-mem', 'body')",
3510            [],
3511        )
3512        .unwrap();
3513        let mem_id: i64 = conn
3514            .query_row("SELECT id FROM memories WHERE name='bound-mem'", [], |r| {
3515                r.get(0)
3516            })
3517            .unwrap();
3518        conn.execute(
3519            "INSERT INTO entities (namespace, name) VALUES ('global', 'some-entity')",
3520            [],
3521        )
3522        .unwrap();
3523        let ent_id: i64 = conn
3524            .query_row(
3525                "SELECT id FROM entities WHERE name='some-entity'",
3526                [],
3527                |r| r.get(0),
3528            )
3529            .unwrap();
3530        conn.execute(
3531            "INSERT INTO memory_entities (memory_id, entity_id) VALUES (?1, ?2)",
3532            rusqlite::params![mem_id, ent_id],
3533        )
3534        .unwrap();
3535
3536        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
3537        assert!(results.is_empty(), "bound memory must not appear in scan");
3538    }
3539
3540    #[test]
3541    fn scan_entities_without_description_finds_null_description() {
3542        let conn = open_test_db();
3543        conn.execute(
3544            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'my-tool', 'tool', NULL)",
3545            [],
3546        )
3547        .unwrap();
3548
3549        let results = scan_entities_without_description(&conn, "global", None).unwrap();
3550        assert_eq!(results.len(), 1);
3551        assert_eq!(results[0].1, "my-tool");
3552    }
3553
3554    #[test]
3555    fn scan_entities_without_description_excludes_entities_with_description() {
3556        let conn = open_test_db();
3557        conn.execute(
3558            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'described-tool', 'tool', 'Has a description already')",
3559            [],
3560        )
3561        .unwrap();
3562
3563        let results = scan_entities_without_description(&conn, "global", None).unwrap();
3564        assert!(
3565            results.is_empty(),
3566            "entity with description must not appear"
3567        );
3568    }
3569
3570    #[test]
3571    fn scan_short_body_memories_finds_short_bodies() {
3572        let conn = open_test_db();
3573        conn.execute(
3574            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'short-mem', 'hi')",
3575            [],
3576        )
3577        .unwrap();
3578
3579        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
3580        assert_eq!(results.len(), 1);
3581        assert_eq!(results[0].1, "short-mem");
3582    }
3583
3584    #[test]
3585    fn scan_short_body_memories_excludes_long_bodies() {
3586        let conn = open_test_db();
3587        let long_body = "a".repeat(1000);
3588        conn.execute(
3589            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'long-mem', ?1)",
3590            rusqlite::params![long_body],
3591        )
3592        .unwrap();
3593
3594        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
3595        assert!(results.is_empty(), "long memory must not appear in scan");
3596    }
3597
3598    #[test]
3599    fn scan_respects_limit() {
3600        let conn = open_test_db();
3601        for i in 0..5 {
3602            conn.execute(
3603                &format!("INSERT INTO memories (namespace, name, body) VALUES ('global', 'mem-{i}', 'short')"),
3604                [],
3605            )
3606            .unwrap();
3607        }
3608
3609        let results = scan_short_body_memories(&conn, "global", 1000, Some(3)).unwrap();
3610        assert_eq!(results.len(), 3, "limit must be respected");
3611    }
3612
3613    #[test]
3614    fn queue_db_schema_creates_correctly() {
3615        let tmp_path = format!("/tmp/test-enrich-queue-{}.sqlite", std::process::id());
3616        let conn = open_queue_db(&tmp_path).expect("queue db must open");
3617        let count: i64 = conn
3618            .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
3619            .unwrap();
3620        assert_eq!(count, 0);
3621        let _ = std::fs::remove_file(&tmp_path);
3622    }
3623
3624    #[test]
3625    fn parse_claude_output_valid_bindings() {
3626        let output = r#"[
3627            {"type":"system","subtype":"init"},
3628            {"type":"result","is_error":false,"total_cost_usd":0.01,
3629             "structured_output":{"entities":[{"name":"rust-lang","entity_type":"tool"}],"relationships":[]}}
3630        ]"#;
3631        let result = crate::commands::claude_runner::parse_claude_output(output)
3632            .expect("must parse successfully");
3633        assert!(result.value.get("entities").is_some());
3634        assert!((result.cost_usd - 0.01).abs() < f64::EPSILON);
3635        assert!(!result.is_oauth);
3636    }
3637
3638    #[test]
3639    fn parse_claude_output_detects_oauth() {
3640        let output = r#"[
3641            {"type":"system","subtype":"init","apiKeySource":"none"},
3642            {"type":"result","is_error":false,"total_cost_usd":0.0,
3643             "structured_output":{"entities":[],"relationships":[]}}
3644        ]"#;
3645        let result = crate::commands::claude_runner::parse_claude_output(output).unwrap();
3646        assert!(result.is_oauth);
3647    }
3648
3649    #[test]
3650    fn parse_claude_output_rate_limit_returns_error() {
3651        let output = r#"[
3652            {"type":"system","subtype":"init"},
3653            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
3654        ]"#;
3655        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
3656        assert!(matches!(err, AppError::RateLimited { .. }));
3657    }
3658
3659    #[test]
3660    fn parse_claude_output_auth_error() {
3661        let output = r#"[
3662            {"type":"system","subtype":"init"},
3663            {"type":"result","is_error":true,"error":"authentication failed"}
3664        ]"#;
3665        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
3666        assert!(format!("{err}").contains("authentication failed"));
3667    }
3668
3669    #[test]
3670    fn dry_run_emits_preview_without_calling_llm() {
3671        // This test validates the dry-run NDJSON contract without spawning any process.
3672        // The scan_operation function requires a DB; we build one in-memory but cannot
3673        // call run() directly because it needs AppPaths (disk). Instead we test the
3674        // lower-level helpers that the dry-run path relies on.
3675        let conn = open_test_db();
3676        conn.execute(
3677            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'dry-mem', 'tiny')",
3678            [],
3679        )
3680        .unwrap();
3681
3682        let results = scan_short_body_memories(&conn, "global", 1000, None).unwrap();
3683        assert_eq!(results.len(), 1);
3684        assert_eq!(results[0].1, "dry-mem");
3685        // If scan finds the item and dry_run is set, no LLM would be called.
3686        // The NDJSON emission is tested via integration tests with a fake binary.
3687    }
3688
3689    #[test]
3690    fn persist_entity_description_updates_db() {
3691        let conn = open_test_db();
3692        conn.execute(
3693            "INSERT INTO entities (namespace, name, type) VALUES ('global', 'tokio-runtime', 'tool')",
3694            [],
3695        )
3696        .unwrap();
3697        let eid: i64 = conn
3698            .query_row(
3699                "SELECT id FROM entities WHERE name='tokio-runtime'",
3700                [],
3701                |r| r.get(0),
3702            )
3703            .unwrap();
3704
3705        persist_entity_description(&conn, eid, "Async runtime for Rust applications").unwrap();
3706
3707        let desc: String = conn
3708            .query_row(
3709                "SELECT description FROM entities WHERE id=?1",
3710                rusqlite::params![eid],
3711                |r| r.get(0),
3712            )
3713            .unwrap();
3714        assert_eq!(desc, "Async runtime for Rust applications");
3715    }
3716
3717    #[test]
3718    fn bindings_schema_is_valid_json() {
3719        let _: serde_json::Value =
3720            serde_json::from_str(BINDINGS_SCHEMA).expect("BINDINGS_SCHEMA must be valid JSON");
3721    }
3722
3723    #[test]
3724    fn entity_description_schema_is_valid_json() {
3725        let _: serde_json::Value = serde_json::from_str(ENTITY_DESCRIPTION_SCHEMA)
3726            .expect("ENTITY_DESCRIPTION_SCHEMA must be valid JSON");
3727    }
3728
3729    #[test]
3730    fn body_enrich_schema_is_valid_json() {
3731        let _: serde_json::Value = serde_json::from_str(BODY_ENRICH_SCHEMA)
3732            .expect("BODY_ENRICH_SCHEMA must be valid JSON");
3733    }
3734}