Skip to main content

sqlite_graphrag/commands/
enrich.rs

1//! Handler for the `enrich` CLI subcommand (GAP-14 + GAP-18).
2//!
3//! Enriches the knowledge graph by running LLM-powered analysis over memories
4//! and entities that are missing key structural data. Operations are:
5//!
6//! - `memory-bindings`: memories without `memory_entities` rows get entity extraction
7//! - `entity-descriptions`: entities with NULL/empty descriptions get LLM descriptions
8//! - `body-enrich`: memories with short bodies get expanded by the LLM (GAP-18)
9//! - all others: scan + structured NDJSON output (not-yet-implemented dispatch)
10//!
11//! Architecture mirrors `ingest_claude.rs`: SCAN → JUDGE (LLM) → PERSIST, with a
12//! SQLite queue DB (`.enrich-queue.sqlite`) for resume/retry support.
13// Workload: Subprocess I/O-bound (claude/codex API calls with network wait)
14//!
15//! # DRY opportunity
16//!
17//! `extract_with_claude`, `parse_claude_output`, `emit_json`, and the `open_queue_db`
18//! queue schema in `ingest_claude.rs` are private functions that duplicate patterns used
19//! here verbatim. A future refactoring could extract them into a shared
20//! `src/commands/llm_runner.rs` module (or `src/llm_runner.rs`) without changing any
21//! public APIs. That extraction requires editing `ingest_claude.rs`, which is outside
22//! this stream's boundary — flagged here for the Integration stream to evaluate.
23
24use crate::commands::ingest_claude::find_claude_binary;
25use crate::constants::MAX_MEMORY_BODY_LEN;
26use crate::entity_type::EntityType;
27use crate::errors::AppError;
28use crate::paths::AppPaths;
29use crate::storage::connection::{ensure_db_ready, open_rw};
30use crate::storage::entities::{self, NewEntity, NewRelationship};
31use crate::storage::memories;
32
33use rusqlite::Connection;
34use serde::{Deserialize, Serialize};
35use std::io::Write;
36use std::path::{Path, PathBuf};
37use std::process::{Command, Stdio};
38use std::time::Instant;
39
40// ---------------------------------------------------------------------------
41// Constants
42// ---------------------------------------------------------------------------
43
/// File name of the resumable enrichment work-queue database.
const DEFAULT_QUEUE_DB: &str = ".enrich-queue.sqlite";
/// Default back-off after a provider rate limit (presumably seconds — confirm at the use site).
const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
/// Default value of `--min-output-chars` for `body-enrich`.
const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;
/// Default value of `--max-output-chars` for `body-enrich`.
const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2_000;
48
49// ---------------------------------------------------------------------------
50// JSON schema used for memory-bindings and body-enrich extraction
51// ---------------------------------------------------------------------------
52
/// Structured-output JSON Schema for `memory-bindings` extraction.
///
/// Requires a list of typed entities plus weighted relationships. The
/// `entity_type` enum lists the 10 entity types used across this module. The
/// `relation` enum is the same vocabulary `BINDINGS_PROMPT` tells the model to
/// use. Every object is closed with `"additionalProperties": false`.
const BINDINGS_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["entities","relationships"],
  "additionalProperties": false
}"#;

/// Structured-output schema for `entity-descriptions`: a single required `description` string.
const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" }
  },
  "required": ["description"],
  "additionalProperties": false
}"#;

/// Structured-output schema for `body-enrich` (GAP-18): a single required `enriched_body` string.
const BODY_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "enriched_body": { "type": "string" }
  },
  "required": ["enriched_body"],
  "additionalProperties": false
}"#;
110
// G27 P1: weight-calibrate
/// Prompt for `weight-calibrate`.
///
/// Gives the model a 0.3 / 0.5 / 0.7 / 0.9 weight scale. This is the same scale
/// `BINDINGS_PROMPT` uses for strength, so recalibrated weights stay comparable
/// with freshly extracted ones.
const WEIGHT_CALIBRATE_PROMPT: &str = "You are a knowledge graph quality auditor. Evaluate whether this relationship weight is correctly calibrated.\n\n\
Scale:\n\
- 0.9 = vital hard dependency (A cannot function without B)\n\
- 0.7 = important design relationship (A strongly supports/enables B)\n\
- 0.5 = useful contextual link (A and B share relevant context)\n\
- 0.3 = weak reference (A mentions B without strong coupling)\n\n\
Respond with the calibrated weight and brief reasoning.";

/// Structured-output schema for `weight-calibrate`: a weight bounded to `[0, 1]` plus reasoning.
const WEIGHT_CALIBRATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "calibrated_weight": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["calibrated_weight", "reasoning"],
  "additionalProperties": false
}"#;
129
// G27 P1: relation-reclassify
/// Prompt for `relation-reclassify`.
///
/// Asks the model to replace a generic relation with exactly one of the
/// canonical relations listed in the prompt text.
const RELATION_RECLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. The relationship between these entities uses a generic type. Determine the REAL semantic relationship.\n\n\
Valid canonical relations (pick exactly one):\n\
- depends-on: A cannot function without B\n\
- uses: A utilizes B but could substitute it\n\
- supports: A reinforces or enables B\n\
- causes: A triggers or produces B\n\
- fixes: A resolves a problem in B\n\
- contradicts: A conflicts with or invalidates B\n\
- applies-to: A is relevant to or scoped within B\n\
- follows: A comes after B in sequence\n\
- replaces: A substitutes B\n\
- tracked-in: A is monitored in B\n\
- related: A and B share context (use sparingly)\n\n\
Respond with the correct relation, strength, and reasoning.";

/// Structured-output schema for `relation-reclassify`.
///
/// `relation` is constrained by an `enum` to the same canonical list the prompt
/// offers, which is also the vocabulary of `BINDINGS_SCHEMA`. Without it the model
/// could answer with a free-form or misspelled relation, despite the prompt's
/// "pick exactly one" instruction.
const RELATION_RECLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": {
      "type": "string",
      "enum": ["depends-on","uses","supports","causes","fixes","contradicts","applies-to","follows","replaces","tracked-in","related"]
    },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
156
// G27 P2: entity-connect — suggest relationships between isolated entities
/// Prompt for `entity-connect`.
///
/// Asks whether two unrelated entities deserve a canonical relation. The model
/// answers `"none"` when they do not.
const ENTITY_CONNECT_PROMPT: &str = "You are a knowledge graph quality auditor. Two entities exist in the same graph but have no relationship between them. Determine if a meaningful relationship exists.\n\n\
Valid canonical relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, replaces, tracked-in, related.\n\n\
If NO meaningful relationship exists, set relation to \"none\".\n\
Respond with the relation (or \"none\"), strength, and reasoning.";

/// Structured-output schema for `entity-connect`.
///
/// `relation` is limited to the canonical relations named in the prompt plus the
/// explicit `"none"` sentinel. A caller can therefore tell "no link" apart from a
/// real relation without parsing free-form text.
const ENTITY_CONNECT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": {
      "type": "string",
      "enum": ["depends-on","uses","supports","causes","fixes","contradicts","applies-to","follows","replaces","tracked-in","related","none"]
    },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
173
// G27 P2: entity-type-validate — verify entity type assignments
/// Prompt for `entity-type-validate`: confirm the current type or suggest a better one.
const ENTITY_TYPE_VALIDATE_PROMPT: &str = "You are a knowledge graph quality auditor. Verify whether this entity's type is correct.\n\n\
Valid entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\n\
If the current type is correct, keep it. If wrong, suggest the correct type.\n\
Respond with the validated type and reasoning.";

/// Structured-output schema for `entity-type-validate`.
///
/// `validated_type` is restricted to the 10 entity types listed in the prompt.
/// These are the same values `BINDINGS_SCHEMA` allows for `entity_type`, so a
/// suggested type is always one the rest of the pipeline accepts.
const ENTITY_TYPE_VALIDATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "validated_type": {
      "type": "string",
      "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
    },
    "was_correct": { "type": "boolean" },
    "reasoning": { "type": "string" }
  },
  "required": ["validated_type", "was_correct", "reasoning"],
  "additionalProperties": false
}"#;
190
// G27 P2: description-enrich — improve generic memory descriptions
/// Prompt for `description-enrich`.
///
/// Asks for a 10-20 word semantic description. It includes one BAD/GOOD example
/// pair that anchors the expected style.
const DESCRIPTION_ENRICH_PROMPT: &str = "You are a knowledge graph quality auditor. This memory has a generic or auto-generated description. Write a concise, semantic description (10-20 words) that captures WHAT this memory is about and WHY it matters.\n\n\
BAD: 'ingested from docs/auth.md'\n\
GOOD: 'JWT token rotation strategy with 15-min expiry and refresh flow'\n\n\
Respond with the improved description and reasoning.";

/// Structured-output schema for `description-enrich`: the new `description` plus `reasoning`.
const DESCRIPTION_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" },
    "reasoning": { "type": "string" }
  },
  "required": ["description", "reasoning"],
  "additionalProperties": false
}"#;
206
// G27 P2: domain-classify — classify memory into domain category
/// Prompt for `domain-classify`.
///
/// It explicitly asks for every field `DOMAIN_CLASSIFY_SCHEMA` requires (`domain`,
/// `confidence`, `reasoning`). The confidence request was previously missing, so
/// the model only learned about it from the schema.
const DOMAIN_CLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. Classify this memory into its primary domain category.\n\n\
Respond with the domain name (kebab-case, 2-4 words), a confidence score from 0.0 to 1.0, and reasoning.";

/// Structured-output schema for `domain-classify`: domain label, `[0, 1]` confidence, reasoning.
const DOMAIN_CLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "domain": { "type": "string" },
    "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["domain", "confidence", "reasoning"],
  "additionalProperties": false
}"#;
221
// G27 P2: graph-audit — audit graph for quality issues
/// Prompt for `graph-audit`: list quality issues for a memory and its bindings, plus a score.
const GRAPH_AUDIT_PROMPT: &str = "You are a knowledge graph quality auditor. Analyze this memory and its entity bindings for quality issues.\n\n\
Check for: missing entities, wrong entity types, redundant relationships, orphaned entities, generic descriptions, low-signal relationships.\n\n\
Respond with a list of issues found (or empty if none) and an overall quality score.";

/// Structured-output schema for `graph-audit`.
///
/// Each issue object is closed with `"additionalProperties": false`, like every
/// other object schema in this module. Strict structured-output modes reject
/// nested objects that leave extra properties open.
const GRAPH_AUDIT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "quality_score": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "issues": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "kind": { "type": "string" },
          "detail": { "type": "string" }
        },
        "required": ["kind", "detail"],
        "additionalProperties": false
      }
    },
    "reasoning": { "type": "string" }
  },
  "required": ["quality_score", "issues", "reasoning"],
  "additionalProperties": false
}"#;
237
// G27 P2: deep-research-synth — synthesize research findings into graph
/// Prompt for `deep-research-synth`: extract entities, relationships and a summary from a body.
const DEEP_RESEARCH_SYNTH_PROMPT: &str = "You are a knowledge graph synthesizer. Given this memory body, extract key findings and synthesize them into structured entities and relationships.\n\n\
Entity names: lowercase kebab-case, domain-specific.\n\
Relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, related, replaces, tracked-in.\n\n\
Respond with extracted entities, relationships, and a synthesis summary.";

/// Structured-output schema for `deep-research-synth`.
///
/// Aligned with `BINDINGS_SCHEMA`:
/// - `entity_type` is limited to the 10 known entity types, so output can be
///   persisted without being skipped as unrecognized.
/// - `relation` is limited to the relations listed in the prompt.
/// - `strength` is bounded to `[0, 1]`.
/// - Every nested object is closed with `"additionalProperties": false`, which
///   strict structured-output modes require.
const DEEP_RESEARCH_SYNTH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["depends-on","uses","supports","causes","fixes","contradicts","applies-to","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 }
        },
        "required": ["source", "target", "relation", "strength"],
        "additionalProperties": false
      }
    },
    "summary": { "type": "string" }
  },
  "required": ["entities", "relationships", "summary"],
  "additionalProperties": false
}"#;
254
// G27 P2: body-extract — extract structured content from unstructured text
/// Prompt for `body-extract`.
///
/// Asks the model to restructure a noisy body into clean markdown while keeping
/// every fact.
const BODY_EXTRACT_PROMPT: &str = "You are a structured data extractor. Given this memory body (which may be unstructured text, raw notes, or a transcript), extract and restructure the content into a clean, well-organized markdown body.\n\n\
Preserve all factual content. Remove noise, fix formatting, add section headers where appropriate.\n\
Respond with the restructured body and a brief summary of changes.";

/// Structured-output schema for `body-extract`: the rewritten body plus a summary of the edits.
const BODY_EXTRACT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "restructured_body": { "type": "string" },
    "changes_summary": { "type": "string" }
  },
  "required": ["restructured_body", "changes_summary"],
  "additionalProperties": false
}"#;
269
270// ---------------------------------------------------------------------------
271// Prompts
272// ---------------------------------------------------------------------------
273
/// System prompt for `memory-bindings` extraction.
///
/// The relationship vocabulary listed here is the same list as the `relation`
/// enum in `BINDINGS_SCHEMA`. The strength scale (0.9/0.7/0.5/0.3) matches
/// `WEIGHT_CALIBRATE_PROMPT`.
const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. Given a memory body, extract:\n\
1. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
2. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones";

/// Leading part of the `entity-descriptions` prompt.
///
/// It ends with `Entity name: `. Presumably the caller appends the entity name
/// (and type) after it — confirm at the call site.
const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. Given an entity name and type, write a concise one-sentence description (10-20 words) that explains what this entity IS and WHY it matters in the context of software/system design.\n\nEntity name: ";

/// Leading part of the `body-enrich` prompt (GAP-18).
///
/// It ends with the heading `Memory body to enrich:` and a blank line; presumably
/// the caller appends the body after it.
/// NOTE(review): the text says the target length "is provided in the system
/// context". Confirm the caller actually passes the min/max output-char limits.
const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. Given a short or sparse memory body, expand it into a richer, more complete and useful description. Preserve all existing facts. Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\nConstraints:\n- Output only the enriched body text (no metadata, no headers)\n- Preserve the original meaning exactly\n- Target length is provided in the system context\n\nMemory body to enrich:\n\n";
288
289// ---------------------------------------------------------------------------
290// CLI args
291// ---------------------------------------------------------------------------
292
/// Operation to perform in the `enrich` command.
// NOTE: clap also renders the `///` doc on each variant as the `--help` text of
// the matching `--operation` value, so editing those lines changes CLI output.
// serde spells variants in kebab-case (see `rename_all` below), e.g. `memory-bindings`.
// clap's `ValueEnum` derive also defaults to kebab-case. Keep the two spellings
// in sync if either one is ever customized.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum EnrichOperation {
    // -- Fully implemented operations: SCAN -> JUDGE (LLM) -> PERSIST. --
    /// Add missing entity/relationship bindings to memories (fully implemented).
    MemoryBindings,
    /// Fill NULL/empty entity descriptions with LLM-generated summaries (fully implemented).
    EntityDescriptions,
    /// Expand short memory bodies into richer content (fully implemented, GAP-18).
    BodyEnrich,
    // -- "Scan only" operations: per the module docs, these currently scan and
    //    emit structured NDJSON, but are not yet dispatched to the LLM. --
    /// Calibrate relationship weights using LLM analysis (scan only).
    WeightCalibrate,
    /// Reclassify relationship types using LLM judgment (scan only).
    RelationReclassify,
    /// Connect isolated entities by suggesting new relationships (scan only).
    EntityConnect,
    /// Validate entity type assignments using LLM judgment (scan only).
    EntityTypeValidate,
    /// Enrich memory descriptions that are generic/auto-generated (scan only).
    DescriptionEnrich,
    /// Identify cross-domain bridges between disconnected subgraphs (scan only).
    CrossDomainBridges,
    /// Classify memories into domain categories (scan only).
    DomainClassify,
    /// Audit the graph for quality issues (scan only).
    GraphAudit,
    /// Synthesize deep-research findings into graph memories (scan only).
    DeepResearchSynth,
    /// Extract structured body from unstructured text (scan only).
    BodyExtract,
}
324
325/// LLM provider for enrichment.
326#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
327pub enum EnrichMode {
328    /// Use locally installed Claude Code CLI (OAuth-first).
329    ClaudeCode,
330    /// Use locally installed OpenAI Codex CLI.
331    Codex,
332}
333
334impl std::fmt::Display for EnrichMode {
335    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
336        match self {
337            EnrichMode::ClaudeCode => write!(f, "claude-code"),
338            EnrichMode::Codex => write!(f, "codex"),
339        }
340    }
341}
342
343/// Arguments for the `enrich` subcommand.
344#[derive(clap::Args)]
345#[command(
346    about = "Enrich graph memories and entities using an LLM provider",
347    after_long_help = "EXAMPLES:\n  \
348    # Add missing entity bindings to all unbound memories\n  \
349    sqlite-graphrag enrich --operation memory-bindings --mode claude-code\n\n  \
350    # Fill entity descriptions (dry-run preview, no tokens spent)\n  \
351    sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n  \
352    # Expand short memory bodies (GAP-18)\n  \
353    sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n  \
354    # Resume an interrupted body-enrich run\n  \
355    sqlite-graphrag enrich --operation body-enrich --resume --json\n\n  \
356    # Retry only failed items from a previous run\n  \
357    sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n\
358    EXIT CODES:\n  \
359    0  success\n  \
360    1  validation error (bad args, binary not found)\n  \
361    14 I/O error"
362)]
363pub struct EnrichArgs {
364    /// Enrichment operation to run.
365    #[arg(long, short = 'o', value_enum, value_name = "OPERATION")]
366    pub operation: EnrichOperation,
367
368    /// LLM provider to use. Default: claude-code (OAuth-first).
369    #[arg(long, value_enum, default_value = "claude-code")]
370    pub mode: EnrichMode,
371
372    /// Maximum number of items to process in this run. Omit for all.
373    #[arg(long, value_name = "N")]
374    pub limit: Option<usize>,
375
376    /// Preview items without calling the LLM (zero tokens consumed).
377    #[arg(long)]
378    pub dry_run: bool,
379
380    /// Namespace to operate on. Default: global.
381    #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
382    pub namespace: Option<String>,
383
384    // -- Provider flags (Claude) --
385    /// Path to the Claude Code binary. Default: auto-detect from PATH.
386    #[arg(long, value_name = "PATH")]
387    pub claude_binary: Option<PathBuf>,
388
389    /// Claude model to use (e.g. claude-sonnet-4-6).
390    #[arg(long, value_name = "MODEL")]
391    pub claude_model: Option<String>,
392
393    /// Timeout per item in seconds when using Claude Code. Default: 300.
394    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
395    pub claude_timeout: u64,
396
397    // -- Provider flags (Codex) --
398    /// Path to the Codex CLI binary. Default: auto-detect from PATH.
399    #[arg(long, value_name = "PATH")]
400    pub codex_binary: Option<PathBuf>,
401
402    /// Codex model to use (e.g. o4-mini).
403    #[arg(long, value_name = "MODEL")]
404    pub codex_model: Option<String>,
405
406    /// Timeout per item in seconds when using Codex. Default: 300.
407    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
408    pub codex_timeout: u64,
409
410    // -- Cost controls --
411    /// Abort when cumulative cost exceeds this USD budget (API key only; ignored for OAuth).
412    #[arg(long, value_name = "USD")]
413    pub max_cost_usd: Option<f64>,
414
415    // -- Queue controls --
416    /// Resume a previously interrupted run (skip already-done items).
417    #[arg(long)]
418    pub resume: bool,
419
420    /// Retry only items that failed in a previous run.
421    #[arg(long)]
422    pub retry_failed: bool,
423
424    // -- body-enrich specific flags (GAP-18) --
425    /// Minimum output character count for body-enrich. Default: 500.
426    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
427    pub min_output_chars: usize,
428
429    /// Maximum output character count for body-enrich. Default: 2000.
430    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
431    pub max_output_chars: usize,
432
433    /// Check that enriched body preserves all facts from the original (LLM judge). Default: true.
434    #[arg(long, default_value_t = true)]
435    pub preserve_check: bool,
436
437    /// Path to a custom prompt template file for body-enrich.
438    #[arg(long, value_name = "PATH")]
439    pub prompt_template: Option<PathBuf>,
440
441    /// Number of parallel LLM workers (default 1 = serial).
442    /// Each worker claims items atomically from the queue DB via UPDATE...RETURNING.
443    /// Range: 1–32. For 2321 entities, --llm-parallelism 4 reduces wall time ~4×.
444    #[arg(long, default_value_t = 1, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=32))]
445    pub llm_parallelism: u32,
446
447    // -- Output / infra --
448    /// Emit NDJSON output. Always true; flag accepted for compatibility.
449    #[arg(long)]
450    pub json: bool,
451
452    /// Database path override.
453    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
454    pub db: Option<String>,
455}
456
457// ---------------------------------------------------------------------------
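// Internal types — raw LLM output structs (none at module scope; e.g. `persist_memory_bindings` deserializes LLM JSON into function-local structs)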
459// ---------------------------------------------------------------------------
460
461// ---------------------------------------------------------------------------
462// NDJSON event types emitted to stdout
463// ---------------------------------------------------------------------------
464
/// NDJSON line announcing a pipeline phase (e.g. the "scan" phase).
///
/// `None` fields are omitted from the serialized object (`skip_serializing_if`).
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Phase label, written verbatim to the `phase` key.
    phase: &'a str,
    // NOTE(review): presumably the resolved LLM CLI binary path and its reported
    // version; confirm at the emit sites.
    #[serde(skip_serializing_if = "Option::is_none")]
    binary_path: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Item counts for the run. Presumably the total scanned and the number
    /// still pending in the queue; confirm at the emit sites.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_total: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    items_pending: Option<usize>,
    /// Active parallel LLM worker count (1 = serial). Present only on the "scan" phase event.
    #[serde(skip_serializing_if = "Option::is_none")]
    llm_parallelism: Option<u32>,
}

/// NDJSON line reporting the outcome for a single work item.
///
/// `None` fields are omitted from the serialized object (`skip_serializing_if`).
/// `item`, `status`, `index` and `total` are always present.
#[derive(Debug, Serialize)]
struct ItemEvent<'a> {
    /// Item identifier (memory name or entity name).
    item: &'a str,
    /// Outcome label for this item; the set of values is chosen by the callers.
    status: &'a str,
    /// Row id of the memory or entity this event refers to, whichever applies.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    entity_id: Option<i64>,
    /// Entity / relationship counts for the item (e.g. what `persist_memory_bindings` reports).
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Body length before and after rewriting (presumably body-enrich only; confirm).
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_before: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_after: Option<usize>,
    /// LLM cost reported for this item, in USD.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    /// Time spent on this item, in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Error message when the item failed.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Position of this item within the run, and the run's item count.
    index: usize,
    total: usize,
}

/// Final NDJSON line summarizing an `enrich` run.
#[derive(Debug, Serialize)]
struct EnrichSummary {
    // NOTE(review): looks like a discriminator that lets consumers tell the summary
    // line apart from phase/item events; presumably always `true`. Confirm at the
    // emit site.
    summary: bool,
    /// Name of the operation that ran.
    operation: String,
    /// Item tallies for the run.
    items_total: usize,
    completed: usize,
    failed: usize,
    skipped: usize,
    /// Accumulated LLM cost for the run, in USD.
    cost_usd: f64,
    /// Run duration, in milliseconds.
    elapsed_ms: u64,
}
519
520use crate::output::emit_json_line as emit_json;
521
522// ---------------------------------------------------------------------------
523// Queue DB
524// ---------------------------------------------------------------------------
525
526/// Opens or creates the enrichment queue database.
527///
528/// The queue schema mirrors `ingest_claude` for resume/retry parity.
529/// Uses a different filename (`.enrich-queue.sqlite`) to avoid collision.
530///
531/// # DRY note
532///
533/// This is a near-verbatim copy of `open_queue_db` in `ingest_claude.rs`.
534/// Both should be unified in a shared `llm_runner.rs` module by the
535/// Integration stream.
536fn open_queue_db(path: &str) -> Result<Connection, AppError> {
537    let conn = Connection::open(path)?;
538    conn.pragma_update(None, "journal_mode", "wal")?;
539    conn.execute_batch(
540        "CREATE TABLE IF NOT EXISTS queue (
541            id          INTEGER PRIMARY KEY AUTOINCREMENT,
542            item_key    TEXT NOT NULL UNIQUE,
543            item_type   TEXT NOT NULL DEFAULT 'memory',
544            status      TEXT NOT NULL DEFAULT 'pending',
545            memory_id   INTEGER,
546            entity_id   INTEGER,
547            entities    INTEGER DEFAULT 0,
548            rels        INTEGER DEFAULT 0,
549            error       TEXT,
550            cost_usd    REAL DEFAULT 0.0,
551            attempt     INTEGER DEFAULT 0,
552            elapsed_ms  INTEGER,
553            created_at  TEXT DEFAULT (datetime('now')),
554            done_at     TEXT
555        );
556        CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
557    )?;
558    Ok(conn)
559}
560
561// ---------------------------------------------------------------------------
562// LLM invocation — Claude Code
563// ---------------------------------------------------------------------------
564
565/// Calls `claude -p` via the shared `claude_runner` module (G02).
566///
567/// Returns `(output_value, cost_usd, is_oauth)`.
568fn call_claude(
569    binary: &Path,
570    prompt: &str,
571    json_schema: &str,
572    input_text: &str,
573    model: Option<&str>,
574    timeout_secs: u64,
575) -> Result<(serde_json::Value, f64, bool), AppError> {
576    let result = crate::commands::claude_runner::run_claude(
577        binary,
578        prompt,
579        json_schema,
580        input_text,
581        model,
582        timeout_secs,
583        7,
584    )?;
585    Ok((result.value, result.cost_usd, result.is_oauth))
586}
587
588// ---------------------------------------------------------------------------
589// SCAN helpers — SQL queries that find items needing enrichment
590// ---------------------------------------------------------------------------
591
592/// Returns memories without any `memory_entities` binding.
593///
594/// These are the targets for `memory-bindings` enrichment.
595fn scan_unbound_memories(
596    conn: &Connection,
597    namespace: &str,
598    limit: Option<usize>,
599) -> Result<Vec<(i64, String, String)>, AppError> {
600    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
601    let sql = format!(
602        "SELECT m.id, m.name, m.body
603         FROM memories m
604         WHERE m.namespace = ?1
605           AND m.deleted_at IS NULL
606           AND NOT EXISTS (
607               SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
608           )
609         ORDER BY m.id
610         {limit_clause}"
611    );
612    let mut stmt = conn.prepare(&sql)?;
613    let rows = stmt
614        .query_map(rusqlite::params![namespace], |r| {
615            Ok((
616                r.get::<_, i64>(0)?,
617                r.get::<_, String>(1)?,
618                r.get::<_, String>(2)?,
619            ))
620        })?
621        .collect::<Result<Vec<_>, _>>()?;
622    Ok(rows)
623}
624
625/// Returns entities with NULL or empty description.
626///
627/// These are the targets for `entity-descriptions` enrichment.
628fn scan_entities_without_description(
629    conn: &Connection,
630    namespace: &str,
631    limit: Option<usize>,
632) -> Result<Vec<(i64, String, String)>, AppError> {
633    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
634    let sql = format!(
635        "SELECT id, name, type
636         FROM entities
637         WHERE namespace = ?1
638           AND (description IS NULL OR description = '')
639         ORDER BY id
640         {limit_clause}"
641    );
642    let mut stmt = conn.prepare(&sql)?;
643    let rows = stmt
644        .query_map(rusqlite::params![namespace], |r| {
645            Ok((
646                r.get::<_, i64>(0)?,
647                r.get::<_, String>(1)?,
648                r.get::<_, String>(2)?,
649            ))
650        })?
651        .collect::<Result<Vec<_>, _>>()?;
652    Ok(rows)
653}
654
655/// Returns memories whose body length is below the configured minimum.
656///
657/// These are the targets for `body-enrich` (GAP-18).
658fn scan_short_body_memories(
659    conn: &Connection,
660    namespace: &str,
661    min_chars: usize,
662    limit: Option<usize>,
663) -> Result<Vec<(i64, String, String)>, AppError> {
664    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
665    let sql = format!(
666        "SELECT m.id, m.name, m.body
667         FROM memories m
668         WHERE m.namespace = ?1
669           AND m.deleted_at IS NULL
670           AND LENGTH(COALESCE(m.body,'')) < ?2
671         ORDER BY m.id
672         {limit_clause}"
673    );
674    let mut stmt = conn.prepare(&sql)?;
675    let rows = stmt
676        .query_map(rusqlite::params![namespace, min_chars as i64], |r| {
677            Ok((
678                r.get::<_, i64>(0)?,
679                r.get::<_, String>(1)?,
680                r.get::<_, String>(2)?,
681            ))
682        })?
683        .collect::<Result<Vec<_>, _>>()?;
684    Ok(rows)
685}
686
687/// G27: Returns relationships with weight >= 0.7 that may need recalibration.
688#[allow(clippy::type_complexity)]
689fn scan_weight_candidates(
690    conn: &Connection,
691    namespace: &str,
692    limit: Option<usize>,
693) -> Result<Vec<(i64, String, String, String, f64)>, AppError> {
694    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
695    let sql = format!(
696        "SELECT r.id, e1.name, e2.name, r.relation, r.weight \
697         FROM relationships r \
698         JOIN entities e1 ON e1.id = r.source_id \
699         JOIN entities e2 ON e2.id = r.target_id \
700         WHERE r.weight >= 0.7 AND e1.namespace = ?1 \
701         ORDER BY r.weight DESC {limit_clause}"
702    );
703    let mut stmt = conn.prepare(&sql)?;
704    let rows = stmt
705        .query_map(rusqlite::params![namespace], |r| {
706            Ok((
707                r.get::<_, i64>(0)?,
708                r.get::<_, String>(1)?,
709                r.get::<_, String>(2)?,
710                r.get::<_, String>(3)?,
711                r.get::<_, f64>(4)?,
712            ))
713        })?
714        .collect::<Result<Vec<_>, _>>()?;
715    Ok(rows)
716}
717
718/// G27: Returns relationships with generic relation types (applies_to).
719fn scan_generic_relations(
720    conn: &Connection,
721    namespace: &str,
722    limit: Option<usize>,
723) -> Result<Vec<(i64, String, String, String)>, AppError> {
724    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
725    let sql = format!(
726        "SELECT r.id, e1.name, e2.name, r.relation \
727         FROM relationships r \
728         JOIN entities e1 ON e1.id = r.source_id \
729         JOIN entities e2 ON e2.id = r.target_id \
730         WHERE r.relation = 'applies_to' AND e1.namespace = ?1 \
731         ORDER BY r.id {limit_clause}"
732    );
733    let mut stmt = conn.prepare(&sql)?;
734    let rows = stmt
735        .query_map(rusqlite::params![namespace], |r| {
736            Ok((
737                r.get::<_, i64>(0)?,
738                r.get::<_, String>(1)?,
739                r.get::<_, String>(2)?,
740                r.get::<_, String>(3)?,
741            ))
742        })?
743        .collect::<Result<Vec<_>, _>>()?;
744    Ok(rows)
745}
746
747// ---------------------------------------------------------------------------
748// PERSIST helpers for fully-implemented operations
749// ---------------------------------------------------------------------------
750
751/// Persists entity bindings extracted by the LLM for a memory.
752///
753/// Creates entities via `upsert_entity`, links them to the memory via
754/// `link_memory_entity`, and upserts relationships found between entities.
755fn persist_memory_bindings(
756    conn: &Connection,
757    namespace: &str,
758    memory_id: i64,
759    entities_json: &serde_json::Value,
760    rels_json: &serde_json::Value,
761) -> Result<(usize, usize), AppError> {
762    #[derive(Deserialize)]
763    struct EntityItem {
764        name: String,
765        entity_type: String,
766    }
767    #[derive(Deserialize)]
768    struct RelItem {
769        source: String,
770        target: String,
771        relation: String,
772        strength: f64,
773    }
774
775    let extracted_entities: Vec<EntityItem> = serde_json::from_value(entities_json.clone())
776        .map_err(|e| AppError::Validation(format!("failed to parse entities array: {e}")))?;
777
778    let extracted_rels: Vec<RelItem> = serde_json::from_value(rels_json.clone())
779        .map_err(|e| AppError::Validation(format!("failed to parse relationships array: {e}")))?;
780
781    let mut ent_count = 0usize;
782    let mut rel_count = 0usize;
783
784    for item in &extracted_entities {
785        let entity_type = match item.entity_type.parse::<EntityType>() {
786            Ok(et) => et,
787            Err(_) => {
788                tracing::warn!(
789                    target: "enrich",
790                    entity = %item.name,
791                    entity_type = %item.entity_type,
792                    "entity type not recognized, skipping"
793                );
794                continue;
795            }
796        };
797        match entities::upsert_entity(
798            conn,
799            namespace,
800            &NewEntity {
801                name: item.name.clone(),
802                entity_type,
803                description: None,
804            },
805        ) {
806            Ok(eid) => {
807                let _ = entities::link_memory_entity(conn, memory_id, eid);
808                ent_count += 1;
809            }
810            Err(e) => {
811                tracing::warn!(
812                    target: "enrich",
813                    entity = %item.name,
814                    error = %e,
815                    "entity upsert skipped"
816                );
817            }
818        }
819    }
820
821    for rel in &extracted_rels {
822        let normalized = crate::parsers::normalize_relation(&rel.relation);
823        crate::parsers::warn_if_non_canonical(&normalized);
824
825        // Normalize entity names before lookup: upsert_entity normalizes on write,
826        // so the lookup must use the same normalized form to find the row.
827        let src_name = crate::parsers::normalize_entity_name(&rel.source);
828        let tgt_name = crate::parsers::normalize_entity_name(&rel.target);
829        let src_id = entities::find_entity_id(conn, namespace, &src_name);
830        let tgt_id = entities::find_entity_id(conn, namespace, &tgt_name);
831        if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
832            let new_rel = NewRelationship {
833                source: rel.source.clone(),
834                target: rel.target.clone(),
835                relation: normalized,
836                strength: rel.strength,
837                description: None,
838            };
839            if entities::upsert_relationship(conn, namespace, sid, tid, &new_rel).is_ok() {
840                rel_count += 1;
841            }
842        }
843    }
844
845    Ok((ent_count, rel_count))
846}
847
848/// Updates an entity's description directly in the `entities` table.
849fn persist_entity_description(
850    conn: &Connection,
851    entity_id: i64,
852    description: &str,
853) -> Result<(), AppError> {
854    conn.execute(
855        "UPDATE entities SET description = ?1, updated_at = unixepoch() WHERE id = ?2",
856        rusqlite::params![description, entity_id],
857    )?;
858    Ok(())
859}
860
861/// Persists an enriched memory body (body-enrich, GAP-18).
862///
863/// Uses `memories::update` to set the new body and `sync_fts_after_update`
864/// to keep FTS5 in sync. Also re-embeds the memory for recall accuracy.
865fn persist_enriched_body(
866    conn: &Connection,
867    namespace: &str,
868    memory_id: i64,
869    memory_name: &str,
870    new_body: &str,
871    paths: &crate::paths::AppPaths,
872) -> Result<(), AppError> {
873    // Read current values for FTS sync
874    let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
875        "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
876        rusqlite::params![memory_id],
877        |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
878    )?;
879
880    let memory_type: String = conn.query_row(
881        "SELECT type FROM memories WHERE id=?1",
882        rusqlite::params![memory_id],
883        |r| r.get(0),
884    )?;
885
886    let description: String = conn.query_row(
887        "SELECT COALESCE(description,'') FROM memories WHERE id=?1",
888        rusqlite::params![memory_id],
889        |r| r.get(0),
890    )?;
891
892    let body_hash = blake3::hash(new_body.as_bytes()).to_hex().to_string();
893
894    let new_memory = memories::NewMemory {
895        namespace: namespace.to_string(),
896        name: memory_name.to_string(),
897        memory_type: memory_type.clone(),
898        description: description.clone(),
899        body: new_body.to_string(),
900        body_hash,
901        session_id: None,
902        source: "enrich".to_string(),
903        metadata: serde_json::Value::Object(serde_json::Map::new()),
904    };
905
906    memories::update(conn, memory_id, &new_memory, None)?;
907    memories::sync_fts_after_update(
908        conn,
909        memory_id,
910        &old_name,
911        &old_desc,
912        &old_body,
913        &new_memory.name,
914        &new_memory.description,
915        &new_memory.body,
916    )?;
917
918    // Re-embed for recall accuracy
919    let snippet: String = new_body.chars().take(200).collect();
920    let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
921    let chunks_info = crate::chunking::split_into_chunks_hierarchical(new_body, tokenizer);
922    let embedding_result = if chunks_info.len() <= 1 {
923        crate::daemon::embed_passage_or_local(&paths.models, new_body)
924    } else {
925        let mut chunk_embeddings: Vec<Vec<f32>> = Vec::with_capacity(chunks_info.len());
926        let mut ok = true;
927        for chunk in &chunks_info {
928            let text = crate::chunking::chunk_text(new_body, chunk);
929            match crate::daemon::embed_passage_or_local(&paths.models, text) {
930                Ok(emb) => chunk_embeddings.push(emb),
931                Err(e) => {
932                    tracing::warn!(target: "enrich", error = %e, "chunk embedding failed");
933                    ok = false;
934                    break;
935                }
936            }
937        }
938        if ok {
939            Ok(crate::chunking::aggregate_embeddings(&chunk_embeddings))
940        } else {
941            crate::daemon::embed_passage_or_local(&paths.models, new_body)
942        }
943    };
944
945    if let Ok(embedding) = embedding_result {
946        if let Err(e) = memories::upsert_vec(
947            conn,
948            memory_id,
949            namespace,
950            &memory_type,
951            &embedding,
952            memory_name,
953            &snippet,
954        ) {
955            tracing::warn!(target: "enrich", memory = %memory_name, error = %e, "vec upsert failed after body-enrich");
956        }
957    }
958
959    Ok(())
960}
961
962// ---------------------------------------------------------------------------
963// Main entry point
964// ---------------------------------------------------------------------------
965
966/// Main entry point for the `enrich` command.
967pub fn run(args: &EnrichArgs) -> Result<(), AppError> {
968    // TODO(G20): add mode-conditional flag validation before DB access.
969    // Flags that are silently discarded when the wrong mode is active:
970    //   --mode claude-code: codex_binary, codex_model, codex_timeout
971    //   --mode codex:       claude_binary, claude_model, claude_timeout,
972    //                       max_cost_usd, rate_limit_wait
973    // Approach: check each non-default flag value early and return
974    // Err(AppError::Validation(...)) for incompatible mode+flag combinations.
975    let started = Instant::now();
976
977    let paths = AppPaths::resolve(args.db.as_deref())?;
978    ensure_db_ready(&paths)?;
979    let conn = open_rw(&paths.db)?;
980    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
981
982    // G28-B (v1.0.68): enforce singleton per (job_type, namespace) so two
983    // parallel `enrich` invocations on the same DB cannot co-exist.  This is
984    // the root cause of the 2026-06-03 process-proliferation incident.
985    let _singleton =
986        crate::lock::acquire_job_singleton(crate::lock::JobType::Enrich, &namespace, None)?;
987
988    // Validate provider binary upfront
989    let provider_binary = match args.mode {
990        EnrichMode::ClaudeCode => {
991            let bin = find_claude_binary(args.claude_binary.as_deref())?;
992            let version = super::claude_runner::validate_claude_version(&bin)?;
993            tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
994            emit_json(&PhaseEvent {
995                phase: "validate",
996                binary_path: bin.to_str(),
997                version: Some(&version),
998                items_total: None,
999                items_pending: None,
1000                llm_parallelism: None,
1001            });
1002            bin
1003        }
1004        EnrichMode::Codex => {
1005            // Codex provider: locate binary using env or PATH
1006            let bin = find_codex_binary(args.codex_binary.as_deref())?;
1007            emit_json(&PhaseEvent {
1008                phase: "validate",
1009                binary_path: bin.to_str(),
1010                version: None,
1011                items_total: None,
1012                items_pending: None,
1013                llm_parallelism: None,
1014            });
1015            bin
1016        }
1017    };
1018
1019    // SCAN phase
1020    let scan_result = scan_operation(&conn, &namespace, args)?;
1021    let total = scan_result.len();
1022
1023    emit_json(&PhaseEvent {
1024        phase: "scan",
1025        binary_path: None,
1026        version: None,
1027        items_total: Some(total),
1028        items_pending: Some(total),
1029        llm_parallelism: Some(args.llm_parallelism),
1030    });
1031
1032    // Dry-run: emit preview events and summary without calling LLM
1033    if args.dry_run {
1034        for (idx, key) in scan_result.iter().enumerate() {
1035            emit_json(&ItemEvent {
1036                item: key,
1037                status: "preview",
1038                memory_id: None,
1039                entity_id: None,
1040                entities: None,
1041                rels: None,
1042                chars_before: None,
1043                chars_after: None,
1044                cost_usd: None,
1045                elapsed_ms: None,
1046                error: None,
1047                index: idx,
1048                total,
1049            });
1050        }
1051        emit_json(&EnrichSummary {
1052            summary: true,
1053            operation: format!("{:?}", args.operation),
1054            items_total: total,
1055            completed: 0,
1056            failed: 0,
1057            skipped: 0,
1058            cost_usd: 0.0,
1059            elapsed_ms: started.elapsed().as_millis() as u64,
1060        });
1061        return Ok(());
1062    }
1063
1064    // All 13 operations are now implemented (G27 complete).
1065
1066    // Queue setup for resume/retry
1067    let queue_conn = open_queue_db(DEFAULT_QUEUE_DB)?;
1068
1069    if args.resume {
1070        let reset = queue_conn
1071            .execute(
1072                "UPDATE queue SET status='pending' WHERE status='processing'",
1073                [],
1074            )
1075            .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
1076        if reset > 0 {
1077            tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
1078        }
1079    }
1080
1081    if args.retry_failed {
1082        let count = queue_conn
1083            .execute(
1084                "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
1085                [],
1086            )
1087            .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
1088        tracing::info!(target: "enrich", count, "retrying failed items");
1089    }
1090
1091    if !args.resume && !args.retry_failed {
1092        queue_conn
1093            .execute("DELETE FROM queue", [])
1094            .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
1095    }
1096
1097    // Populate queue
1098    for (idx, key) in scan_result.iter().enumerate() {
1099        let item_type = match args.operation {
1100            EnrichOperation::EntityDescriptions => "entity",
1101            _ => "memory",
1102        };
1103        if let Err(e) = queue_conn.execute(
1104            "INSERT OR IGNORE INTO queue (item_key, item_type, status) VALUES (?1, ?2, 'pending')",
1105            rusqlite::params![key, item_type],
1106        ) {
1107            tracing::warn!(target: "enrich", error = %e, "queue insert failed");
1108        }
1109        let _ = idx; // suppress unused warning
1110    }
1111
1112    // G19: parallel LLM processing via std::thread::scope when parallelism > 1.
1113    // Clamp enforces the range even if the caller bypasses clap validation.
1114    let parallelism = args.llm_parallelism.clamp(1, 32) as usize;
1115    if parallelism > 1 {
1116        tracing::info!(
1117            target: "enrich",
1118            llm_parallelism = parallelism,
1119            "parallel LLM processing with bounded thread pool"
1120        );
1121    }
1122    // G28-D (v1.0.68): warn above the recommended parallelism ceiling.  Each
1123    // worker spawns a `claude -p` subprocess that (without MCP isolation)
1124    // typically fan-outs 20+ child processes; 4 workers therefore risk ~80
1125    // extra processes.  See gaps.md G28 and the `external-process-audit-v1066`.
1126    if parallelism > 4 {
1127        tracing::warn!(
1128            target: "enrich",
1129            llm_parallelism = parallelism,
1130            recommended_max = 4,
1131            "llm_parallelism above 4 multiplies subprocess fan-out; \
1132             consider combining with SQLITE_GRAPHRAG_CLAUDE_EMPTY_CONFIG_DIR to \
1133             cut MCP children (G28-A)"
1134        );
1135    }
1136
1137    let mut completed = 0usize;
1138    let mut failed = 0usize;
1139    let mut skipped = 0usize;
1140    let mut cost_total = 0.0f64;
1141    let mut oauth_detected = false;
1142    let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1143    let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1144    let enrich_started = std::time::Instant::now();
1145
1146    let provider_timeout = match args.mode {
1147        EnrichMode::ClaudeCode => args.claude_timeout,
1148        EnrichMode::Codex => args.codex_timeout,
1149    };
1150
1151    let provider_model: Option<&str> = match args.mode {
1152        EnrichMode::ClaudeCode => args.claude_model.as_deref(),
1153        EnrichMode::Codex => args.codex_model.as_deref(),
1154    };
1155
1156    // G19: when parallelism > 1, spawn bounded worker threads.
1157    // Each worker opens its own DB connections (WAL supports concurrent readers + serialized writers).
1158    // The queue DB claim is atomic via UPDATE...RETURNING — no external lock needed.
1159    if parallelism > 1 {
1160        let stdout_mu = parking_lot::Mutex::new(());
1161        let budget = args.max_cost_usd;
1162        let operation = args.operation.clone();
1163        let mode = args.mode.clone();
1164        let min_oc = args.min_output_chars;
1165        let max_oc = args.max_output_chars;
1166        let prompt_tpl = args.prompt_template.as_deref().map(|p| p.to_path_buf());
1167
1168        struct WorkerResult {
1169            completed: usize,
1170            failed: usize,
1171            skipped: usize,
1172            cost: f64,
1173            oauth: bool,
1174        }
1175
1176        let results: Vec<WorkerResult> = std::thread::scope(|s| {
1177            let handles: Vec<_> = (0..parallelism)
1178                .map(|worker_id| {
1179                    let stdout_mu = &stdout_mu;
1180                    let paths = &paths;
1181                    let namespace = &namespace;
1182                    let provider_binary = &provider_binary;
1183                    let operation = &operation;
1184                    let mode = &mode;
1185                    let prompt_tpl = prompt_tpl.as_deref();
1186                    s.spawn(move || {
1187                        let w_conn = match open_rw(&paths.db) {
1188                            Ok(c) => c,
1189                            Err(e) => {
1190                                tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open DB");
1191                                return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1192                            }
1193                        };
1194                        let w_queue = match open_queue_db(DEFAULT_QUEUE_DB) {
1195                            Ok(c) => c,
1196                            Err(e) => {
1197                                tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open queue DB");
1198                                return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1199                            }
1200                        };
1201                        let mut w_completed = 0usize;
1202                        let mut w_failed = 0usize;
1203                        let mut w_skipped = 0usize;
1204                        let mut w_cost = 0.0f64;
1205                        let mut w_oauth = false;
1206                        let mut w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1207                        let w_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1208
1209                        loop {
1210                            if crate::shutdown_requested() {
1211                                tracing::info!(target: "enrich", "shutdown requested, worker stopping");
1212                                break;
1213                            }
1214                            if let Some(b) = budget {
1215                                if !w_oauth && w_cost >= b {
1216                                    break;
1217                                }
1218                            }
1219                            let pending: Option<(i64, String, String)> = w_queue
1220                                .query_row(
1221                                    "UPDATE queue SET status='processing', attempt=attempt+1 \
1222                                     WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
1223                                     RETURNING id, item_key, item_type",
1224                                    [],
1225                                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1226                                )
1227                                .ok();
1228                            let (queue_id, item_key, _item_type) = match pending {
1229                                Some(p) => p,
1230                                None => break,
1231                            };
1232                            let item_started = Instant::now();
1233                            let current_index = w_completed + w_failed + w_skipped;
1234
1235                            let call_result = match operation {
1236                                EnrichOperation::MemoryBindings => call_memory_bindings(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1237                                EnrichOperation::EntityDescriptions => call_entity_description(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1238                                EnrichOperation::BodyEnrich => call_body_enrich(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode, min_oc, max_oc, prompt_tpl, paths),
1239                                EnrichOperation::WeightCalibrate => call_weight_calibrate(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1240                                EnrichOperation::RelationReclassify => call_relation_reclassify(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1241                                EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => call_entity_connect(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1242                                EnrichOperation::EntityTypeValidate => call_entity_type_validate(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1243                                EnrichOperation::DescriptionEnrich => call_description_enrich(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1244                                EnrichOperation::DomainClassify => call_domain_classify(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1245                                EnrichOperation::GraphAudit => call_graph_audit(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1246                                EnrichOperation::DeepResearchSynth => call_deep_research_synth(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1247                                EnrichOperation::BodyExtract => call_body_extract(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1248                            };
1249
1250                            match call_result {
1251                                Ok(EnrichItemResult::Done { cost, is_oauth, memory_id, entity_id, entities, rels, chars_before, chars_after }) => {
1252                                    if is_oauth { w_oauth = true; }
1253                                    w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1254                                    let _ = w_queue.execute(
1255                                        "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1256                                        rusqlite::params![memory_id, entity_id, entities as i64, rels as i64, cost, item_started.elapsed().as_millis() as i64, queue_id],
1257                                    );
1258                                    w_completed += 1;
1259                                    if !is_oauth { w_cost += cost; }
1260                                    let _guard = stdout_mu.lock();
1261                                    emit_json(&ItemEvent { item: &item_key, status: "done", memory_id, entity_id, entities: Some(entities), rels: Some(rels), chars_before, chars_after, cost_usd: if is_oauth { None } else { Some(cost) }, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
1262                                }
1263                                Ok(EnrichItemResult::Skipped { reason }) => {
1264                                    w_skipped += 1;
1265                                    let _ = w_queue.execute("UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![reason, queue_id]);
1266                                    let _guard = stdout_mu.lock();
1267                                    emit_json(&ItemEvent { item: &item_key, status: "skipped", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
1268                                }
1269                                Err(e) => {
1270                                    let err_str = format!("{e}");
1271                                    if matches!(e, AppError::RateLimited { .. }) {
1272                                        if crate::retry::is_kill_switch_active() {
1273                                            tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
1274                                        } else if std::time::Instant::now() >= w_deadline {
1275                                            tracing::error!(target: "enrich", "rate-limit retry deadline (1h) exhausted in worker");
1276                                        } else {
1277                                            let half = w_backoff / 2;
1278                                            let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
1279                                            let actual_wait = half + jitter;
1280                                            tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited in worker, backing off");
1281                                            let _ = w_queue.execute("UPDATE queue SET status='pending' WHERE id=?1", rusqlite::params![queue_id]);
1282                                            std::thread::sleep(std::time::Duration::from_secs(actual_wait));
1283                                            w_backoff = (w_backoff * 2).min(900);
1284                                            continue;
1285                                        }
1286                                    }
1287                                    w_failed += 1;
1288                                    let _ = w_queue.execute("UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![err_str, queue_id]);
1289                                    let _guard = stdout_mu.lock();
1290                                    emit_json(&ItemEvent { item: &item_key, status: "failed", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: Some(err_str), index: current_index, total });
1291                                }
1292                            }
1293                        }
1294                        WorkerResult { completed: w_completed, failed: w_failed, skipped: w_skipped, cost: w_cost, oauth: w_oauth }
1295                    })
1296                })
1297                .collect();
1298            handles
1299                .into_iter()
1300                .map(|h| {
1301                    h.join().unwrap_or(WorkerResult {
1302                        completed: 0,
1303                        failed: 0,
1304                        skipped: 0,
1305                        cost: 0.0,
1306                        oauth: false,
1307                    })
1308                })
1309                .collect()
1310        });
1311
1312        for r in &results {
1313            completed += r.completed;
1314            failed += r.failed;
1315            skipped += r.skipped;
1316            cost_total += r.cost;
1317            oauth_detected |= r.oauth;
1318        }
1319    } else {
1320        // Serial path (parallelism == 1) — original loop
1321        loop {
1322            if crate::shutdown_requested() {
1323                tracing::info!(target: "enrich", "shutdown requested, stopping enrichment");
1324                break;
1325            }
1326
1327            // Budget check
1328            if let Some(budget) = args.max_cost_usd {
1329                if !oauth_detected && cost_total >= budget {
1330                    tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
1331                    break;
1332                }
1333            }
1334
1335            // Dequeue next pending item
1336            let pending: Option<(i64, String, String)> = queue_conn
1337                .query_row(
1338                    "UPDATE queue SET status='processing', attempt=attempt+1 \
1339                 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
1340                 RETURNING id, item_key, item_type",
1341                    [],
1342                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1343                )
1344                .ok();
1345
1346            let (queue_id, item_key, item_type) = match pending {
1347                Some(p) => p,
1348                None => break,
1349            };
1350
1351            let item_started = Instant::now();
1352            let current_index = completed + failed + skipped;
1353
1354            let call_result = match args.operation {
1355                EnrichOperation::MemoryBindings => call_memory_bindings(
1356                    &conn,
1357                    &namespace,
1358                    &item_key,
1359                    &provider_binary,
1360                    provider_model,
1361                    provider_timeout,
1362                    &args.mode,
1363                ),
1364                EnrichOperation::EntityDescriptions => call_entity_description(
1365                    &conn,
1366                    &namespace,
1367                    &item_key,
1368                    &provider_binary,
1369                    provider_model,
1370                    provider_timeout,
1371                    &args.mode,
1372                ),
1373                EnrichOperation::BodyEnrich => call_body_enrich(
1374                    &conn,
1375                    &namespace,
1376                    &item_key,
1377                    &provider_binary,
1378                    provider_model,
1379                    provider_timeout,
1380                    &args.mode,
1381                    args.min_output_chars,
1382                    args.max_output_chars,
1383                    args.prompt_template.as_deref(),
1384                    &paths,
1385                ),
1386                EnrichOperation::WeightCalibrate => call_weight_calibrate(
1387                    &conn,
1388                    &namespace,
1389                    &item_key,
1390                    &provider_binary,
1391                    provider_model,
1392                    provider_timeout,
1393                    &args.mode,
1394                ),
1395                EnrichOperation::RelationReclassify => call_relation_reclassify(
1396                    &conn,
1397                    &namespace,
1398                    &item_key,
1399                    &provider_binary,
1400                    provider_model,
1401                    provider_timeout,
1402                    &args.mode,
1403                ),
1404                EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
1405                    call_entity_connect(
1406                        &conn,
1407                        &namespace,
1408                        &item_key,
1409                        &provider_binary,
1410                        provider_model,
1411                        provider_timeout,
1412                        &args.mode,
1413                    )
1414                }
1415                EnrichOperation::EntityTypeValidate => call_entity_type_validate(
1416                    &conn,
1417                    &namespace,
1418                    &item_key,
1419                    &provider_binary,
1420                    provider_model,
1421                    provider_timeout,
1422                    &args.mode,
1423                ),
1424                EnrichOperation::DescriptionEnrich => call_description_enrich(
1425                    &conn,
1426                    &namespace,
1427                    &item_key,
1428                    &provider_binary,
1429                    provider_model,
1430                    provider_timeout,
1431                    &args.mode,
1432                ),
1433                EnrichOperation::DomainClassify => call_domain_classify(
1434                    &conn,
1435                    &namespace,
1436                    &item_key,
1437                    &provider_binary,
1438                    provider_model,
1439                    provider_timeout,
1440                    &args.mode,
1441                ),
1442                EnrichOperation::GraphAudit => call_graph_audit(
1443                    &conn,
1444                    &namespace,
1445                    &item_key,
1446                    &provider_binary,
1447                    provider_model,
1448                    provider_timeout,
1449                    &args.mode,
1450                ),
1451                EnrichOperation::DeepResearchSynth => call_deep_research_synth(
1452                    &conn,
1453                    &namespace,
1454                    &item_key,
1455                    &provider_binary,
1456                    provider_model,
1457                    provider_timeout,
1458                    &args.mode,
1459                ),
1460                EnrichOperation::BodyExtract => call_body_extract(
1461                    &conn,
1462                    &namespace,
1463                    &item_key,
1464                    &provider_binary,
1465                    provider_model,
1466                    provider_timeout,
1467                    &args.mode,
1468                ),
1469            };
1470
1471            match call_result {
1472                Ok(EnrichItemResult::Done {
1473                    memory_id,
1474                    entity_id,
1475                    entities,
1476                    rels,
1477                    chars_before,
1478                    chars_after,
1479                    cost,
1480                    is_oauth,
1481                }) => {
1482                    if is_oauth && !oauth_detected {
1483                        oauth_detected = true;
1484                        tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
1485                    }
1486                    backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1487
1488                    // Persist depends on the operation
1489                    let persist_err: Option<String> = match args.operation {
1490                        EnrichOperation::MemoryBindings => {
1491                            // Bindings already persisted inside call_memory_bindings
1492                            None
1493                        }
1494                        EnrichOperation::EntityDescriptions => {
1495                            // Description already persisted inside call_entity_description
1496                            None
1497                        }
1498                        EnrichOperation::BodyEnrich => {
1499                            // Body already persisted inside call_body_enrich
1500                            None
1501                        }
1502                        _ => {
1503                            // All G27 operations persist inside their call_* function
1504                            None
1505                        }
1506                    };
1507
1508                    if let Err(e) = queue_conn.execute(
1509                    "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1510                    rusqlite::params![
1511                        memory_id,
1512                        entity_id,
1513                        entities as i64,
1514                        rels as i64,
1515                        cost,
1516                        item_started.elapsed().as_millis() as i64,
1517                        queue_id
1518                    ],
1519                ) {
1520                        tracing::warn!(target: "enrich", error = %e, "queue done update failed");
1521                    }
1522
1523                    if persist_err.is_none() {
1524                        completed += 1;
1525                        if !is_oauth {
1526                            cost_total += cost;
1527                        }
1528                        emit_json(&ItemEvent {
1529                            item: &item_key,
1530                            status: "done",
1531                            memory_id,
1532                            entity_id,
1533                            entities: Some(entities),
1534                            rels: Some(rels),
1535                            chars_before,
1536                            chars_after,
1537                            cost_usd: if is_oauth { None } else { Some(cost) },
1538                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1539                            error: None,
1540                            index: current_index,
1541                            total,
1542                        });
1543                    } else {
1544                        failed += 1;
1545                        emit_json(&ItemEvent {
1546                            item: &item_key,
1547                            status: "failed",
1548                            memory_id: None,
1549                            entity_id: None,
1550                            entities: None,
1551                            rels: None,
1552                            chars_before: None,
1553                            chars_after: None,
1554                            cost_usd: None,
1555                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1556                            error: persist_err,
1557                            index: current_index,
1558                            total,
1559                        });
1560                    }
1561                }
1562                Ok(EnrichItemResult::Skipped { reason }) => {
1563                    skipped += 1;
1564                    if let Err(e) = queue_conn.execute(
1565                    "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
1566                    rusqlite::params![reason, queue_id],
1567                ) {
1568                        tracing::warn!(target: "enrich", error = %e, "queue skipped update failed");
1569                    }
1570                    emit_json(&ItemEvent {
1571                        item: &item_key,
1572                        status: "skipped",
1573                        memory_id: None,
1574                        entity_id: None,
1575                        entities: None,
1576                        rels: None,
1577                        chars_before: None,
1578                        chars_after: None,
1579                        cost_usd: None,
1580                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1581                        error: None,
1582                        index: current_index,
1583                        total,
1584                    });
1585                }
1586                Err(e) => {
1587                    let err_str = format!("{e}");
1588                    if matches!(e, AppError::RateLimited { .. }) {
1589                        if crate::retry::is_kill_switch_active() {
1590                            tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
1591                        } else if std::time::Instant::now() >= rate_limit_deadline {
1592                            tracing::error!(target: "enrich", total_elapsed_secs = enrich_started.elapsed().as_secs(), "rate-limit retry deadline (1h) exhausted");
1593                        } else {
1594                            let half = backoff_secs / 2;
1595                            let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
1596                            let actual_wait = half + jitter;
1597                            tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
1598                            if let Err(qe) = queue_conn.execute(
1599                                "UPDATE queue SET status='pending' WHERE id=?1",
1600                                rusqlite::params![queue_id],
1601                            ) {
1602                                tracing::warn!(target: "enrich", error = %qe, "queue pending update failed");
1603                            }
1604                            std::thread::sleep(std::time::Duration::from_secs(actual_wait));
1605                            backoff_secs = (backoff_secs * 2).min(900);
1606                            continue;
1607                        }
1608                    }
1609
1610                    failed += 1;
1611                    if let Err(qe) = queue_conn.execute(
1612                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
1613                    rusqlite::params![err_str, queue_id],
1614                ) {
1615                        tracing::warn!(target: "enrich", error = %qe, "queue failed update failed");
1616                    }
1617                    emit_json(&ItemEvent {
1618                        item: &item_key,
1619                        status: "failed",
1620                        memory_id: None,
1621                        entity_id: None,
1622                        entities: None,
1623                        rels: None,
1624                        chars_before: None,
1625                        chars_after: None,
1626                        cost_usd: None,
1627                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1628                        error: Some(err_str),
1629                        index: current_index,
1630                        total,
1631                    });
1632                }
1633            }
1634
1635            let _ = item_type; // used via queue schema only
1636        }
1637    } // end else (serial path)
1638
1639    let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1640    let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1641
1642    emit_json(&EnrichSummary {
1643        summary: true,
1644        operation: format!("{:?}", args.operation),
1645        items_total: total,
1646        completed,
1647        failed,
1648        skipped,
1649        cost_usd: cost_total,
1650        elapsed_ms: started.elapsed().as_millis() as u64,
1651    });
1652
1653    if failed == 0 {
1654        let _ = std::fs::remove_file(DEFAULT_QUEUE_DB);
1655    }
1656
1657    Ok(())
1658}
1659
1660// ---------------------------------------------------------------------------
1661// Internal result type for a single item call
1662// ---------------------------------------------------------------------------
1663
/// Outcome of processing a single queue item in one of the `call_*` helpers.
///
/// The helpers persist their own changes before returning, so a `Done` value
/// describes work that has already been written to the main database; the run
/// loop only records it in the queue and emits the NDJSON event.
#[derive(Debug)]
enum EnrichItemResult {
    /// The item was processed and its changes persisted.
    Done {
        /// Memory the item refers to, for memory-oriented operations.
        memory_id: Option<i64>,
        /// Entity the item refers to, for entity-oriented operations.
        entity_id: Option<i64>,
        /// Number of entities reported as written for this item.
        entities: usize,
        /// Number of relationships reported as written for this item.
        rels: usize,
        /// Body length in characters before enrichment (populated by body enrichment).
        chars_before: Option<usize>,
        /// Body length in characters after enrichment (populated by body enrichment).
        chars_after: Option<usize>,
        /// Provider-reported cost in USD; not added to the run total when `is_oauth`.
        cost: f64,
        /// Whether the provider ran under an OAuth subscription (cost omitted from output).
        is_oauth: bool,
    },
    /// The item was intentionally left unchanged.
    Skipped {
        /// Human-readable explanation, stored in the queue's `error` column.
        reason: String,
    },
}
1679
1680// ---------------------------------------------------------------------------
1681// Per-operation call helpers (SCAN + JUDGE + PERSIST in one unit)
1682// ---------------------------------------------------------------------------
1683
1684fn call_memory_bindings(
1685    conn: &Connection,
1686    namespace: &str,
1687    memory_name: &str,
1688    binary: &Path,
1689    model: Option<&str>,
1690    timeout: u64,
1691    mode: &EnrichMode,
1692) -> Result<EnrichItemResult, AppError> {
1693    // Look up the memory
1694    let (memory_id, body): (i64, String) = conn.query_row(
1695        "SELECT id, COALESCE(body,'') FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
1696        rusqlite::params![namespace, memory_name],
1697        |r| Ok((r.get(0)?, r.get(1)?)),
1698    ).map_err(|e| match e {
1699        rusqlite::Error::QueryReturnedNoRows => AppError::NotFound(format!("memory '{memory_name}' not found")),
1700        other => AppError::Database(other),
1701    })?;
1702
1703    if body.trim().is_empty() {
1704        return Ok(EnrichItemResult::Skipped {
1705            reason: "body is empty".to_string(),
1706        });
1707    }
1708
1709    let (value, cost, is_oauth) = match mode {
1710        EnrichMode::ClaudeCode => call_claude(
1711            binary,
1712            BINDINGS_PROMPT,
1713            BINDINGS_SCHEMA,
1714            &body,
1715            model,
1716            timeout,
1717        )?,
1718        EnrichMode::Codex => call_codex(
1719            binary,
1720            BINDINGS_PROMPT,
1721            BINDINGS_SCHEMA,
1722            &body,
1723            model,
1724            timeout,
1725        )?,
1726    };
1727
1728    let empty_arr = serde_json::Value::Array(vec![]);
1729    let entities_val = value.get("entities").unwrap_or(&empty_arr);
1730    let rels_val = value.get("relationships").unwrap_or(&empty_arr);
1731
1732    let (ent_count, rel_count) =
1733        persist_memory_bindings(conn, namespace, memory_id, entities_val, rels_val)?;
1734
1735    Ok(EnrichItemResult::Done {
1736        memory_id: Some(memory_id),
1737        entity_id: None,
1738        entities: ent_count,
1739        rels: rel_count,
1740        chars_before: None,
1741        chars_after: None,
1742        cost,
1743        is_oauth,
1744    })
1745}
1746
1747fn call_entity_description(
1748    conn: &Connection,
1749    namespace: &str,
1750    entity_name: &str,
1751    binary: &Path,
1752    model: Option<&str>,
1753    timeout: u64,
1754    mode: &EnrichMode,
1755) -> Result<EnrichItemResult, AppError> {
1756    let (entity_id, entity_type): (i64, String) = conn
1757        .query_row(
1758            "SELECT id, type FROM entities WHERE namespace=?1 AND name=?2",
1759            rusqlite::params![namespace, entity_name],
1760            |r| Ok((r.get(0)?, r.get(1)?)),
1761        )
1762        .map_err(|e| match e {
1763            rusqlite::Error::QueryReturnedNoRows => {
1764                AppError::NotFound(format!("entity '{entity_name}' not found"))
1765            }
1766            other => AppError::Database(other),
1767        })?;
1768
1769    let prompt = format!(
1770        "{ENTITY_DESCRIPTION_PROMPT_PREFIX}{entity_name}\nEntity type: {entity_type}\n\nGenerate a description:"
1771    );
1772
1773    let (value, cost, is_oauth) = match mode {
1774        EnrichMode::ClaudeCode => call_claude(
1775            binary,
1776            &prompt,
1777            ENTITY_DESCRIPTION_SCHEMA,
1778            "",
1779            model,
1780            timeout,
1781        )?,
1782        EnrichMode::Codex => call_codex(
1783            binary,
1784            &prompt,
1785            ENTITY_DESCRIPTION_SCHEMA,
1786            "",
1787            model,
1788            timeout,
1789        )?,
1790    };
1791
1792    let description = value
1793        .get("description")
1794        .and_then(|v| v.as_str())
1795        .ok_or_else(|| AppError::Validation("LLM result missing 'description' field".into()))?;
1796
1797    persist_entity_description(conn, entity_id, description)?;
1798
1799    Ok(EnrichItemResult::Done {
1800        memory_id: None,
1801        entity_id: Some(entity_id),
1802        entities: 0,
1803        rels: 0,
1804        chars_before: None,
1805        chars_after: None,
1806        cost,
1807        is_oauth,
1808    })
1809}
1810
1811#[allow(clippy::too_many_arguments)]
1812fn call_body_enrich(
1813    conn: &Connection,
1814    namespace: &str,
1815    memory_name: &str,
1816    binary: &Path,
1817    model: Option<&str>,
1818    timeout: u64,
1819    mode: &EnrichMode,
1820    min_output_chars: usize,
1821    max_output_chars: usize,
1822    prompt_template: Option<&Path>,
1823    paths: &crate::paths::AppPaths,
1824) -> Result<EnrichItemResult, AppError> {
1825    let (memory_id, body, description, memory_type): (i64, String, String, String) = conn
1826        .query_row(
1827            "SELECT id, COALESCE(body,''), COALESCE(description,''), COALESCE(type,'note') \
1828         FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
1829            rusqlite::params![namespace, memory_name],
1830            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
1831        )
1832        .map_err(|e| match e {
1833            rusqlite::Error::QueryReturnedNoRows => {
1834                AppError::NotFound(format!("memory '{memory_name}' not found"))
1835            }
1836            other => AppError::Database(other),
1837        })?;
1838
1839    let chars_before = body.chars().count();
1840
1841    // G26: gather graph context for contextualized enrichment
1842    let linked_entities: Vec<String> = {
1843        let mut stmt = conn.prepare_cached(
1844            "SELECT e.name FROM memory_entities me \
1845             JOIN entities e ON e.id = me.entity_id \
1846             WHERE me.memory_id = ?1 LIMIT 10",
1847        )?;
1848        let result: Vec<String> = stmt
1849            .query_map(rusqlite::params![memory_id], |r| r.get::<_, String>(0))?
1850            .filter_map(|r| r.ok())
1851            .collect();
1852        drop(stmt);
1853        result
1854    };
1855
1856    // Load custom prompt template if provided
1857    let prompt_prefix = if let Some(tmpl_path) = prompt_template {
1858        let file_size = std::fs::metadata(tmpl_path)
1859            .map_err(|e| {
1860                AppError::Io(std::io::Error::new(
1861                    e.kind(),
1862                    format!("failed to stat prompt template: {e}"),
1863                ))
1864            })?
1865            .len();
1866        if file_size > MAX_MEMORY_BODY_LEN as u64 {
1867            return Err(AppError::LimitExceeded(
1868                crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
1869            ));
1870        }
1871        std::fs::read_to_string(tmpl_path).map_err(|e| {
1872            AppError::Io(std::io::Error::new(
1873                e.kind(),
1874                format!("failed to read prompt template: {e}"),
1875            ))
1876        })?
1877    } else {
1878        BODY_ENRICH_PROMPT_PREFIX.to_string()
1879    };
1880
1881    // G26: build contextualized prompt with graph data
1882    let context_section = if !linked_entities.is_empty() || !description.is_empty() {
1883        let mut ctx = String::new();
1884        ctx.push_str(&format!(
1885            "\nContext:\n- Memory name: {memory_name}\n- Type: {memory_type}\n"
1886        ));
1887        if !description.is_empty() {
1888            ctx.push_str(&format!("- Description: {description}\n"));
1889        }
1890        ctx.push_str(&format!("- Domain: {namespace}\n"));
1891        if !linked_entities.is_empty() {
1892            ctx.push_str(&format!(
1893                "- Linked entities: {}\n",
1894                linked_entities.join(", ")
1895            ));
1896        }
1897        ctx
1898    } else {
1899        String::new()
1900    };
1901
1902    let prompt = format!(
1903        "{prompt_prefix}{context_section}\nTarget minimum length: {min_output_chars} characters. Maximum: {max_output_chars} characters."
1904    );
1905
1906    // The body schema uses a free-form enriched_body field
1907    let (value, cost, is_oauth) = match mode {
1908        EnrichMode::ClaudeCode => {
1909            call_claude(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
1910        }
1911        EnrichMode::Codex => {
1912            call_codex(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
1913        }
1914    };
1915
1916    let enriched_body = value
1917        .get("enriched_body")
1918        .and_then(|v| v.as_str())
1919        .ok_or_else(|| AppError::Validation("LLM result missing 'enriched_body' field".into()))?;
1920
1921    let chars_after = enriched_body.chars().count();
1922
1923    // Only persist if the enriched body is genuinely longer
1924    if chars_after <= chars_before {
1925        return Ok(EnrichItemResult::Skipped {
1926            reason: format!(
1927                "enriched body ({chars_after} chars) not longer than original ({chars_before} chars)"
1928            ),
1929        });
1930    }
1931
1932    persist_enriched_body(
1933        conn,
1934        namespace,
1935        memory_id,
1936        memory_name,
1937        enriched_body,
1938        paths,
1939    )?;
1940
1941    Ok(EnrichItemResult::Done {
1942        memory_id: Some(memory_id),
1943        entity_id: None,
1944        entities: 0,
1945        rels: 0,
1946        chars_before: Some(chars_before),
1947        chars_after: Some(chars_after),
1948        cost,
1949        is_oauth,
1950    })
1951}
1952
1953// ---------------------------------------------------------------------------
1954// Scan dispatcher — maps operation to scan query result (item keys)
1955// ---------------------------------------------------------------------------
1956
1957fn scan_operation(
1958    conn: &Connection,
1959    namespace: &str,
1960    args: &EnrichArgs,
1961) -> Result<Vec<String>, AppError> {
1962    match args.operation {
1963        EnrichOperation::MemoryBindings => {
1964            let rows = scan_unbound_memories(conn, namespace, args.limit)?;
1965            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
1966        }
1967        EnrichOperation::EntityDescriptions => {
1968            let rows = scan_entities_without_description(conn, namespace, args.limit)?;
1969            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
1970        }
1971        EnrichOperation::BodyEnrich => {
1972            let rows =
1973                scan_short_body_memories(conn, namespace, args.min_output_chars, args.limit)?;
1974            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
1975        }
1976        EnrichOperation::WeightCalibrate => {
1977            let rows = scan_weight_candidates(conn, namespace, args.limit)?;
1978            Ok(rows
1979                .into_iter()
1980                .map(|(id, _, _, _, _)| id.to_string())
1981                .collect())
1982        }
1983        EnrichOperation::RelationReclassify => {
1984            let rows = scan_generic_relations(conn, namespace, args.limit)?;
1985            Ok(rows
1986                .into_iter()
1987                .map(|(id, _, _, _)| id.to_string())
1988                .collect())
1989        }
1990        EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
1991            let pairs = scan_isolated_entity_pairs(conn, namespace, args.limit)?;
1992            Ok(pairs.into_iter().map(|(_, name, _, _)| name).collect())
1993        }
1994        EnrichOperation::EntityTypeValidate => {
1995            let rows = scan_entities_for_type_validation(conn, namespace, args.limit)?;
1996            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
1997        }
1998        EnrichOperation::DescriptionEnrich => {
1999            let rows = scan_generic_descriptions(conn, namespace, args.limit)?;
2000            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2001        }
2002        EnrichOperation::DomainClassify
2003        | EnrichOperation::GraphAudit
2004        | EnrichOperation::DeepResearchSynth
2005        | EnrichOperation::BodyExtract => {
2006            let limit_clause = args.limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
2007            let sql = format!(
2008                "SELECT name FROM memories WHERE namespace=?1 AND deleted_at IS NULL ORDER BY id {limit_clause}"
2009            );
2010            let mut stmt = conn.prepare(&sql)?;
2011            let names = stmt
2012                .query_map(rusqlite::params![namespace], |r| r.get::<_, String>(0))?
2013                .collect::<Result<Vec<_>, _>>()?;
2014            Ok(names)
2015        }
2016    }
2017}
2018
2019// ---------------------------------------------------------------------------
2020// Codex stub provider
2021// ---------------------------------------------------------------------------
2022
2023/// Locates the Codex CLI binary.
2024fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
2025    if let Some(p) = explicit {
2026        if p.exists() {
2027            return Ok(p.to_path_buf());
2028        }
2029        return Err(AppError::Validation(format!(
2030            "Codex binary not found at explicit path: {}",
2031            p.display()
2032        )));
2033    }
2034
2035    if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
2036        let p = PathBuf::from(&env_path);
2037        if p.exists() {
2038            return Ok(p);
2039        }
2040    }
2041
2042    let name = if cfg!(windows) { "codex.exe" } else { "codex" };
2043    if let Some(path_var) = std::env::var_os("PATH") {
2044        for dir in std::env::split_paths(&path_var) {
2045            let candidate = dir.join(name);
2046            if candidate.exists() {
2047                return Ok(candidate);
2048            }
2049        }
2050    }
2051
2052    Err(AppError::Validation(
2053        "Codex CLI binary not found in PATH. Install it or specify --codex-binary".to_string(),
2054    ))
2055}
2056
2057/// G27: Calibrate weight of a single relationship via LLM.
2058fn call_weight_calibrate(
2059    conn: &Connection,
2060    _namespace: &str,
2061    item_key: &str,
2062    binary: &Path,
2063    model: Option<&str>,
2064    timeout: u64,
2065    mode: &EnrichMode,
2066) -> Result<EnrichItemResult, AppError> {
2067    let rel_id: i64 = item_key
2068        .parse()
2069        .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
2070    let (source_name, target_name, relation, current_weight): (String, String, String, f64) = conn
2071        .query_row(
2072            "SELECT e1.name, e2.name, r.relation, r.weight \
2073             FROM relationships r \
2074             JOIN entities e1 ON e1.id = r.source_id \
2075             JOIN entities e2 ON e2.id = r.target_id \
2076             WHERE r.id = ?1",
2077            rusqlite::params![rel_id],
2078            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2079        )
2080        .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
2081
2082    let input_text = format!(
2083        "Source: {source_name}\nTarget: {target_name}\nRelation: {relation}\nCurrent weight: {current_weight}"
2084    );
2085    let (value, cost, is_oauth) = match mode {
2086        EnrichMode::ClaudeCode => call_claude(
2087            binary,
2088            WEIGHT_CALIBRATE_PROMPT,
2089            WEIGHT_CALIBRATE_SCHEMA,
2090            &input_text,
2091            model,
2092            timeout,
2093        )?,
2094        EnrichMode::Codex => call_codex(
2095            binary,
2096            WEIGHT_CALIBRATE_PROMPT,
2097            WEIGHT_CALIBRATE_SCHEMA,
2098            &input_text,
2099            model,
2100            timeout,
2101        )?,
2102    };
2103
2104    let calibrated = value
2105        .get("calibrated_weight")
2106        .and_then(|v| v.as_f64())
2107        .ok_or_else(|| AppError::Validation("LLM result missing 'calibrated_weight'".into()))?;
2108
2109    conn.execute(
2110        "UPDATE relationships SET weight = ?1 WHERE id = ?2",
2111        rusqlite::params![calibrated, rel_id],
2112    )?;
2113
2114    Ok(EnrichItemResult::Done {
2115        memory_id: None,
2116        entity_id: None,
2117        entities: 0,
2118        rels: 1,
2119        chars_before: None,
2120        chars_after: None,
2121        cost,
2122        is_oauth,
2123    })
2124}
2125
2126/// G27: Reclassify a generic relationship type via LLM.
2127fn call_relation_reclassify(
2128    conn: &Connection,
2129    _namespace: &str,
2130    item_key: &str,
2131    binary: &Path,
2132    model: Option<&str>,
2133    timeout: u64,
2134    mode: &EnrichMode,
2135) -> Result<EnrichItemResult, AppError> {
2136    let rel_id: i64 = item_key
2137        .parse()
2138        .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
2139    let (source_name, target_name, current_relation): (String, String, String) = conn
2140        .query_row(
2141            "SELECT e1.name, e2.name, r.relation \
2142             FROM relationships r \
2143             JOIN entities e1 ON e1.id = r.source_id \
2144             JOIN entities e2 ON e2.id = r.target_id \
2145             WHERE r.id = ?1",
2146            rusqlite::params![rel_id],
2147            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2148        )
2149        .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
2150
2151    let input_text = format!(
2152        "Source entity: {source_name}\nTarget entity: {target_name}\nCurrent relation: {current_relation}"
2153    );
2154    let (value, cost, is_oauth) = match mode {
2155        EnrichMode::ClaudeCode => call_claude(
2156            binary,
2157            RELATION_RECLASSIFY_PROMPT,
2158            RELATION_RECLASSIFY_SCHEMA,
2159            &input_text,
2160            model,
2161            timeout,
2162        )?,
2163        EnrichMode::Codex => call_codex(
2164            binary,
2165            RELATION_RECLASSIFY_PROMPT,
2166            RELATION_RECLASSIFY_SCHEMA,
2167            &input_text,
2168            model,
2169            timeout,
2170        )?,
2171    };
2172
2173    let new_relation = value
2174        .get("relation")
2175        .and_then(|v| v.as_str())
2176        .ok_or_else(|| AppError::Validation("LLM result missing 'relation'".into()))?;
2177    let new_strength = value
2178        .get("strength")
2179        .and_then(|v| v.as_f64())
2180        .unwrap_or(0.5);
2181
2182    conn.execute(
2183        "UPDATE relationships SET relation = ?1, weight = ?2 WHERE id = ?3",
2184        rusqlite::params![new_relation, new_strength, rel_id],
2185    )?;
2186
2187    Ok(EnrichItemResult::Done {
2188        memory_id: None,
2189        entity_id: None,
2190        entities: 0,
2191        rels: 1,
2192        chars_before: None,
2193        chars_after: None,
2194        cost,
2195        is_oauth,
2196    })
2197}
2198
2199/// G27 P2: Connect isolated entities via LLM-suggested relationship.
2200fn call_entity_connect(
2201    conn: &Connection,
2202    namespace: &str,
2203    item_key: &str,
2204    binary: &Path,
2205    model: Option<&str>,
2206    timeout: u64,
2207    mode: &EnrichMode,
2208) -> Result<EnrichItemResult, AppError> {
2209    let pairs = scan_isolated_entity_pairs(conn, namespace, Some(1))?;
2210    let (e1_id, e1_name, e2_id, e2_name) =
2211        match pairs.into_iter().find(|(_, n, _, _)| n == item_key) {
2212            Some(p) => p,
2213            None => {
2214                return Ok(EnrichItemResult::Skipped {
2215                    reason: "pair no longer isolated".into(),
2216                })
2217            }
2218        };
2219    let input_text = format!("Entity A: {e1_name}\nEntity B: {e2_name}");
2220    let (value, cost, is_oauth) = match mode {
2221        EnrichMode::ClaudeCode => call_claude(
2222            binary,
2223            ENTITY_CONNECT_PROMPT,
2224            ENTITY_CONNECT_SCHEMA,
2225            &input_text,
2226            model,
2227            timeout,
2228        )?,
2229        EnrichMode::Codex => call_codex(
2230            binary,
2231            ENTITY_CONNECT_PROMPT,
2232            ENTITY_CONNECT_SCHEMA,
2233            &input_text,
2234            model,
2235            timeout,
2236        )?,
2237    };
2238    let relation = value
2239        .get("relation")
2240        .and_then(|v| v.as_str())
2241        .unwrap_or("none");
2242    if relation == "none" {
2243        return Ok(EnrichItemResult::Skipped {
2244            reason: "LLM determined no relationship".into(),
2245        });
2246    }
2247    let strength = value
2248        .get("strength")
2249        .and_then(|v| v.as_f64())
2250        .unwrap_or(0.5);
2251    conn.execute(
2252        "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
2253        rusqlite::params![namespace, e1_id, e2_id, relation, strength],
2254    )?;
2255    Ok(EnrichItemResult::Done {
2256        memory_id: None,
2257        entity_id: None,
2258        entities: 0,
2259        rels: 1,
2260        chars_before: None,
2261        chars_after: None,
2262        cost,
2263        is_oauth,
2264    })
2265}
2266
2267/// G27 P2: Validate entity type assignment via LLM.
2268fn call_entity_type_validate(
2269    conn: &Connection,
2270    _namespace: &str,
2271    item_key: &str,
2272    binary: &Path,
2273    model: Option<&str>,
2274    timeout: u64,
2275    mode: &EnrichMode,
2276) -> Result<EnrichItemResult, AppError> {
2277    let (ent_id, ent_name, ent_type): (i64, String, String) = conn
2278        .query_row(
2279            "SELECT id, name, type FROM entities WHERE name = ?1",
2280            rusqlite::params![item_key],
2281            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2282        )
2283        .map_err(|_| AppError::NotFound(format!("entity '{item_key}' not found")))?;
2284    let input_text = format!("Entity: {ent_name}\nCurrent type: {ent_type}");
2285    let (value, cost, is_oauth) = match mode {
2286        EnrichMode::ClaudeCode => call_claude(
2287            binary,
2288            ENTITY_TYPE_VALIDATE_PROMPT,
2289            ENTITY_TYPE_VALIDATE_SCHEMA,
2290            &input_text,
2291            model,
2292            timeout,
2293        )?,
2294        EnrichMode::Codex => call_codex(
2295            binary,
2296            ENTITY_TYPE_VALIDATE_PROMPT,
2297            ENTITY_TYPE_VALIDATE_SCHEMA,
2298            &input_text,
2299            model,
2300            timeout,
2301        )?,
2302    };
2303    let validated_type = value
2304        .get("validated_type")
2305        .and_then(|v| v.as_str())
2306        .unwrap_or(&ent_type);
2307    let was_correct = value
2308        .get("was_correct")
2309        .and_then(|v| v.as_bool())
2310        .unwrap_or(true);
2311    if !was_correct {
2312        conn.execute(
2313            "UPDATE entities SET type = ?1 WHERE id = ?2",
2314            rusqlite::params![validated_type, ent_id],
2315        )?;
2316    }
2317    Ok(EnrichItemResult::Done {
2318        memory_id: None,
2319        entity_id: Some(ent_id),
2320        entities: 1,
2321        rels: 0,
2322        chars_before: None,
2323        chars_after: None,
2324        cost,
2325        is_oauth,
2326    })
2327}
2328
2329/// G27 P2: Enrich generic memory description via LLM.
2330fn call_description_enrich(
2331    conn: &Connection,
2332    _namespace: &str,
2333    item_key: &str,
2334    binary: &Path,
2335    model: Option<&str>,
2336    timeout: u64,
2337    mode: &EnrichMode,
2338) -> Result<EnrichItemResult, AppError> {
2339    let (mem_id, body, old_desc): (i64, String, String) = conn
2340        .query_row(
2341            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
2342            rusqlite::params![item_key],
2343            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
2344        )
2345        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
2346    let snippet: String = body.chars().take(500).collect();
2347    let input_text = format!(
2348        "Memory name: {item_key}\nCurrent description: {old_desc}\nBody preview: {snippet}"
2349    );
2350    let (value, cost, is_oauth) = match mode {
2351        EnrichMode::ClaudeCode => call_claude(
2352            binary,
2353            DESCRIPTION_ENRICH_PROMPT,
2354            DESCRIPTION_ENRICH_SCHEMA,
2355            &input_text,
2356            model,
2357            timeout,
2358        )?,
2359        EnrichMode::Codex => call_codex(
2360            binary,
2361            DESCRIPTION_ENRICH_PROMPT,
2362            DESCRIPTION_ENRICH_SCHEMA,
2363            &input_text,
2364            model,
2365            timeout,
2366        )?,
2367    };
2368    let new_desc = value
2369        .get("description")
2370        .and_then(|v| v.as_str())
2371        .unwrap_or(&old_desc);
2372    conn.execute(
2373        "UPDATE memories SET description = ?1 WHERE id = ?2",
2374        rusqlite::params![new_desc, mem_id],
2375    )?;
2376    Ok(EnrichItemResult::Done {
2377        memory_id: Some(mem_id),
2378        entity_id: None,
2379        entities: 0,
2380        rels: 0,
2381        chars_before: Some(old_desc.len()),
2382        chars_after: Some(new_desc.len()),
2383        cost,
2384        is_oauth,
2385    })
2386}
2387
2388/// G27 P2: Classify memory into domain category via LLM.
2389fn call_domain_classify(
2390    conn: &Connection,
2391    _namespace: &str,
2392    item_key: &str,
2393    binary: &Path,
2394    model: Option<&str>,
2395    timeout: u64,
2396    mode: &EnrichMode,
2397) -> Result<EnrichItemResult, AppError> {
2398    let (mem_id, body, desc): (i64, String, String) = conn
2399        .query_row(
2400            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
2401            rusqlite::params![item_key],
2402            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
2403        )
2404        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
2405    let snippet: String = body.chars().take(500).collect();
2406    let input_text = format!("Memory: {item_key}\nDescription: {desc}\nBody preview: {snippet}");
2407    let (value, cost, is_oauth) = match mode {
2408        EnrichMode::ClaudeCode => call_claude(
2409            binary,
2410            DOMAIN_CLASSIFY_PROMPT,
2411            DOMAIN_CLASSIFY_SCHEMA,
2412            &input_text,
2413            model,
2414            timeout,
2415        )?,
2416        EnrichMode::Codex => call_codex(
2417            binary,
2418            DOMAIN_CLASSIFY_PROMPT,
2419            DOMAIN_CLASSIFY_SCHEMA,
2420            &input_text,
2421            model,
2422            timeout,
2423        )?,
2424    };
2425    let domain = value
2426        .get("domain")
2427        .and_then(|v| v.as_str())
2428        .unwrap_or("uncategorized");
2429    let metadata = format!(r#"{{"domain":"{}"}}"#, domain.replace('"', "\\\""));
2430    conn.execute(
2431        "UPDATE memories SET metadata = ?1 WHERE id = ?2",
2432        rusqlite::params![metadata, mem_id],
2433    )?;
2434    Ok(EnrichItemResult::Done {
2435        memory_id: Some(mem_id),
2436        entity_id: None,
2437        entities: 0,
2438        rels: 0,
2439        chars_before: None,
2440        chars_after: None,
2441        cost,
2442        is_oauth,
2443    })
2444}
2445
2446/// G27 P2: Audit memory graph quality via LLM.
2447fn call_graph_audit(
2448    conn: &Connection,
2449    _namespace: &str,
2450    item_key: &str,
2451    binary: &Path,
2452    model: Option<&str>,
2453    timeout: u64,
2454    mode: &EnrichMode,
2455) -> Result<EnrichItemResult, AppError> {
2456    let (mem_id, body, desc): (i64, String, String) = conn
2457        .query_row(
2458            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
2459            rusqlite::params![item_key],
2460            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
2461        )
2462        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
2463    let snippet: String = body.chars().take(500).collect();
2464    let ent_count: i64 = conn
2465        .query_row(
2466            "SELECT COUNT(*) FROM memory_entities WHERE memory_id = ?1",
2467            rusqlite::params![mem_id],
2468            |r| r.get(0),
2469        )
2470        .unwrap_or(0);
2471    let input_text = format!("Memory: {item_key}\nDescription: {desc}\nEntity bindings: {ent_count}\nBody preview: {snippet}");
2472    let (value, cost, is_oauth) = match mode {
2473        EnrichMode::ClaudeCode => call_claude(
2474            binary,
2475            GRAPH_AUDIT_PROMPT,
2476            GRAPH_AUDIT_SCHEMA,
2477            &input_text,
2478            model,
2479            timeout,
2480        )?,
2481        EnrichMode::Codex => call_codex(
2482            binary,
2483            GRAPH_AUDIT_PROMPT,
2484            GRAPH_AUDIT_SCHEMA,
2485            &input_text,
2486            model,
2487            timeout,
2488        )?,
2489    };
2490    let issues = value
2491        .get("issues")
2492        .and_then(|v| v.as_array())
2493        .map(|a| a.len())
2494        .unwrap_or(0);
2495    Ok(EnrichItemResult::Done {
2496        memory_id: Some(mem_id),
2497        entity_id: None,
2498        entities: 0,
2499        rels: issues,
2500        chars_before: None,
2501        chars_after: None,
2502        cost,
2503        is_oauth,
2504    })
2505}
2506
2507/// G27 P2: Synthesize research findings into graph entities/relationships via LLM.
2508fn call_deep_research_synth(
2509    conn: &Connection,
2510    namespace: &str,
2511    item_key: &str,
2512    binary: &Path,
2513    model: Option<&str>,
2514    timeout: u64,
2515    mode: &EnrichMode,
2516) -> Result<EnrichItemResult, AppError> {
2517    let (mem_id, body): (i64, String) = conn
2518        .query_row(
2519            "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
2520            rusqlite::params![item_key],
2521            |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
2522        )
2523        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
2524    let snippet: String = body.chars().take(2000).collect();
2525    let input_text = format!("Memory: {item_key}\nBody:\n{snippet}");
2526    let (value, cost, is_oauth) = match mode {
2527        EnrichMode::ClaudeCode => call_claude(
2528            binary,
2529            DEEP_RESEARCH_SYNTH_PROMPT,
2530            DEEP_RESEARCH_SYNTH_SCHEMA,
2531            &input_text,
2532            model,
2533            timeout,
2534        )?,
2535        EnrichMode::Codex => call_codex(
2536            binary,
2537            DEEP_RESEARCH_SYNTH_PROMPT,
2538            DEEP_RESEARCH_SYNTH_SCHEMA,
2539            &input_text,
2540            model,
2541            timeout,
2542        )?,
2543    };
2544    let mut ent_count = 0usize;
2545    let mut rel_count = 0usize;
2546    if let Some(ents) = value.get("entities").and_then(|v| v.as_array()) {
2547        for e in ents {
2548            let name = e.get("name").and_then(|v| v.as_str()).unwrap_or_default();
2549            let etype_str = e
2550                .get("entity_type")
2551                .and_then(|v| v.as_str())
2552                .unwrap_or("concept");
2553            let etype: EntityType = etype_str.parse().unwrap_or(EntityType::Concept);
2554            if name.len() >= 2 {
2555                let ne = NewEntity {
2556                    name: name.to_string(),
2557                    entity_type: etype,
2558                    description: None,
2559                };
2560                let _ = entities::upsert_entity(conn, namespace, &ne);
2561                ent_count += 1;
2562            }
2563        }
2564    }
2565    if let Some(rels) = value.get("relationships").and_then(|v| v.as_array()) {
2566        for r in rels {
2567            let src = r.get("source").and_then(|v| v.as_str()).unwrap_or_default();
2568            let tgt = r.get("target").and_then(|v| v.as_str()).unwrap_or_default();
2569            if src.is_empty() || tgt.is_empty() {
2570                continue;
2571            }
2572            let rel = r
2573                .get("relation")
2574                .and_then(|v| v.as_str())
2575                .unwrap_or("related");
2576            let str_ = r.get("strength").and_then(|v| v.as_f64()).unwrap_or(0.5);
2577            if let (Some(sid), Some(tid)) = (
2578                entities::find_entity_id(conn, namespace, src)?,
2579                entities::find_entity_id(conn, namespace, tgt)?,
2580            ) {
2581                let _ = entities::create_or_fetch_relationship(
2582                    conn, namespace, sid, tid, rel, str_, None,
2583                );
2584                rel_count += 1;
2585            }
2586        }
2587    }
2588    Ok(EnrichItemResult::Done {
2589        memory_id: Some(mem_id),
2590        entity_id: None,
2591        entities: ent_count,
2592        rels: rel_count,
2593        chars_before: None,
2594        chars_after: None,
2595        cost,
2596        is_oauth,
2597    })
2598}
2599
2600/// G27 P2: Extract structured body from unstructured text via LLM.
2601fn call_body_extract(
2602    conn: &Connection,
2603    _namespace: &str,
2604    item_key: &str,
2605    binary: &Path,
2606    model: Option<&str>,
2607    timeout: u64,
2608    mode: &EnrichMode,
2609) -> Result<EnrichItemResult, AppError> {
2610    let (mem_id, body): (i64, String) = conn
2611        .query_row(
2612            "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
2613            rusqlite::params![item_key],
2614            |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
2615        )
2616        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
2617    let input_text = format!("Memory: {item_key}\nBody:\n{body}");
2618    let (value, cost, is_oauth) = match mode {
2619        EnrichMode::ClaudeCode => call_claude(
2620            binary,
2621            BODY_EXTRACT_PROMPT,
2622            BODY_EXTRACT_SCHEMA,
2623            &input_text,
2624            model,
2625            timeout,
2626        )?,
2627        EnrichMode::Codex => call_codex(
2628            binary,
2629            BODY_EXTRACT_PROMPT,
2630            BODY_EXTRACT_SCHEMA,
2631            &input_text,
2632            model,
2633            timeout,
2634        )?,
2635    };
2636    let restructured = value
2637        .get("restructured_body")
2638        .and_then(|v| v.as_str())
2639        .unwrap_or(&body);
2640    let chars_before = body.len();
2641    let chars_after = restructured.len();
2642    let new_hash = blake3::hash(restructured.as_bytes()).to_hex().to_string();
2643    conn.execute(
2644        "UPDATE memories SET body = ?1, body_hash = ?2, updated_at = unixepoch() WHERE id = ?3",
2645        rusqlite::params![restructured, new_hash, mem_id],
2646    )?;
2647    Ok(EnrichItemResult::Done {
2648        memory_id: Some(mem_id),
2649        entity_id: None,
2650        entities: 0,
2651        rels: 0,
2652        chars_before: Some(chars_before),
2653        chars_after: Some(chars_after),
2654        cost,
2655        is_oauth,
2656    })
2657}
2658
2659/// Scan for pairs of entities that share no direct relationship.
2660#[allow(clippy::type_complexity)]
2661fn scan_isolated_entity_pairs(
2662    conn: &Connection,
2663    namespace: &str,
2664    limit: Option<usize>,
2665) -> Result<Vec<(i64, String, i64, String)>, AppError> {
2666    let limit_val = limit.unwrap_or(50) as i64;
2667    let mut stmt = conn.prepare_cached(
2668        "SELECT e1.id, e1.name, e2.id, e2.name FROM entities e1, entities e2 \
2669         WHERE e1.namespace = ?1 AND e2.namespace = ?1 AND e1.id < e2.id \
2670         AND NOT EXISTS (SELECT 1 FROM relationships r WHERE \
2671           (r.source_id = e1.id AND r.target_id = e2.id) OR \
2672           (r.source_id = e2.id AND r.target_id = e1.id)) \
2673         LIMIT ?2",
2674    )?;
2675    let rows = stmt
2676        .query_map(rusqlite::params![namespace, limit_val], |r| {
2677            Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?))
2678        })?
2679        .collect::<Result<Vec<_>, _>>()?;
2680    Ok(rows)
2681}
2682
2683/// Scan for entities with non-validated types (all entities for type audit).
2684fn scan_entities_for_type_validation(
2685    conn: &Connection,
2686    namespace: &str,
2687    limit: Option<usize>,
2688) -> Result<Vec<(i64, String, String)>, AppError> {
2689    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
2690    let sql = format!(
2691        "SELECT id, name, type FROM entities WHERE namespace = ?1 ORDER BY id {limit_clause}"
2692    );
2693    let mut stmt = conn.prepare(&sql)?;
2694    let rows = stmt
2695        .query_map(rusqlite::params![namespace], |r| {
2696            Ok((r.get(0)?, r.get(1)?, r.get(2)?))
2697        })?
2698        .collect::<Result<Vec<_>, _>>()?;
2699    Ok(rows)
2700}
2701
2702/// Scan for memories with generic descriptions (ingested, imported, etc).
2703fn scan_generic_descriptions(
2704    conn: &Connection,
2705    namespace: &str,
2706    limit: Option<usize>,
2707) -> Result<Vec<(i64, String, String)>, AppError> {
2708    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
2709    let sql = format!(
2710        "SELECT id, name, description FROM memories WHERE namespace = ?1 AND deleted_at IS NULL \
2711         AND (description LIKE '%ingested%' OR description LIKE '%imported%' OR description LIKE '%added%' OR length(description) < 30) \
2712         ORDER BY id {limit_clause}"
2713    );
2714    let mut stmt = conn.prepare(&sql)?;
2715    let rows = stmt
2716        .query_map(rusqlite::params![namespace], |r| {
2717            Ok((r.get(0)?, r.get(1)?, r.get(2)?))
2718        })?
2719        .collect::<Result<Vec<_>, _>>()?;
2720    Ok(rows)
2721}
2722
2723/// Calls the Codex CLI for a single enrichment item.
2724///
2725/// Follows the same contract as `call_claude`: returns `(value, cost_usd, is_oauth=false)`.
2726fn call_codex(
2727    binary: &Path,
2728    prompt: &str,
2729    json_schema: &str,
2730    input_text: &str,
2731    model: Option<&str>,
2732    timeout_secs: u64,
2733) -> Result<(serde_json::Value, f64, bool), AppError> {
2734    use wait_timeout::ChildExt;
2735
2736    let full_prompt = format!("{prompt}\n\n{input_text}");
2737    let schema_file = {
2738        let tmp = std::env::temp_dir().join(format!("enrich-schema-{}.json", std::process::id()));
2739        std::fs::write(&tmp, json_schema).map_err(AppError::Io)?;
2740        tmp
2741    };
2742
2743    let mut cmd = Command::new(binary);
2744    cmd.env_clear();
2745    for var in &[
2746        "PATH",
2747        "HOME",
2748        "USER",
2749        "OPENAI_API_KEY",
2750        "TMPDIR",
2751        "TMP",
2752        "TEMP",
2753    ] {
2754        if let Ok(val) = std::env::var(var) {
2755            cmd.env(var, val);
2756        }
2757    }
2758
2759    #[cfg(windows)]
2760    for var in &[
2761        "LOCALAPPDATA",
2762        "APPDATA",
2763        "USERPROFILE",
2764        "SystemRoot",
2765        "COMSPEC",
2766        "PATHEXT",
2767    ] {
2768        if let Ok(val) = std::env::var(var) {
2769            cmd.env(var, val);
2770        }
2771    }
2772
2773    cmd.arg("exec")
2774        .arg("--json")
2775        .arg("--output-schema")
2776        .arg(&schema_file);
2777
2778    if let Some(m) = model {
2779        cmd.arg("--model").arg(m);
2780    }
2781
2782    cmd.stdin(Stdio::piped())
2783        .stdout(Stdio::piped())
2784        .stderr(Stdio::piped());
2785
2786    let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
2787        AppError::Io(std::io::Error::new(
2788            e.kind(),
2789            format!("failed to spawn codex: {e}"),
2790        ))
2791    })?;
2792
2793    let stdin_bytes = full_prompt.into_bytes();
2794    let mut child_stdin = child
2795        .stdin
2796        .take()
2797        .ok_or_else(|| AppError::Validation("failed to open codex stdin".into()))?;
2798    let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
2799        child_stdin.write_all(&stdin_bytes)?;
2800        drop(child_stdin);
2801        Ok(())
2802    });
2803
2804    let start = std::time::Instant::now();
2805    let timeout = std::time::Duration::from_secs(timeout_secs);
2806    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
2807
2808    let _ = std::fs::remove_file(&schema_file);
2809
2810    match status {
2811        Some(exit_status) => {
2812            stdin_thread
2813                .join()
2814                .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
2815                .map_err(AppError::Io)?;
2816
2817            tracing::debug!(
2818                target: "process",
2819                exit_code = ?exit_status.code(),
2820                elapsed_ms = start.elapsed().as_millis() as u64,
2821                "external process completed"
2822            );
2823
2824            let mut stdout_buf = Vec::new();
2825            if let Some(mut out) = child.stdout.take() {
2826                std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
2827            }
2828            if !exit_status.success() {
2829                let mut stderr_buf = Vec::new();
2830                if let Some(mut err) = child.stderr.take() {
2831                    std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
2832                }
2833                let stderr_str = String::from_utf8_lossy(&stderr_buf);
2834                tracing::warn!(
2835                    target: "enrich",
2836                    exit_code = ?exit_status.code(),
2837                    stderr = %stderr_str.trim(),
2838                    "codex process failed"
2839                );
2840                return Err(AppError::Validation(format!(
2841                    "codex exited with code {:?}: {}",
2842                    exit_status.code(),
2843                    stderr_str.trim()
2844                )));
2845            }
2846            let stdout_str = String::from_utf8(stdout_buf)
2847                .map_err(|_| AppError::Validation("codex stdout is not valid UTF-8".into()))?;
2848            let value: serde_json::Value = serde_json::from_str(&stdout_str).map_err(|e| {
2849                AppError::Validation(format!("failed to parse codex output as JSON: {e}"))
2850            })?;
2851            Ok((value, 0.0, false))
2852        }
2853        None => {
2854            let _ = child.kill();
2855            let _ = child.wait();
2856            let _ = stdin_thread.join();
2857            Err(AppError::Validation(format!(
2858                "codex timed out after {timeout_secs} seconds"
2859            )))
2860        }
2861    }
2862}
2863
2864// ---------------------------------------------------------------------------
2865// Tests
2866// ---------------------------------------------------------------------------
2867
2868#[cfg(test)]
2869mod tests {
2870    use super::*;
2871    use rusqlite::Connection;
2872
2873    /// Opens an in-memory SQLite database with a minimal schema for unit tests.
2874    fn open_test_db() -> Connection {
2875        let conn = Connection::open_in_memory().expect("in-memory db");
2876        conn.execute_batch(
2877            "CREATE TABLE memories (
2878                id          INTEGER PRIMARY KEY AUTOINCREMENT,
2879                namespace   TEXT NOT NULL DEFAULT 'global',
2880                name        TEXT NOT NULL,
2881                type        TEXT NOT NULL DEFAULT 'note',
2882                description TEXT NOT NULL DEFAULT '',
2883                body        TEXT NOT NULL DEFAULT '',
2884                body_hash   TEXT NOT NULL DEFAULT '',
2885                session_id  TEXT,
2886                source      TEXT NOT NULL DEFAULT 'agent',
2887                metadata    TEXT NOT NULL DEFAULT '{}',
2888                created_at  INTEGER NOT NULL DEFAULT (unixepoch()),
2889                updated_at  INTEGER NOT NULL DEFAULT (unixepoch()),
2890                deleted_at  INTEGER,
2891                UNIQUE(namespace, name)
2892            );
2893            CREATE TABLE entities (
2894                id          INTEGER PRIMARY KEY AUTOINCREMENT,
2895                namespace   TEXT NOT NULL DEFAULT 'global',
2896                name        TEXT NOT NULL,
2897                type        TEXT NOT NULL DEFAULT 'concept',
2898                description TEXT,
2899                degree      INTEGER NOT NULL DEFAULT 0,
2900                created_at  INTEGER NOT NULL DEFAULT (unixepoch()),
2901                updated_at  INTEGER NOT NULL DEFAULT (unixepoch()),
2902                UNIQUE(namespace, name)
2903            );
2904            CREATE TABLE memory_entities (
2905                memory_id  INTEGER NOT NULL,
2906                entity_id  INTEGER NOT NULL,
2907                PRIMARY KEY (memory_id, entity_id)
2908            );
2909            CREATE TABLE relationships (
2910                id         INTEGER PRIMARY KEY AUTOINCREMENT,
2911                namespace  TEXT NOT NULL DEFAULT 'global',
2912                source_id  INTEGER NOT NULL,
2913                target_id  INTEGER NOT NULL,
2914                relation   TEXT NOT NULL,
2915                weight     REAL NOT NULL DEFAULT 0.5,
2916                description TEXT,
2917                UNIQUE(source_id, target_id, relation)
2918            );",
2919        )
2920        .expect("schema creation must succeed");
2921        conn
2922    }
2923
2924    #[test]
2925    fn scan_unbound_memories_finds_memories_without_bindings() {
2926        let conn = open_test_db();
2927        conn.execute(
2928            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'test-mem', 'some body content')",
2929            [],
2930        )
2931        .unwrap();
2932
2933        let results = scan_unbound_memories(&conn, "global", None).unwrap();
2934        assert_eq!(results.len(), 1);
2935        assert_eq!(results[0].1, "test-mem");
2936    }
2937
2938    #[test]
2939    fn scan_unbound_memories_excludes_bound_memories() {
2940        let conn = open_test_db();
2941        conn.execute(
2942            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'bound-mem', 'body')",
2943            [],
2944        )
2945        .unwrap();
2946        let mem_id: i64 = conn
2947            .query_row("SELECT id FROM memories WHERE name='bound-mem'", [], |r| {
2948                r.get(0)
2949            })
2950            .unwrap();
2951        conn.execute(
2952            "INSERT INTO entities (namespace, name) VALUES ('global', 'some-entity')",
2953            [],
2954        )
2955        .unwrap();
2956        let ent_id: i64 = conn
2957            .query_row(
2958                "SELECT id FROM entities WHERE name='some-entity'",
2959                [],
2960                |r| r.get(0),
2961            )
2962            .unwrap();
2963        conn.execute(
2964            "INSERT INTO memory_entities (memory_id, entity_id) VALUES (?1, ?2)",
2965            rusqlite::params![mem_id, ent_id],
2966        )
2967        .unwrap();
2968
2969        let results = scan_unbound_memories(&conn, "global", None).unwrap();
2970        assert!(results.is_empty(), "bound memory must not appear in scan");
2971    }
2972
2973    #[test]
2974    fn scan_entities_without_description_finds_null_description() {
2975        let conn = open_test_db();
2976        conn.execute(
2977            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'my-tool', 'tool', NULL)",
2978            [],
2979        )
2980        .unwrap();
2981
2982        let results = scan_entities_without_description(&conn, "global", None).unwrap();
2983        assert_eq!(results.len(), 1);
2984        assert_eq!(results[0].1, "my-tool");
2985    }
2986
2987    #[test]
2988    fn scan_entities_without_description_excludes_entities_with_description() {
2989        let conn = open_test_db();
2990        conn.execute(
2991            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'described-tool', 'tool', 'Has a description already')",
2992            [],
2993        )
2994        .unwrap();
2995
2996        let results = scan_entities_without_description(&conn, "global", None).unwrap();
2997        assert!(
2998            results.is_empty(),
2999            "entity with description must not appear"
3000        );
3001    }
3002
3003    #[test]
3004    fn scan_short_body_memories_finds_short_bodies() {
3005        let conn = open_test_db();
3006        conn.execute(
3007            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'short-mem', 'hi')",
3008            [],
3009        )
3010        .unwrap();
3011
3012        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
3013        assert_eq!(results.len(), 1);
3014        assert_eq!(results[0].1, "short-mem");
3015    }
3016
3017    #[test]
3018    fn scan_short_body_memories_excludes_long_bodies() {
3019        let conn = open_test_db();
3020        let long_body = "a".repeat(1000);
3021        conn.execute(
3022            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'long-mem', ?1)",
3023            rusqlite::params![long_body],
3024        )
3025        .unwrap();
3026
3027        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
3028        assert!(results.is_empty(), "long memory must not appear in scan");
3029    }
3030
3031    #[test]
3032    fn scan_respects_limit() {
3033        let conn = open_test_db();
3034        for i in 0..5 {
3035            conn.execute(
3036                &format!("INSERT INTO memories (namespace, name, body) VALUES ('global', 'mem-{i}', 'short')"),
3037                [],
3038            )
3039            .unwrap();
3040        }
3041
3042        let results = scan_short_body_memories(&conn, "global", 1000, Some(3)).unwrap();
3043        assert_eq!(results.len(), 3, "limit must be respected");
3044    }
3045
3046    #[test]
3047    fn queue_db_schema_creates_correctly() {
3048        let tmp_path = format!("/tmp/test-enrich-queue-{}.sqlite", std::process::id());
3049        let conn = open_queue_db(&tmp_path).expect("queue db must open");
3050        let count: i64 = conn
3051            .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
3052            .unwrap();
3053        assert_eq!(count, 0);
3054        let _ = std::fs::remove_file(&tmp_path);
3055    }
3056
3057    #[test]
3058    fn parse_claude_output_valid_bindings() {
3059        let output = r#"[
3060            {"type":"system","subtype":"init"},
3061            {"type":"result","is_error":false,"total_cost_usd":0.01,
3062             "structured_output":{"entities":[{"name":"rust-lang","entity_type":"tool"}],"relationships":[]}}
3063        ]"#;
3064        let result = crate::commands::claude_runner::parse_claude_output(output)
3065            .expect("must parse successfully");
3066        assert!(result.value.get("entities").is_some());
3067        assert!((result.cost_usd - 0.01).abs() < f64::EPSILON);
3068        assert!(!result.is_oauth);
3069    }
3070
3071    #[test]
3072    fn parse_claude_output_detects_oauth() {
3073        let output = r#"[
3074            {"type":"system","subtype":"init","apiKeySource":"none"},
3075            {"type":"result","is_error":false,"total_cost_usd":0.0,
3076             "structured_output":{"entities":[],"relationships":[]}}
3077        ]"#;
3078        let result = crate::commands::claude_runner::parse_claude_output(output).unwrap();
3079        assert!(result.is_oauth);
3080    }
3081
3082    #[test]
3083    fn parse_claude_output_rate_limit_returns_error() {
3084        let output = r#"[
3085            {"type":"system","subtype":"init"},
3086            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
3087        ]"#;
3088        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
3089        assert!(matches!(err, AppError::RateLimited { .. }));
3090    }
3091
3092    #[test]
3093    fn parse_claude_output_auth_error() {
3094        let output = r#"[
3095            {"type":"system","subtype":"init"},
3096            {"type":"result","is_error":true,"error":"authentication failed"}
3097        ]"#;
3098        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
3099        assert!(format!("{err}").contains("authentication failed"));
3100    }
3101
3102    #[test]
3103    fn dry_run_emits_preview_without_calling_llm() {
3104        // This test validates the dry-run NDJSON contract without spawning any process.
3105        // The scan_operation function requires a DB; we build one in-memory but cannot
3106        // call run() directly because it needs AppPaths (disk). Instead we test the
3107        // lower-level helpers that the dry-run path relies on.
3108        let conn = open_test_db();
3109        conn.execute(
3110            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'dry-mem', 'tiny')",
3111            [],
3112        )
3113        .unwrap();
3114
3115        let results = scan_short_body_memories(&conn, "global", 1000, None).unwrap();
3116        assert_eq!(results.len(), 1);
3117        assert_eq!(results[0].1, "dry-mem");
3118        // If scan finds the item and dry_run is set, no LLM would be called.
3119        // The NDJSON emission is tested via integration tests with a fake binary.
3120    }
3121
3122    #[test]
3123    fn persist_entity_description_updates_db() {
3124        let conn = open_test_db();
3125        conn.execute(
3126            "INSERT INTO entities (namespace, name, type) VALUES ('global', 'tokio-runtime', 'tool')",
3127            [],
3128        )
3129        .unwrap();
3130        let eid: i64 = conn
3131            .query_row(
3132                "SELECT id FROM entities WHERE name='tokio-runtime'",
3133                [],
3134                |r| r.get(0),
3135            )
3136            .unwrap();
3137
3138        persist_entity_description(&conn, eid, "Async runtime for Rust applications").unwrap();
3139
3140        let desc: String = conn
3141            .query_row(
3142                "SELECT description FROM entities WHERE id=?1",
3143                rusqlite::params![eid],
3144                |r| r.get(0),
3145            )
3146            .unwrap();
3147        assert_eq!(desc, "Async runtime for Rust applications");
3148    }
3149
3150    #[test]
3151    fn bindings_schema_is_valid_json() {
3152        let _: serde_json::Value =
3153            serde_json::from_str(BINDINGS_SCHEMA).expect("BINDINGS_SCHEMA must be valid JSON");
3154    }
3155
3156    #[test]
3157    fn entity_description_schema_is_valid_json() {
3158        let _: serde_json::Value = serde_json::from_str(ENTITY_DESCRIPTION_SCHEMA)
3159            .expect("ENTITY_DESCRIPTION_SCHEMA must be valid JSON");
3160    }
3161
3162    #[test]
3163    fn body_enrich_schema_is_valid_json() {
3164        let _: serde_json::Value = serde_json::from_str(BODY_ENRICH_SCHEMA)
3165            .expect("BODY_ENRICH_SCHEMA must be valid JSON");
3166    }
3167}