Skip to main content

sqlite_graphrag/commands/
enrich.rs

1//! Handler for the `enrich` CLI subcommand (GAP-14 + GAP-18).
2//!
3//! Enriches the knowledge graph by running LLM-powered analysis over memories
4//! and entities that are missing key structural data. Operations are:
5//!
6//! - `memory-bindings`: memories without `memory_entities` rows get entity extraction
7//! - `entity-descriptions`: entities with NULL/empty descriptions get LLM descriptions
8//! - `body-enrich`: memories with short bodies get expanded by the LLM (GAP-18)
9//! - all others: scan + structured NDJSON output (not-yet-implemented dispatch)
10//!
11//! Architecture mirrors `ingest_claude.rs`: SCAN → JUDGE (LLM) → PERSIST, with a
12//! SQLite queue DB (`.enrich-queue.sqlite`) for resume/retry support.
13//!
14//! # DRY opportunity
15//!
16//! `extract_with_claude`, `parse_claude_output`, `emit_json`, and the `open_queue_db`
17//! queue schema in `ingest_claude.rs` are private functions that duplicate patterns used
18//! here verbatim. A future refactoring could extract them into a shared
19//! `src/commands/llm_runner.rs` module (or `src/llm_runner.rs`) without changing any
20//! public APIs. That extraction requires editing `ingest_claude.rs`, which is outside
21//! this stream's boundary — flagged here for the Integration stream to evaluate.
22
23use crate::commands::ingest_claude::find_claude_binary;
24use crate::entity_type::EntityType;
25use crate::errors::AppError;
26use crate::paths::AppPaths;
27use crate::storage::connection::{ensure_db_ready, open_rw};
28use crate::storage::entities::{self, NewEntity, NewRelationship};
29use crate::storage::memories;
30
31use rusqlite::Connection;
32use serde::{Deserialize, Serialize};
33use std::io::Write;
34use std::path::{Path, PathBuf};
35use std::process::{Command, Stdio};
36use std::time::Instant;
37
38// ---------------------------------------------------------------------------
39// Constants
40// ---------------------------------------------------------------------------
41
/// Oldest Claude Code CLI version accepted by `validate_claude_version_local`.
const MIN_CLAUDE_VERSION: &str = "2.1.0";
/// File name of the SQLite queue database that backs resume/retry for `enrich`.
const DEFAULT_QUEUE_DB: &str = ".enrich-queue.sqlite";
// NOTE(review): presumably the number of seconds to pause after a rate-limit
// response — the use site is not visible here; confirm the unit.
const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
/// Default value of `--min-output-chars` (body-enrich).
const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;
/// Default value of `--max-output-chars` (body-enrich).
const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2000;
47
48// ---------------------------------------------------------------------------
// JSON schemas used for memory-bindings, entity-descriptions and body-enrich extraction
50// ---------------------------------------------------------------------------
51
/// JSON Schema describing the structured answer expected for `memory-bindings`.
///
/// The answer is an object with two required arrays:
/// - `entities`: items with a `name` and an `entity_type` drawn from a fixed enum
/// - `relationships`: items with `source`, `target`, a `relation` drawn from a
///   fixed enum, and a numeric `strength` between 0 and 1
///
/// Unknown properties are rejected at every level (`additionalProperties: false`).
// NOTE(review): presumably passed as the `json_schema` argument of `call_claude`;
// the call site is outside this view.
const BINDINGS_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["entities","relationships"],
  "additionalProperties": false
}"#;
91
/// JSON Schema for the `entity-descriptions` answer: an object holding exactly
/// one required string property, `description`.
const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" }
  },
  "required": ["description"],
  "additionalProperties": false
}"#;
100
/// JSON Schema for the `body-enrich` answer: an object holding exactly one
/// required string property, `enriched_body`.
const BODY_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "enriched_body": { "type": "string" }
  },
  "required": ["enriched_body"],
  "additionalProperties": false
}"#;
109
110// ---------------------------------------------------------------------------
111// Prompts
112// ---------------------------------------------------------------------------
113
/// Instruction text for entity/relationship extraction (`memory-bindings`).
///
/// The relationship types listed in the rules are the same set as the
/// `relation` enum in `BINDINGS_SCHEMA`; keep the two in sync when editing.
// The trailing `\` on each source line is a string continuation: the newline and
// the next line's leading whitespace are not part of the resulting text.
const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. Given a memory body, extract:\n\
1. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
2. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones";
124
/// Leading part of the `entity-descriptions` prompt.
///
/// The text ends with `Entity name: `, so the caller is expected to append the
/// entity name (and, presumably, its type — the call site is outside this view).
const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. Given an entity name and type, write a concise one-sentence description (10-20 words) that explains what this entity IS and WHY it matters in the context of software/system design.\n\nEntity name: ";
126
/// Leading part of the `body-enrich` prompt (GAP-18).
///
/// The text ends with `Memory body to enrich:` followed by a blank line, so the
/// caller is expected to append the memory body.
// NOTE(review): the prompt says the target length "is provided in the system
// context"; where that length is injected is not visible here — confirm.
const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. Given a short or sparse memory body, expand it into a richer, more complete and useful description. Preserve all existing facts. Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\nConstraints:\n- Output only the enriched body text (no metadata, no headers)\n- Preserve the original meaning exactly\n- Target length is provided in the system context\n\nMemory body to enrich:\n\n";
128
129// ---------------------------------------------------------------------------
130// CLI args
131// ---------------------------------------------------------------------------
132
/// Operation to perform in the `enrich` command.
// With `clap::ValueEnum`, the variant doc comments below double as CLI help
// text, so treat them as user-facing strings when editing.
// serde uses kebab-case names (e.g. `memory-bindings`), matching the spelling
// shown in the `--operation` examples of `EnrichArgs`.
// "scan only" marks operations that, per the module docs, only scan and emit
// structured NDJSON; their LLM dispatch is not implemented yet.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum EnrichOperation {
    /// Add missing entity/relationship bindings to memories (fully implemented).
    MemoryBindings,
    /// Fill NULL/empty entity descriptions with LLM-generated summaries (fully implemented).
    EntityDescriptions,
    /// Expand short memory bodies into richer content (fully implemented, GAP-18).
    BodyEnrich,
    /// Calibrate relationship weights using LLM analysis (scan only).
    WeightCalibrate,
    /// Reclassify relationship types using LLM judgment (scan only).
    RelationReclassify,
    /// Connect isolated entities by suggesting new relationships (scan only).
    EntityConnect,
    /// Validate entity type assignments using LLM judgment (scan only).
    EntityTypeValidate,
    /// Enrich memory descriptions that are generic/auto-generated (scan only).
    DescriptionEnrich,
    /// Identify cross-domain bridges between disconnected subgraphs (scan only).
    CrossDomainBridges,
    /// Classify memories into domain categories (scan only).
    DomainClassify,
    /// Audit the graph for quality issues (scan only).
    GraphAudit,
    /// Synthesize deep-research findings into graph memories (scan only).
    DeepResearchSynth,
    /// Extract structured body from unstructured text (scan only).
    BodyExtract,
}
164
/// LLM provider for enrichment.
// With `clap::ValueEnum`, the variant doc comments below double as CLI help
// text. The `Display` impl in this file renders the variants as `claude-code`
// and `codex`.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
pub enum EnrichMode {
    /// Use locally installed Claude Code CLI (OAuth-first).
    ClaudeCode,
    /// Use locally installed OpenAI Codex CLI.
    Codex,
}
173
174impl std::fmt::Display for EnrichMode {
175    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176        match self {
177            EnrichMode::ClaudeCode => write!(f, "claude-code"),
178            EnrichMode::Codex => write!(f, "codex"),
179        }
180    }
181}
182
183/// Arguments for the `enrich` subcommand.
184#[derive(clap::Args)]
185#[command(
186    about = "Enrich graph memories and entities using an LLM provider",
187    after_long_help = "EXAMPLES:\n  \
188    # Add missing entity bindings to all unbound memories\n  \
189    sqlite-graphrag enrich --operation memory-bindings --mode claude-code\n\n  \
190    # Fill entity descriptions (dry-run preview, no tokens spent)\n  \
191    sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n  \
192    # Expand short memory bodies (GAP-18)\n  \
193    sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n  \
194    # Resume an interrupted body-enrich run\n  \
195    sqlite-graphrag enrich --operation body-enrich --resume --json\n\n  \
196    # Retry only failed items from a previous run\n  \
197    sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n\
198    EXIT CODES:\n  \
199    0  success\n  \
200    1  validation error (bad args, binary not found)\n  \
201    14 I/O error"
202)]
203pub struct EnrichArgs {
204    /// Enrichment operation to run.
205    #[arg(long, short = 'o', value_enum, value_name = "OPERATION")]
206    pub operation: EnrichOperation,
207
208    /// LLM provider to use. Default: claude-code (OAuth-first).
209    #[arg(long, value_enum, default_value = "claude-code")]
210    pub mode: EnrichMode,
211
212    /// Maximum number of items to process in this run. Omit for all.
213    #[arg(long, value_name = "N")]
214    pub limit: Option<usize>,
215
216    /// Preview items without calling the LLM (zero tokens consumed).
217    #[arg(long)]
218    pub dry_run: bool,
219
220    /// Namespace to operate on. Default: global.
221    #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
222    pub namespace: Option<String>,
223
224    // -- Provider flags (Claude) --
225    /// Path to the Claude Code binary. Default: auto-detect from PATH.
226    #[arg(long, value_name = "PATH")]
227    pub claude_binary: Option<PathBuf>,
228
229    /// Claude model to use (e.g. claude-sonnet-4-6).
230    #[arg(long, value_name = "MODEL")]
231    pub claude_model: Option<String>,
232
233    /// Timeout per item in seconds when using Claude Code. Default: 300.
234    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
235    pub claude_timeout: u64,
236
237    // -- Provider flags (Codex) --
238    /// Path to the Codex CLI binary. Default: auto-detect from PATH.
239    #[arg(long, value_name = "PATH")]
240    pub codex_binary: Option<PathBuf>,
241
242    /// Codex model to use (e.g. o4-mini).
243    #[arg(long, value_name = "MODEL")]
244    pub codex_model: Option<String>,
245
246    /// Timeout per item in seconds when using Codex. Default: 300.
247    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
248    pub codex_timeout: u64,
249
250    // -- Cost controls --
251    /// Abort when cumulative cost exceeds this USD budget (API key only; ignored for OAuth).
252    #[arg(long, value_name = "USD")]
253    pub max_cost_usd: Option<f64>,
254
255    // -- Queue controls --
256    /// Resume a previously interrupted run (skip already-done items).
257    #[arg(long)]
258    pub resume: bool,
259
260    /// Retry only items that failed in a previous run.
261    #[arg(long)]
262    pub retry_failed: bool,
263
264    // -- body-enrich specific flags (GAP-18) --
265    /// Minimum output character count for body-enrich. Default: 500.
266    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
267    pub min_output_chars: usize,
268
269    /// Maximum output character count for body-enrich. Default: 2000.
270    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
271    pub max_output_chars: usize,
272
273    /// Check that enriched body preserves all facts from the original (LLM judge). Default: true.
274    #[arg(long, default_value_t = true)]
275    pub preserve_check: bool,
276
277    /// Path to a custom prompt template file for body-enrich.
278    #[arg(long, value_name = "PATH")]
279    pub prompt_template: Option<PathBuf>,
280
281    // -- Output / infra --
282    /// Emit NDJSON output. Always true; flag accepted for compatibility.
283    #[arg(long)]
284    pub json: bool,
285
286    /// Database path override.
287    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
288    pub db: Option<String>,
289}
290
291// ---------------------------------------------------------------------------
292// Internal types — raw LLM output structs
293// ---------------------------------------------------------------------------
294
/// One element of the JSON array printed by `claude -p --output-format json`.
///
/// Every field is optional (or defaulted) because different element kinds
/// carry different keys; `parse_claude_json_output` only inspects the
/// `system`/`init` element and the `result` element.
#[derive(Debug, Deserialize)]
struct ClaudeElement {
    /// Element kind; the parser looks for `"system"` and `"result"`.
    r#type: Option<String>,
    /// Element sub-kind; the parser looks for `"init"` on `system` elements.
    subtype: Option<String>,
    /// Whether a `result` element reports a failure; defaults to `false` when absent.
    #[serde(default)]
    is_error: bool,
    /// Schema-conforming answer; preferred over `result` when present.
    structured_output: Option<serde_json::Value>,
    /// Text answer; parsed as JSON when `structured_output` is absent, and used
    /// as the fallback error message on failures.
    result: Option<String>,
    /// Cost reported on the `result` element; treated as 0.0 when absent.
    total_cost_usd: Option<f64>,
    /// Error message reported on a failed `result` element.
    error: Option<String>,
    /// Read from the `system`/`init` element; the value `"none"` is interpreted
    /// as an OAuth session.
    #[serde(rename = "apiKeySource")]
    api_key_source: Option<String>,
}
308
309// ---------------------------------------------------------------------------
310// NDJSON event types emitted to stdout
311// ---------------------------------------------------------------------------
312
/// NDJSON record announcing a phase of the enrichment run.
///
/// Only `phase` is always serialized; every `Option` field is omitted from the
/// JSON line when it is `None`.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Name of the phase being reported.
    phase: &'a str,
    /// Path of the provider binary, when relevant to the phase.
    #[serde(skip_serializing_if = "Option::is_none")]
    binary_path: Option<&'a str>,
    /// Version string of the provider binary, when relevant to the phase.
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    // NOTE(review): presumably the number of items found by the scan — confirm at the emit site.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_total: Option<usize>,
    // NOTE(review): presumably the number of items still to be processed — confirm at the emit site.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_pending: Option<usize>,
}
325
/// NDJSON record reporting the outcome of a single enrichment item.
///
/// `item`, `status`, `index` and `total` are always serialized; every `Option`
/// field is omitted from the JSON line when it is `None`, so each operation
/// only reports the fields that apply to it.
#[derive(Debug, Serialize)]
struct ItemEvent<'a> {
    /// Item identifier (memory name or entity name).
    item: &'a str,
    /// Outcome label for the item (the set of values is defined by the emit sites).
    status: &'a str,
    /// Row id of the memory, for memory-based operations.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Row id of the entity, for entity-based operations.
    #[serde(skip_serializing_if = "Option::is_none")]
    entity_id: Option<i64>,
    /// Number of entities involved in the item.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Number of relationships involved in the item.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    // NOTE(review): presumably the body length before/after body-enrich — confirm at the emit site.
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_before: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_after: Option<usize>,
    /// Cost in USD attributed to this item.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    /// Time spent on this item, in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Error message when the item failed.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    // NOTE(review): position of the item within the run; whether it is 0- or
    // 1-based is decided by the emit site — confirm.
    index: usize,
    /// Total number of items in the run.
    total: usize,
}
352
/// Final NDJSON record summarizing an enrichment run.
#[derive(Debug, Serialize)]
struct EnrichSummary {
    // NOTE(review): presumably always `true`, acting as a marker that
    // distinguishes the summary line from item/phase lines — confirm at the emit site.
    summary: bool,
    /// Name of the operation that was run.
    operation: String,
    /// Number of items considered by the run.
    items_total: usize,
    /// Number of items that completed successfully.
    completed: usize,
    /// Number of items that failed.
    failed: usize,
    /// Number of items that were skipped.
    skipped: usize,
    /// Cumulative cost in USD.
    cost_usd: f64,
    /// Wall-clock duration of the run, in milliseconds.
    elapsed_ms: u64,
}
364
365// ---------------------------------------------------------------------------
366// Helper: emit a single JSON line to stdout
367// ---------------------------------------------------------------------------
368
369fn emit_json<T: Serialize>(value: &T) {
370    if let Ok(json) = serde_json::to_string(value) {
371        let stdout = std::io::stdout();
372        let mut lock = stdout.lock();
373        let _ = writeln!(lock, "{json}");
374        let _ = lock.flush();
375    }
376}
377
378// ---------------------------------------------------------------------------
379// Queue DB
380// ---------------------------------------------------------------------------
381
382/// Opens or creates the enrichment queue database.
383///
384/// The queue schema mirrors `ingest_claude` for resume/retry parity.
385/// Uses a different filename (`.enrich-queue.sqlite`) to avoid collision.
386///
387/// # DRY note
388///
389/// This is a near-verbatim copy of `open_queue_db` in `ingest_claude.rs`.
390/// Both should be unified in a shared `llm_runner.rs` module by the
391/// Integration stream.
392fn open_queue_db(path: &str) -> Result<Connection, AppError> {
393    let conn = Connection::open(path)?;
394    conn.pragma_update(None, "journal_mode", "wal")?;
395    conn.execute_batch(
396        "CREATE TABLE IF NOT EXISTS queue (
397            id          INTEGER PRIMARY KEY AUTOINCREMENT,
398            item_key    TEXT NOT NULL UNIQUE,
399            item_type   TEXT NOT NULL DEFAULT 'memory',
400            status      TEXT NOT NULL DEFAULT 'pending',
401            memory_id   INTEGER,
402            entity_id   INTEGER,
403            entities    INTEGER DEFAULT 0,
404            rels        INTEGER DEFAULT 0,
405            error       TEXT,
406            cost_usd    REAL DEFAULT 0.0,
407            attempt     INTEGER DEFAULT 0,
408            elapsed_ms  INTEGER,
409            created_at  TEXT DEFAULT (datetime('now')),
410            done_at     TEXT
411        );
412        CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
413    )?;
414    Ok(conn)
415}
416
417// ---------------------------------------------------------------------------
418// Validate Claude version (private copy — see DRY note above)
419// ---------------------------------------------------------------------------
420
421fn validate_claude_version_local(binary: &Path) -> Result<String, AppError> {
422    let output = Command::new(binary)
423        .arg("--version")
424        .stdin(Stdio::null())
425        .stdout(Stdio::piped())
426        .stderr(Stdio::piped())
427        .output()
428        .map_err(AppError::Io)?;
429
430    if !output.status.success() {
431        return Err(AppError::Validation(
432            "failed to run 'claude --version'".to_string(),
433        ));
434    }
435
436    let version_str = String::from_utf8(output.stdout)
437        .map_err(|_| AppError::Validation("claude --version output is not UTF-8".to_string()))?;
438    let version = version_str.trim().to_string();
439    let numeric = version.split([' ', '(']).next().unwrap_or("").trim();
440
441    fn parse_semver(s: &str) -> Option<(u64, u64, u64)> {
442        let parts: Vec<&str> = s.splitn(3, '.').collect();
443        if parts.len() < 2 {
444            return None;
445        }
446        let major = parts[0].parse::<u64>().ok()?;
447        let minor = parts[1].parse::<u64>().ok()?;
448        let patch = parts
449            .get(2)
450            .and_then(|p| p.parse::<u64>().ok())
451            .unwrap_or(0);
452        Some((major, minor, patch))
453    }
454
455    if let (Some(actual), Some(min)) = (parse_semver(numeric), parse_semver(MIN_CLAUDE_VERSION)) {
456        if actual < min {
457            return Err(AppError::Validation(format!(
458                "Claude Code version {numeric} is below minimum required {MIN_CLAUDE_VERSION}"
459            )));
460        }
461    }
462
463    Ok(version)
464}
465
466// ---------------------------------------------------------------------------
467// LLM invocation — Claude Code
468// ---------------------------------------------------------------------------
469
470/// Calls `claude -p` with a prompt and JSON schema, returning the parsed JSON value.
471///
472/// Returns `(output_value, cost_usd, is_oauth)`.
473///
474/// # DRY note
475///
476/// Mirrors `extract_with_claude` in `ingest_claude.rs`. Should be unified in a
477/// shared module by the Integration stream.
478fn call_claude(
479    binary: &Path,
480    prompt: &str,
481    json_schema: &str,
482    input_text: &str,
483    model: Option<&str>,
484    timeout_secs: u64,
485) -> Result<(serde_json::Value, f64, bool), AppError> {
486    use wait_timeout::ChildExt;
487
488    let full_prompt = format!("{prompt}\n\n{input_text}");
489
490    let mut cmd = Command::new(binary);
491
492    // Least-privilege environment
493    cmd.env_clear();
494    for var in &[
495        "PATH",
496        "HOME",
497        "USER",
498        "SHELL",
499        "TERM",
500        "LANG",
501        "XDG_CONFIG_HOME",
502        "XDG_DATA_HOME",
503        "XDG_RUNTIME_DIR",
504        "ANTHROPIC_API_KEY",
505        "CLAUDE_CONFIG_DIR",
506        "TMPDIR",
507        "TMP",
508        "TEMP",
509        "DYLD_FALLBACK_LIBRARY_PATH",
510    ] {
511        if let Ok(val) = std::env::var(var) {
512            cmd.env(var, val);
513        }
514    }
515
516    #[cfg(windows)]
517    for var in &[
518        "LOCALAPPDATA",
519        "APPDATA",
520        "USERPROFILE",
521        "SystemRoot",
522        "COMSPEC",
523        "PATHEXT",
524        "HOMEPATH",
525        "HOMEDRIVE",
526    ] {
527        if let Ok(val) = std::env::var(var) {
528            cmd.env(var, val);
529        }
530    }
531
532    cmd.arg("-p")
533        .arg(&full_prompt)
534        .arg("--output-format")
535        .arg("json")
536        .arg("--json-schema")
537        .arg(json_schema)
538        .arg("--max-turns")
539        .arg("3")
540        .arg("--no-session-persistence");
541
542    if std::env::var("ANTHROPIC_API_KEY").is_ok() {
543        cmd.arg("--bare");
544    } else {
545        cmd.arg("--dangerously-skip-permissions")
546            .arg("--settings")
547            .arg(r#"{"hooks":{}}"#);
548    }
549
550    if let Some(m) = model {
551        cmd.arg("--model").arg(m);
552    }
553
554    cmd.stdin(Stdio::null())
555        .stdout(Stdio::piped())
556        .stderr(Stdio::piped());
557
558    let mut child = cmd.spawn().map_err(|e| {
559        AppError::Io(std::io::Error::new(
560            e.kind(),
561            format!("failed to spawn claude: {e}"),
562        ))
563    })?;
564
565    let timeout = std::time::Duration::from_secs(timeout_secs);
566    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
567
568    match status {
569        Some(exit_status) => {
570            let mut stdout_buf = Vec::new();
571            let mut stderr_buf = Vec::new();
572            if let Some(mut out) = child.stdout.take() {
573                std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
574            }
575            if let Some(mut err) = child.stderr.take() {
576                std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
577            }
578
579            if !exit_status.success() {
580                let stderr_str = String::from_utf8_lossy(&stderr_buf);
581                if stderr_str.contains("auth") || stderr_str.contains("login") {
582                    tracing::warn!(
583                        target: "enrich",
584                        "Claude Code authentication may have failed. Re-authenticate with: claude"
585                    );
586                }
587                return Err(AppError::Validation(format!(
588                    "claude -p exited with code {:?}: {}",
589                    exit_status.code(),
590                    stderr_str.trim()
591                )));
592            }
593
594            let stdout_str = String::from_utf8(stdout_buf)
595                .map_err(|_| AppError::Validation("claude -p stdout is not valid UTF-8".into()))?;
596            parse_claude_json_output(&stdout_str)
597        }
598        None => {
599            tracing::warn!(target: "enrich", timeout_secs, "claude -p timed out, killing process");
600            let _ = child.kill();
601            let _ = child.wait();
602            Err(AppError::Validation(format!(
603                "claude -p timed out after {timeout_secs} seconds"
604            )))
605        }
606    }
607}
608
609/// Parses the JSON array output from `claude -p --output-format json`.
610///
611/// Returns `(structured_value, cost_usd, is_oauth)`.
612///
613/// # DRY note
614///
615/// Mirrors `parse_claude_output` in `ingest_claude.rs`. Should be unified.
616fn parse_claude_json_output(stdout: &str) -> Result<(serde_json::Value, f64, bool), AppError> {
617    let elements: Vec<ClaudeElement> = serde_json::from_str(stdout).map_err(|e| {
618        AppError::Validation(format!("failed to parse claude output as JSON array: {e}"))
619    })?;
620
621    let is_oauth = elements
622        .iter()
623        .find(|e| e.r#type.as_deref() == Some("system") && e.subtype.as_deref() == Some("init"))
624        .and_then(|e| e.api_key_source.as_deref())
625        .map(|s| s == "none")
626        .unwrap_or(false);
627
628    let result_elem = elements
629        .iter()
630        .find(|e| e.r#type.as_deref() == Some("result"))
631        .ok_or_else(|| {
632            AppError::Validation("claude output missing 'result' element".to_string())
633        })?;
634
635    if result_elem.is_error {
636        let err_msg = result_elem
637            .error
638            .as_deref()
639            .or(result_elem.result.as_deref())
640            .unwrap_or("unknown error");
641        if err_msg.contains("rate_limit") || err_msg.contains("overloaded") {
642            return Err(AppError::Validation(format!("RATE_LIMITED: {err_msg}")));
643        }
644        return Err(AppError::Validation(format!(
645            "claude extraction failed: {err_msg}"
646        )));
647    }
648
649    let value = if let Some(v) = result_elem.structured_output.clone() {
650        v
651    } else if let Some(text) = &result_elem.result {
652        serde_json::from_str(text).map_err(|e| {
653            AppError::Validation(format!("failed to parse claude result field as JSON: {e}"))
654        })?
655    } else {
656        return Err(AppError::Validation(
657            "claude result missing structured_output and result field".into(),
658        ));
659    };
660
661    let cost = result_elem.total_cost_usd.unwrap_or(0.0);
662    Ok((value, cost, is_oauth))
663}
664
665// ---------------------------------------------------------------------------
666// SCAN helpers — SQL queries that find items needing enrichment
667// ---------------------------------------------------------------------------
668
669/// Returns memories without any `memory_entities` binding.
670///
671/// These are the targets for `memory-bindings` enrichment.
672fn scan_unbound_memories(
673    conn: &Connection,
674    namespace: &str,
675    limit: Option<usize>,
676) -> Result<Vec<(i64, String, String)>, AppError> {
677    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
678    let sql = format!(
679        "SELECT m.id, m.name, m.body
680         FROM memories m
681         WHERE m.namespace = ?1
682           AND m.deleted_at IS NULL
683           AND NOT EXISTS (
684               SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
685           )
686         ORDER BY m.id
687         {limit_clause}"
688    );
689    let mut stmt = conn.prepare(&sql)?;
690    let rows = stmt
691        .query_map(rusqlite::params![namespace], |r| {
692            Ok((
693                r.get::<_, i64>(0)?,
694                r.get::<_, String>(1)?,
695                r.get::<_, String>(2)?,
696            ))
697        })?
698        .collect::<Result<Vec<_>, _>>()?;
699    Ok(rows)
700}
701
702/// Returns entities with NULL or empty description.
703///
704/// These are the targets for `entity-descriptions` enrichment.
705fn scan_entities_without_description(
706    conn: &Connection,
707    namespace: &str,
708    limit: Option<usize>,
709) -> Result<Vec<(i64, String, String)>, AppError> {
710    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
711    let sql = format!(
712        "SELECT id, name, type
713         FROM entities
714         WHERE namespace = ?1
715           AND (description IS NULL OR description = '')
716         ORDER BY id
717         {limit_clause}"
718    );
719    let mut stmt = conn.prepare(&sql)?;
720    let rows = stmt
721        .query_map(rusqlite::params![namespace], |r| {
722            Ok((
723                r.get::<_, i64>(0)?,
724                r.get::<_, String>(1)?,
725                r.get::<_, String>(2)?,
726            ))
727        })?
728        .collect::<Result<Vec<_>, _>>()?;
729    Ok(rows)
730}
731
732/// Returns memories whose body length is below the configured minimum.
733///
734/// These are the targets for `body-enrich` (GAP-18).
735fn scan_short_body_memories(
736    conn: &Connection,
737    namespace: &str,
738    min_chars: usize,
739    limit: Option<usize>,
740) -> Result<Vec<(i64, String, String)>, AppError> {
741    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
742    let sql = format!(
743        "SELECT m.id, m.name, m.body
744         FROM memories m
745         WHERE m.namespace = ?1
746           AND m.deleted_at IS NULL
747           AND LENGTH(COALESCE(m.body,'')) < ?2
748         ORDER BY m.id
749         {limit_clause}"
750    );
751    let mut stmt = conn.prepare(&sql)?;
752    let rows = stmt
753        .query_map(rusqlite::params![namespace, min_chars as i64], |r| {
754            Ok((
755                r.get::<_, i64>(0)?,
756                r.get::<_, String>(1)?,
757                r.get::<_, String>(2)?,
758            ))
759        })?
760        .collect::<Result<Vec<_>, _>>()?;
761    Ok(rows)
762}
763
764// ---------------------------------------------------------------------------
765// PERSIST helpers for fully-implemented operations
766// ---------------------------------------------------------------------------
767
768/// Persists entity bindings extracted by the LLM for a memory.
769///
770/// Creates entities via `upsert_entity`, links them to the memory via
771/// `link_memory_entity`, and upserts relationships found between entities.
772fn persist_memory_bindings(
773    conn: &Connection,
774    namespace: &str,
775    memory_id: i64,
776    entities_json: &serde_json::Value,
777    rels_json: &serde_json::Value,
778) -> Result<(usize, usize), AppError> {
779    #[derive(Deserialize)]
780    struct EntityItem {
781        name: String,
782        entity_type: String,
783    }
784    #[derive(Deserialize)]
785    struct RelItem {
786        source: String,
787        target: String,
788        relation: String,
789        strength: f64,
790    }
791
792    let extracted_entities: Vec<EntityItem> = serde_json::from_value(entities_json.clone())
793        .map_err(|e| AppError::Validation(format!("failed to parse entities array: {e}")))?;
794
795    let extracted_rels: Vec<RelItem> = serde_json::from_value(rels_json.clone())
796        .map_err(|e| AppError::Validation(format!("failed to parse relationships array: {e}")))?;
797
798    let mut ent_count = 0usize;
799    let mut rel_count = 0usize;
800
801    for item in &extracted_entities {
802        let entity_type = match item.entity_type.parse::<EntityType>() {
803            Ok(et) => et,
804            Err(_) => {
805                tracing::warn!(
806                    target: "enrich",
807                    entity = %item.name,
808                    entity_type = %item.entity_type,
809                    "entity type not recognized, skipping"
810                );
811                continue;
812            }
813        };
814        match entities::upsert_entity(
815            conn,
816            namespace,
817            &NewEntity {
818                name: item.name.clone(),
819                entity_type,
820                description: None,
821            },
822        ) {
823            Ok(eid) => {
824                let _ = entities::link_memory_entity(conn, memory_id, eid);
825                ent_count += 1;
826            }
827            Err(e) => {
828                tracing::warn!(
829                    target: "enrich",
830                    entity = %item.name,
831                    error = %e,
832                    "entity upsert skipped"
833                );
834            }
835        }
836    }
837
838    for rel in &extracted_rels {
839        let normalized = crate::parsers::normalize_relation(&rel.relation);
840        crate::parsers::warn_if_non_canonical(&normalized);
841
842        // Normalize entity names before lookup: upsert_entity normalizes on write,
843        // so the lookup must use the same normalized form to find the row.
844        let src_name = crate::parsers::normalize_entity_name(&rel.source);
845        let tgt_name = crate::parsers::normalize_entity_name(&rel.target);
846        let src_id = entities::find_entity_id(conn, namespace, &src_name);
847        let tgt_id = entities::find_entity_id(conn, namespace, &tgt_name);
848        if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
849            let new_rel = NewRelationship {
850                source: rel.source.clone(),
851                target: rel.target.clone(),
852                relation: normalized,
853                strength: rel.strength,
854                description: None,
855            };
856            if entities::upsert_relationship(conn, namespace, sid, tid, &new_rel).is_ok() {
857                rel_count += 1;
858            }
859        }
860    }
861
862    Ok((ent_count, rel_count))
863}
864
865/// Updates an entity's description directly in the `entities` table.
866fn persist_entity_description(
867    conn: &Connection,
868    entity_id: i64,
869    description: &str,
870) -> Result<(), AppError> {
871    conn.execute(
872        "UPDATE entities SET description = ?1, updated_at = unixepoch() WHERE id = ?2",
873        rusqlite::params![description, entity_id],
874    )?;
875    Ok(())
876}
877
878/// Persists an enriched memory body (body-enrich, GAP-18).
879///
880/// Uses `memories::update` to set the new body and `sync_fts_after_update`
881/// to keep FTS5 in sync. Also re-embeds the memory for recall accuracy.
882fn persist_enriched_body(
883    conn: &Connection,
884    namespace: &str,
885    memory_id: i64,
886    memory_name: &str,
887    new_body: &str,
888    paths: &crate::paths::AppPaths,
889) -> Result<(), AppError> {
890    // Read current values for FTS sync
891    let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
892        "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
893        rusqlite::params![memory_id],
894        |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
895    )?;
896
897    let memory_type: String = conn.query_row(
898        "SELECT type FROM memories WHERE id=?1",
899        rusqlite::params![memory_id],
900        |r| r.get(0),
901    )?;
902
903    let description: String = conn.query_row(
904        "SELECT COALESCE(description,'') FROM memories WHERE id=?1",
905        rusqlite::params![memory_id],
906        |r| r.get(0),
907    )?;
908
909    let body_hash = blake3::hash(new_body.as_bytes()).to_hex().to_string();
910
911    let new_memory = memories::NewMemory {
912        namespace: namespace.to_string(),
913        name: memory_name.to_string(),
914        memory_type: memory_type.clone(),
915        description: description.clone(),
916        body: new_body.to_string(),
917        body_hash,
918        session_id: None,
919        source: "enrich".to_string(),
920        metadata: serde_json::Value::Object(serde_json::Map::new()),
921    };
922
923    memories::update(conn, memory_id, &new_memory, None)?;
924    memories::sync_fts_after_update(
925        conn,
926        memory_id,
927        &old_name,
928        &old_desc,
929        &old_body,
930        &new_memory.name,
931        &new_memory.description,
932        &new_memory.body,
933    )?;
934
935    // Re-embed for recall accuracy
936    let snippet: String = new_body.chars().take(200).collect();
937    let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
938    let chunks_info = crate::chunking::split_into_chunks_hierarchical(new_body, tokenizer);
939    let embedding_result = if chunks_info.len() <= 1 {
940        crate::daemon::embed_passage_or_local(&paths.models, new_body)
941    } else {
942        let mut chunk_embeddings: Vec<Vec<f32>> = Vec::with_capacity(chunks_info.len());
943        let mut ok = true;
944        for chunk in &chunks_info {
945            let text = crate::chunking::chunk_text(new_body, chunk);
946            match crate::daemon::embed_passage_or_local(&paths.models, text) {
947                Ok(emb) => chunk_embeddings.push(emb),
948                Err(e) => {
949                    tracing::warn!(target: "enrich", error = %e, "chunk embedding failed");
950                    ok = false;
951                    break;
952                }
953            }
954        }
955        if ok {
956            Ok(crate::chunking::aggregate_embeddings(&chunk_embeddings))
957        } else {
958            crate::daemon::embed_passage_or_local(&paths.models, new_body)
959        }
960    };
961
962    if let Ok(embedding) = embedding_result {
963        if let Err(e) = memories::upsert_vec(
964            conn,
965            memory_id,
966            namespace,
967            &memory_type,
968            &embedding,
969            memory_name,
970            &snippet,
971        ) {
972            tracing::warn!(target: "enrich", memory = %memory_name, error = %e, "vec upsert failed after body-enrich");
973        }
974    }
975
976    Ok(())
977}
978
979// ---------------------------------------------------------------------------
980// Main entry point
981// ---------------------------------------------------------------------------
982
983/// Main entry point for the `enrich` command.
984pub fn run(args: &EnrichArgs) -> Result<(), AppError> {
985    let started = Instant::now();
986
987    let paths = AppPaths::resolve(args.db.as_deref())?;
988    ensure_db_ready(&paths)?;
989    let conn = open_rw(&paths.db)?;
990    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
991
992    // Validate provider binary upfront
993    let provider_binary = match args.mode {
994        EnrichMode::ClaudeCode => {
995            let bin = find_claude_binary(args.claude_binary.as_deref())?;
996            let version = validate_claude_version_local(&bin)?;
997            tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
998            emit_json(&PhaseEvent {
999                phase: "validate",
1000                binary_path: bin.to_str(),
1001                version: Some(&version),
1002                items_total: None,
1003                items_pending: None,
1004            });
1005            bin
1006        }
1007        EnrichMode::Codex => {
1008            // Codex provider: locate binary using env or PATH
1009            let bin = find_codex_binary(args.codex_binary.as_deref())?;
1010            emit_json(&PhaseEvent {
1011                phase: "validate",
1012                binary_path: bin.to_str(),
1013                version: None,
1014                items_total: None,
1015                items_pending: None,
1016            });
1017            bin
1018        }
1019    };
1020
1021    // SCAN phase
1022    let scan_result = scan_operation(&conn, &namespace, args)?;
1023    let total = scan_result.len();
1024
1025    emit_json(&PhaseEvent {
1026        phase: "scan",
1027        binary_path: None,
1028        version: None,
1029        items_total: Some(total),
1030        items_pending: Some(total),
1031    });
1032
1033    // Dry-run: emit preview events and summary without calling LLM
1034    if args.dry_run {
1035        for (idx, key) in scan_result.iter().enumerate() {
1036            emit_json(&ItemEvent {
1037                item: key,
1038                status: "preview",
1039                memory_id: None,
1040                entity_id: None,
1041                entities: None,
1042                rels: None,
1043                chars_before: None,
1044                chars_after: None,
1045                cost_usd: None,
1046                elapsed_ms: None,
1047                error: None,
1048                index: idx,
1049                total,
1050            });
1051        }
1052        emit_json(&EnrichSummary {
1053            summary: true,
1054            operation: format!("{:?}", args.operation),
1055            items_total: total,
1056            completed: 0,
1057            failed: 0,
1058            skipped: 0,
1059            cost_usd: 0.0,
1060            elapsed_ms: started.elapsed().as_millis() as u64,
1061        });
1062        return Ok(());
1063    }
1064
1065    // For operations not yet fully implemented, emit a clear structured response
1066    // and exit without calling the LLM, so callers can branch on the NDJSON.
1067    match args.operation {
1068        EnrichOperation::MemoryBindings
1069        | EnrichOperation::EntityDescriptions
1070        | EnrichOperation::BodyEnrich => {
1071            // Fully implemented below
1072        }
1073        _ => {
1074            for (idx, key) in scan_result.iter().enumerate() {
1075                emit_json(&serde_json::json!({
1076                    "item": key,
1077                    "status": "not_yet_implemented",
1078                    "operation": format!("{:?}", args.operation),
1079                    "index": idx,
1080                    "total": total
1081                }));
1082            }
1083            emit_json(&EnrichSummary {
1084                summary: true,
1085                operation: format!("{:?}", args.operation),
1086                items_total: total,
1087                completed: 0,
1088                failed: 0,
1089                skipped: total,
1090                cost_usd: 0.0,
1091                elapsed_ms: started.elapsed().as_millis() as u64,
1092            });
1093            return Ok(());
1094        }
1095    }
1096
1097    // Queue setup for resume/retry
1098    let queue_conn = open_queue_db(DEFAULT_QUEUE_DB)?;
1099
1100    if args.resume {
1101        let reset = queue_conn
1102            .execute(
1103                "UPDATE queue SET status='pending' WHERE status='processing'",
1104                [],
1105            )
1106            .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
1107        if reset > 0 {
1108            tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
1109        }
1110    }
1111
1112    if args.retry_failed {
1113        let count = queue_conn
1114            .execute(
1115                "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
1116                [],
1117            )
1118            .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
1119        tracing::info!(target: "enrich", count, "retrying failed items");
1120    }
1121
1122    if !args.resume && !args.retry_failed {
1123        queue_conn
1124            .execute("DELETE FROM queue", [])
1125            .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
1126    }
1127
1128    // Populate queue
1129    for (idx, key) in scan_result.iter().enumerate() {
1130        let item_type = match args.operation {
1131            EnrichOperation::EntityDescriptions => "entity",
1132            _ => "memory",
1133        };
1134        let _ = queue_conn.execute(
1135            "INSERT OR IGNORE INTO queue (item_key, item_type, status) VALUES (?1, ?2, 'pending')",
1136            rusqlite::params![key, item_type],
1137        );
1138        let _ = idx; // suppress unused warning
1139    }
1140
1141    let mut completed = 0usize;
1142    let mut failed = 0usize;
1143    let mut skipped = 0usize;
1144    let mut cost_total = 0.0f64;
1145    let mut oauth_detected = false;
1146    let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1147
1148    let provider_timeout = match args.mode {
1149        EnrichMode::ClaudeCode => args.claude_timeout,
1150        EnrichMode::Codex => args.codex_timeout,
1151    };
1152
1153    let provider_model: Option<&str> = match args.mode {
1154        EnrichMode::ClaudeCode => args.claude_model.as_deref(),
1155        EnrichMode::Codex => args.codex_model.as_deref(),
1156    };
1157
1158    loop {
1159        // Budget check
1160        if let Some(budget) = args.max_cost_usd {
1161            if !oauth_detected && cost_total >= budget {
1162                tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
1163                break;
1164            }
1165        }
1166
1167        // Dequeue next pending item
1168        let pending: Option<(i64, String, String)> = queue_conn
1169            .query_row(
1170                "UPDATE queue SET status='processing', attempt=attempt+1 \
1171                 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
1172                 RETURNING id, item_key, item_type",
1173                [],
1174                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1175            )
1176            .ok();
1177
1178        let (queue_id, item_key, item_type) = match pending {
1179            Some(p) => p,
1180            None => break,
1181        };
1182
1183        let item_started = Instant::now();
1184        let current_index = completed + failed + skipped;
1185
1186        let call_result = match args.operation {
1187            EnrichOperation::MemoryBindings => call_memory_bindings(
1188                &conn,
1189                &namespace,
1190                &item_key,
1191                &provider_binary,
1192                provider_model,
1193                provider_timeout,
1194                &args.mode,
1195            ),
1196            EnrichOperation::EntityDescriptions => call_entity_description(
1197                &conn,
1198                &namespace,
1199                &item_key,
1200                &provider_binary,
1201                provider_model,
1202                provider_timeout,
1203                &args.mode,
1204            ),
1205            EnrichOperation::BodyEnrich => call_body_enrich(
1206                &conn,
1207                &namespace,
1208                &item_key,
1209                &provider_binary,
1210                provider_model,
1211                provider_timeout,
1212                &args.mode,
1213                args.min_output_chars,
1214                args.max_output_chars,
1215                args.prompt_template.as_deref(),
1216                &paths,
1217            ),
1218            _ => unreachable!("non-implemented ops handled above"),
1219        };
1220
1221        match call_result {
1222            Ok(EnrichItemResult::Done {
1223                memory_id,
1224                entity_id,
1225                entities,
1226                rels,
1227                chars_before,
1228                chars_after,
1229                cost,
1230                is_oauth,
1231            }) => {
1232                if is_oauth && !oauth_detected {
1233                    oauth_detected = true;
1234                    tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
1235                }
1236                backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1237
1238                // Persist depends on the operation
1239                let persist_err: Option<String> = match args.operation {
1240                    EnrichOperation::MemoryBindings => {
1241                        // Bindings already persisted inside call_memory_bindings
1242                        None
1243                    }
1244                    EnrichOperation::EntityDescriptions => {
1245                        // Description already persisted inside call_entity_description
1246                        None
1247                    }
1248                    EnrichOperation::BodyEnrich => {
1249                        // Body already persisted inside call_body_enrich
1250                        None
1251                    }
1252                    _ => unreachable!(),
1253                };
1254
1255                let _ = queue_conn.execute(
1256                    "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1257                    rusqlite::params![
1258                        memory_id,
1259                        entity_id,
1260                        entities as i64,
1261                        rels as i64,
1262                        cost,
1263                        item_started.elapsed().as_millis() as i64,
1264                        queue_id
1265                    ],
1266                );
1267
1268                if persist_err.is_none() {
1269                    completed += 1;
1270                    if !is_oauth {
1271                        cost_total += cost;
1272                    }
1273                    emit_json(&ItemEvent {
1274                        item: &item_key,
1275                        status: "done",
1276                        memory_id,
1277                        entity_id,
1278                        entities: Some(entities),
1279                        rels: Some(rels),
1280                        chars_before,
1281                        chars_after,
1282                        cost_usd: if is_oauth { None } else { Some(cost) },
1283                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1284                        error: None,
1285                        index: current_index,
1286                        total,
1287                    });
1288                } else {
1289                    failed += 1;
1290                    emit_json(&ItemEvent {
1291                        item: &item_key,
1292                        status: "failed",
1293                        memory_id: None,
1294                        entity_id: None,
1295                        entities: None,
1296                        rels: None,
1297                        chars_before: None,
1298                        chars_after: None,
1299                        cost_usd: None,
1300                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1301                        error: persist_err,
1302                        index: current_index,
1303                        total,
1304                    });
1305                }
1306            }
1307            Ok(EnrichItemResult::Skipped { reason }) => {
1308                skipped += 1;
1309                let _ = queue_conn.execute(
1310                    "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
1311                    rusqlite::params![reason, queue_id],
1312                );
1313                emit_json(&ItemEvent {
1314                    item: &item_key,
1315                    status: "skipped",
1316                    memory_id: None,
1317                    entity_id: None,
1318                    entities: None,
1319                    rels: None,
1320                    chars_before: None,
1321                    chars_after: None,
1322                    cost_usd: None,
1323                    elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1324                    error: None,
1325                    index: current_index,
1326                    total,
1327                });
1328            }
1329            Err(e) => {
1330                let err_str = format!("{e}");
1331                if err_str.contains("RATE_LIMITED") {
1332                    tracing::warn!(target: "enrich", wait_seconds = backoff_secs, "rate limited, waiting before retry");
1333                    let _ = queue_conn.execute(
1334                        "UPDATE queue SET status='pending' WHERE id=?1",
1335                        rusqlite::params![queue_id],
1336                    );
1337                    std::thread::sleep(std::time::Duration::from_secs(backoff_secs));
1338                    backoff_secs = (backoff_secs * 2).min(900);
1339                    continue;
1340                }
1341
1342                failed += 1;
1343                let _ = queue_conn.execute(
1344                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
1345                    rusqlite::params![err_str, queue_id],
1346                );
1347                emit_json(&ItemEvent {
1348                    item: &item_key,
1349                    status: "failed",
1350                    memory_id: None,
1351                    entity_id: None,
1352                    entities: None,
1353                    rels: None,
1354                    chars_before: None,
1355                    chars_after: None,
1356                    cost_usd: None,
1357                    elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1358                    error: Some(err_str),
1359                    index: current_index,
1360                    total,
1361                });
1362            }
1363        }
1364
1365        let _ = item_type; // used via queue schema only
1366    }
1367
1368    let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1369    let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1370
1371    emit_json(&EnrichSummary {
1372        summary: true,
1373        operation: format!("{:?}", args.operation),
1374        items_total: total,
1375        completed,
1376        failed,
1377        skipped,
1378        cost_usd: cost_total,
1379        elapsed_ms: started.elapsed().as_millis() as u64,
1380    });
1381
1382    if failed == 0 {
1383        let _ = std::fs::remove_file(DEFAULT_QUEUE_DB);
1384    }
1385
1386    Ok(())
1387}
1388
1389// ---------------------------------------------------------------------------
1390// Internal result type for a single item call
1391// ---------------------------------------------------------------------------
1392
/// Outcome of processing one queue item through a `call_*` helper.
enum EnrichItemResult {
    /// The item was processed and its result was persisted by the helper.
    Done {
        /// Id of the processed memory; `None` for entity-description items.
        memory_id: Option<i64>,
        /// Id of the processed entity; `None` for memory items.
        entity_id: Option<i64>,
        /// Entity count reported by memory-bindings; 0 for other operations.
        entities: usize,
        /// Relationship count reported by memory-bindings; 0 for other operations.
        rels: usize,
        /// Character count of the body before enrichment (body-enrich only).
        chars_before: Option<usize>,
        /// Character count of the body after enrichment (body-enrich only).
        chars_after: Option<usize>,
        /// Cost reported by the provider call; added to the run total unless `is_oauth`.
        cost: f64,
        /// When true, `run` omits `cost_usd` from the item event and stops enforcing the budget.
        is_oauth: bool,
    },
    /// The item was deliberately not processed; nothing was persisted for it.
    Skipped {
        /// Human-readable explanation, stored in the queue's `error` column.
        reason: String,
    },
}
1408
1409// ---------------------------------------------------------------------------
1410// Per-operation call helpers (SCAN + JUDGE + PERSIST in one unit)
1411// ---------------------------------------------------------------------------
1412
1413fn call_memory_bindings(
1414    conn: &Connection,
1415    namespace: &str,
1416    memory_name: &str,
1417    binary: &Path,
1418    model: Option<&str>,
1419    timeout: u64,
1420    mode: &EnrichMode,
1421) -> Result<EnrichItemResult, AppError> {
1422    // Look up the memory
1423    let (memory_id, body): (i64, String) = conn.query_row(
1424        "SELECT id, COALESCE(body,'') FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
1425        rusqlite::params![namespace, memory_name],
1426        |r| Ok((r.get(0)?, r.get(1)?)),
1427    ).map_err(|e| match e {
1428        rusqlite::Error::QueryReturnedNoRows => AppError::NotFound(format!("memory '{memory_name}' not found")),
1429        other => AppError::Database(other),
1430    })?;
1431
1432    if body.trim().is_empty() {
1433        return Ok(EnrichItemResult::Skipped {
1434            reason: "body is empty".to_string(),
1435        });
1436    }
1437
1438    let (value, cost, is_oauth) = match mode {
1439        EnrichMode::ClaudeCode => call_claude(
1440            binary,
1441            BINDINGS_PROMPT,
1442            BINDINGS_SCHEMA,
1443            &body,
1444            model,
1445            timeout,
1446        )?,
1447        EnrichMode::Codex => call_codex(
1448            binary,
1449            BINDINGS_PROMPT,
1450            BINDINGS_SCHEMA,
1451            &body,
1452            model,
1453            timeout,
1454        )?,
1455    };
1456
1457    let empty_arr = serde_json::Value::Array(vec![]);
1458    let entities_val = value.get("entities").unwrap_or(&empty_arr);
1459    let rels_val = value.get("relationships").unwrap_or(&empty_arr);
1460
1461    let (ent_count, rel_count) =
1462        persist_memory_bindings(conn, namespace, memory_id, entities_val, rels_val)?;
1463
1464    Ok(EnrichItemResult::Done {
1465        memory_id: Some(memory_id),
1466        entity_id: None,
1467        entities: ent_count,
1468        rels: rel_count,
1469        chars_before: None,
1470        chars_after: None,
1471        cost,
1472        is_oauth,
1473    })
1474}
1475
1476fn call_entity_description(
1477    conn: &Connection,
1478    namespace: &str,
1479    entity_name: &str,
1480    binary: &Path,
1481    model: Option<&str>,
1482    timeout: u64,
1483    mode: &EnrichMode,
1484) -> Result<EnrichItemResult, AppError> {
1485    let (entity_id, entity_type): (i64, String) = conn
1486        .query_row(
1487            "SELECT id, type FROM entities WHERE namespace=?1 AND name=?2",
1488            rusqlite::params![namespace, entity_name],
1489            |r| Ok((r.get(0)?, r.get(1)?)),
1490        )
1491        .map_err(|e| match e {
1492            rusqlite::Error::QueryReturnedNoRows => {
1493                AppError::NotFound(format!("entity '{entity_name}' not found"))
1494            }
1495            other => AppError::Database(other),
1496        })?;
1497
1498    let prompt = format!(
1499        "{ENTITY_DESCRIPTION_PROMPT_PREFIX}{entity_name}\nEntity type: {entity_type}\n\nGenerate a description:"
1500    );
1501
1502    let (value, cost, is_oauth) = match mode {
1503        EnrichMode::ClaudeCode => call_claude(
1504            binary,
1505            &prompt,
1506            ENTITY_DESCRIPTION_SCHEMA,
1507            "",
1508            model,
1509            timeout,
1510        )?,
1511        EnrichMode::Codex => call_codex(
1512            binary,
1513            &prompt,
1514            ENTITY_DESCRIPTION_SCHEMA,
1515            "",
1516            model,
1517            timeout,
1518        )?,
1519    };
1520
1521    let description = value
1522        .get("description")
1523        .and_then(|v| v.as_str())
1524        .ok_or_else(|| AppError::Validation("LLM result missing 'description' field".into()))?;
1525
1526    persist_entity_description(conn, entity_id, description)?;
1527
1528    Ok(EnrichItemResult::Done {
1529        memory_id: None,
1530        entity_id: Some(entity_id),
1531        entities: 0,
1532        rels: 0,
1533        chars_before: None,
1534        chars_after: None,
1535        cost,
1536        is_oauth,
1537    })
1538}
1539
1540#[allow(clippy::too_many_arguments)]
1541fn call_body_enrich(
1542    conn: &Connection,
1543    namespace: &str,
1544    memory_name: &str,
1545    binary: &Path,
1546    model: Option<&str>,
1547    timeout: u64,
1548    mode: &EnrichMode,
1549    min_output_chars: usize,
1550    max_output_chars: usize,
1551    prompt_template: Option<&Path>,
1552    paths: &crate::paths::AppPaths,
1553) -> Result<EnrichItemResult, AppError> {
1554    let (memory_id, body): (i64, String) = conn.query_row(
1555        "SELECT id, COALESCE(body,'') FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
1556        rusqlite::params![namespace, memory_name],
1557        |r| Ok((r.get(0)?, r.get(1)?)),
1558    ).map_err(|e| match e {
1559        rusqlite::Error::QueryReturnedNoRows => AppError::NotFound(format!("memory '{memory_name}' not found")),
1560        other => AppError::Database(other),
1561    })?;
1562
1563    let chars_before = body.chars().count();
1564
1565    // Load custom prompt template if provided
1566    let prompt_prefix = if let Some(tmpl_path) = prompt_template {
1567        std::fs::read_to_string(tmpl_path).map_err(|e| {
1568            AppError::Io(std::io::Error::new(
1569                e.kind(),
1570                format!("failed to read prompt template: {e}"),
1571            ))
1572        })?
1573    } else {
1574        BODY_ENRICH_PROMPT_PREFIX.to_string()
1575    };
1576
1577    let prompt = format!(
1578        "{prompt_prefix}Target minimum length: {min_output_chars} characters. Maximum: {max_output_chars} characters."
1579    );
1580
1581    // The body schema uses a free-form enriched_body field
1582    let (value, cost, is_oauth) = match mode {
1583        EnrichMode::ClaudeCode => {
1584            call_claude(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
1585        }
1586        EnrichMode::Codex => {
1587            call_codex(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
1588        }
1589    };
1590
1591    let enriched_body = value
1592        .get("enriched_body")
1593        .and_then(|v| v.as_str())
1594        .ok_or_else(|| AppError::Validation("LLM result missing 'enriched_body' field".into()))?;
1595
1596    let chars_after = enriched_body.chars().count();
1597
1598    // Only persist if the enriched body is genuinely longer
1599    if chars_after <= chars_before {
1600        return Ok(EnrichItemResult::Skipped {
1601            reason: format!(
1602                "enriched body ({chars_after} chars) not longer than original ({chars_before} chars)"
1603            ),
1604        });
1605    }
1606
1607    persist_enriched_body(
1608        conn,
1609        namespace,
1610        memory_id,
1611        memory_name,
1612        enriched_body,
1613        paths,
1614    )?;
1615
1616    Ok(EnrichItemResult::Done {
1617        memory_id: Some(memory_id),
1618        entity_id: None,
1619        entities: 0,
1620        rels: 0,
1621        chars_before: Some(chars_before),
1622        chars_after: Some(chars_after),
1623        cost,
1624        is_oauth,
1625    })
1626}
1627
1628// ---------------------------------------------------------------------------
1629// Scan dispatcher — maps operation to scan query result (item keys)
1630// ---------------------------------------------------------------------------
1631
1632fn scan_operation(
1633    conn: &Connection,
1634    namespace: &str,
1635    args: &EnrichArgs,
1636) -> Result<Vec<String>, AppError> {
1637    match args.operation {
1638        EnrichOperation::MemoryBindings => {
1639            let rows = scan_unbound_memories(conn, namespace, args.limit)?;
1640            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
1641        }
1642        EnrichOperation::EntityDescriptions => {
1643            let rows = scan_entities_without_description(conn, namespace, args.limit)?;
1644            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
1645        }
1646        EnrichOperation::BodyEnrich => {
1647            let rows =
1648                scan_short_body_memories(conn, namespace, args.min_output_chars, args.limit)?;
1649            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
1650        }
1651        // Scan-only operations: return all memories as candidates
1652        EnrichOperation::WeightCalibrate
1653        | EnrichOperation::RelationReclassify
1654        | EnrichOperation::EntityConnect
1655        | EnrichOperation::EntityTypeValidate
1656        | EnrichOperation::DescriptionEnrich
1657        | EnrichOperation::CrossDomainBridges
1658        | EnrichOperation::DomainClassify
1659        | EnrichOperation::GraphAudit
1660        | EnrichOperation::DeepResearchSynth
1661        | EnrichOperation::BodyExtract => {
1662            let limit_clause = args.limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1663            let sql = format!(
1664                "SELECT name FROM memories WHERE namespace=?1 AND deleted_at IS NULL ORDER BY id {limit_clause}"
1665            );
1666            let mut stmt = conn.prepare(&sql)?;
1667            let names = stmt
1668                .query_map(rusqlite::params![namespace], |r| r.get::<_, String>(0))?
1669                .collect::<Result<Vec<_>, _>>()?;
1670            Ok(names)
1671        }
1672    }
1673}
1674
1675// ---------------------------------------------------------------------------
1676// Codex stub provider
1677// ---------------------------------------------------------------------------
1678
1679/// Locates the Codex CLI binary.
1680fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
1681    if let Some(p) = explicit {
1682        if p.exists() {
1683            return Ok(p.to_path_buf());
1684        }
1685        return Err(AppError::Validation(format!(
1686            "Codex binary not found at explicit path: {}",
1687            p.display()
1688        )));
1689    }
1690
1691    if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
1692        let p = PathBuf::from(&env_path);
1693        if p.exists() {
1694            return Ok(p);
1695        }
1696    }
1697
1698    let name = if cfg!(windows) { "codex.exe" } else { "codex" };
1699    if let Some(path_var) = std::env::var_os("PATH") {
1700        for dir in std::env::split_paths(&path_var) {
1701            let candidate = dir.join(name);
1702            if candidate.exists() {
1703                return Ok(candidate);
1704            }
1705        }
1706    }
1707
1708    Err(AppError::Validation(
1709        "Codex CLI binary not found in PATH. Install it or specify --codex-binary".to_string(),
1710    ))
1711}
1712
1713/// Calls the Codex CLI for a single enrichment item.
1714///
1715/// Follows the same contract as `call_claude`: returns `(value, cost_usd, is_oauth=false)`.
1716fn call_codex(
1717    binary: &Path,
1718    prompt: &str,
1719    json_schema: &str,
1720    input_text: &str,
1721    model: Option<&str>,
1722    timeout_secs: u64,
1723) -> Result<(serde_json::Value, f64, bool), AppError> {
1724    use wait_timeout::ChildExt;
1725
1726    let full_prompt = format!("{prompt}\n\n{input_text}");
1727    let schema_file = {
1728        let tmp = std::env::temp_dir().join(format!("enrich-schema-{}.json", std::process::id()));
1729        std::fs::write(&tmp, json_schema).map_err(AppError::Io)?;
1730        tmp
1731    };
1732
1733    let mut cmd = Command::new(binary);
1734    cmd.env_clear();
1735    for var in &[
1736        "PATH",
1737        "HOME",
1738        "USER",
1739        "OPENAI_API_KEY",
1740        "TMPDIR",
1741        "TMP",
1742        "TEMP",
1743    ] {
1744        if let Ok(val) = std::env::var(var) {
1745            cmd.env(var, val);
1746        }
1747    }
1748
1749    cmd.arg("exec")
1750        .arg("--json")
1751        .arg("--output-schema")
1752        .arg(&schema_file);
1753
1754    if let Some(m) = model {
1755        cmd.arg("--model").arg(m);
1756    }
1757
1758    cmd.stdin(Stdio::piped())
1759        .stdout(Stdio::piped())
1760        .stderr(Stdio::piped());
1761
1762    let mut child = cmd.spawn().map_err(|e| {
1763        AppError::Io(std::io::Error::new(
1764            e.kind(),
1765            format!("failed to spawn codex: {e}"),
1766        ))
1767    })?;
1768
1769    // Write prompt via stdin
1770    if let Some(mut stdin) = child.stdin.take() {
1771        let _ = stdin.write_all(full_prompt.as_bytes());
1772    }
1773
1774    let timeout = std::time::Duration::from_secs(timeout_secs);
1775    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
1776
1777    let _ = std::fs::remove_file(&schema_file);
1778
1779    match status {
1780        Some(exit_status) => {
1781            let mut stdout_buf = Vec::new();
1782            if let Some(mut out) = child.stdout.take() {
1783                std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
1784            }
1785            if !exit_status.success() {
1786                let mut stderr_buf = Vec::new();
1787                if let Some(mut err) = child.stderr.take() {
1788                    std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
1789                }
1790                return Err(AppError::Validation(format!(
1791                    "codex exited with code {:?}: {}",
1792                    exit_status.code(),
1793                    String::from_utf8_lossy(&stderr_buf).trim()
1794                )));
1795            }
1796            let stdout_str = String::from_utf8(stdout_buf)
1797                .map_err(|_| AppError::Validation("codex stdout is not valid UTF-8".into()))?;
1798            let value: serde_json::Value = serde_json::from_str(&stdout_str).map_err(|e| {
1799                AppError::Validation(format!("failed to parse codex output as JSON: {e}"))
1800            })?;
1801            Ok((value, 0.0, false))
1802        }
1803        None => {
1804            let _ = child.kill();
1805            let _ = child.wait();
1806            Err(AppError::Validation(format!(
1807                "codex timed out after {timeout_secs} seconds"
1808            )))
1809        }
1810    }
1811}
1812
1813// ---------------------------------------------------------------------------
1814// Tests
1815// ---------------------------------------------------------------------------
1816
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::Connection;

    /// Opens an in-memory SQLite database with a minimal schema for unit tests.
    ///
    /// Only the tables touched by the scan/persist helpers are created:
    /// `memories`, `entities`, `memory_entities` and `relationships`.
    fn open_test_db() -> Connection {
        let conn = Connection::open_in_memory().expect("in-memory db");
        conn.execute_batch(
            "CREATE TABLE memories (
                id          INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace   TEXT NOT NULL DEFAULT 'global',
                name        TEXT NOT NULL,
                type        TEXT NOT NULL DEFAULT 'note',
                description TEXT NOT NULL DEFAULT '',
                body        TEXT NOT NULL DEFAULT '',
                body_hash   TEXT NOT NULL DEFAULT '',
                session_id  TEXT,
                source      TEXT NOT NULL DEFAULT 'agent',
                metadata    TEXT NOT NULL DEFAULT '{}',
                created_at  INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at  INTEGER NOT NULL DEFAULT (unixepoch()),
                deleted_at  INTEGER,
                UNIQUE(namespace, name)
            );
            CREATE TABLE entities (
                id          INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace   TEXT NOT NULL DEFAULT 'global',
                name        TEXT NOT NULL,
                type        TEXT NOT NULL DEFAULT 'concept',
                description TEXT,
                degree      INTEGER NOT NULL DEFAULT 0,
                created_at  INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at  INTEGER NOT NULL DEFAULT (unixepoch()),
                UNIQUE(namespace, name)
            );
            CREATE TABLE memory_entities (
                memory_id  INTEGER NOT NULL,
                entity_id  INTEGER NOT NULL,
                PRIMARY KEY (memory_id, entity_id)
            );
            CREATE TABLE relationships (
                id         INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace  TEXT NOT NULL DEFAULT 'global',
                source_id  INTEGER NOT NULL,
                target_id  INTEGER NOT NULL,
                relation   TEXT NOT NULL,
                weight     REAL NOT NULL DEFAULT 0.5,
                description TEXT,
                UNIQUE(source_id, target_id, relation)
            );",
        )
        .expect("schema creation must succeed");
        conn
    }

    /// A memory with no `memory_entities` row is reported by the unbound scan.
    #[test]
    fn scan_unbound_memories_finds_memories_without_bindings() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'test-mem', 'some body content')",
            [],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "test-mem");
    }

    /// A memory linked to an entity through `memory_entities` is skipped by the unbound scan.
    #[test]
    fn scan_unbound_memories_excludes_bound_memories() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'bound-mem', 'body')",
            [],
        )
        .unwrap();
        let mem_id: i64 = conn
            .query_row("SELECT id FROM memories WHERE name='bound-mem'", [], |r| {
                r.get(0)
            })
            .unwrap();
        conn.execute(
            "INSERT INTO entities (namespace, name) VALUES ('global', 'some-entity')",
            [],
        )
        .unwrap();
        let ent_id: i64 = conn
            .query_row(
                "SELECT id FROM entities WHERE name='some-entity'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        conn.execute(
            "INSERT INTO memory_entities (memory_id, entity_id) VALUES (?1, ?2)",
            rusqlite::params![mem_id, ent_id],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None).unwrap();
        assert!(results.is_empty(), "bound memory must not appear in scan");
    }

    /// An entity whose description is NULL is reported by the description scan.
    #[test]
    fn scan_entities_without_description_finds_null_description() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'my-tool', 'tool', NULL)",
            [],
        )
        .unwrap();

        let results = scan_entities_without_description(&conn, "global", None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "my-tool");
    }

    /// An entity that already has a description is skipped by the description scan.
    #[test]
    fn scan_entities_without_description_excludes_entities_with_description() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'described-tool', 'tool', 'Has a description already')",
            [],
        )
        .unwrap();

        let results = scan_entities_without_description(&conn, "global", None).unwrap();
        assert!(
            results.is_empty(),
            "entity with description must not appear"
        );
    }

    /// A 2-character body is reported when the threshold passed to the scan is 100.
    #[test]
    fn scan_short_body_memories_finds_short_bodies() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'short-mem', 'hi')",
            [],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "short-mem");
    }

    /// A 1000-character body is skipped when the threshold passed to the scan is 100.
    #[test]
    fn scan_short_body_memories_excludes_long_bodies() {
        let conn = open_test_db();
        let long_body = "a".repeat(1000);
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'long-mem', ?1)",
            rusqlite::params![long_body],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
        assert!(results.is_empty(), "long memory must not appear in scan");
    }

    /// With five matching rows and `Some(3)` as the limit, exactly three rows come back.
    #[test]
    fn scan_respects_limit() {
        let conn = open_test_db();
        for i in 0..5 {
            conn.execute(
                &format!("INSERT INTO memories (namespace, name, body) VALUES ('global', 'mem-{i}', 'short')"),
                [],
            )
            .unwrap();
        }

        let results = scan_short_body_memories(&conn, "global", 1000, Some(3)).unwrap();
        assert_eq!(results.len(), 3, "limit must be respected");
    }

    /// A freshly opened queue database exposes an empty `queue` table.
    #[test]
    fn queue_db_schema_creates_correctly() {
        // Use the platform temp directory instead of a hard-coded `/tmp`, which does not
        // exist on Windows.
        let tmp_path = std::env::temp_dir()
            .join(format!("test-enrich-queue-{}.sqlite", std::process::id()))
            .to_string_lossy()
            .into_owned();
        let conn = open_queue_db(&tmp_path).expect("queue db must open");
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
            .unwrap();
        // Close the connection and delete the file before asserting, so a failing
        // assertion does not leave the file behind and the delete is not attempted on
        // a file that is still open.
        drop(conn);
        let _ = std::fs::remove_file(&tmp_path);
        assert_eq!(count, 0);
    }

    /// A successful result event yields its `structured_output`, cost, and a false OAuth flag.
    #[test]
    fn parse_claude_json_output_valid_bindings() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.01,
             "structured_output":{"entities":[{"name":"rust-lang","entity_type":"tool"}],"relationships":[]}}
        ]"#;
        let (value, cost, is_oauth) =
            parse_claude_json_output(output).expect("must parse successfully");
        assert!(value.get("entities").is_some());
        assert!((cost - 0.01).abs() < f64::EPSILON);
        assert!(!is_oauth);
    }

    /// An init event carrying `"apiKeySource":"none"` sets the OAuth flag.
    #[test]
    fn parse_claude_json_output_detects_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"none"},
            {"type":"result","is_error":false,"total_cost_usd":0.0,
             "structured_output":{"entities":[],"relationships":[]}}
        ]"#;
        let (_value, _cost, is_oauth) = parse_claude_json_output(output).unwrap();
        assert!(is_oauth);
    }

    /// An error result mentioning `rate_limit` produces an error containing `RATE_LIMITED`.
    #[test]
    fn parse_claude_json_output_rate_limit_returns_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;
        let err = parse_claude_json_output(output).unwrap_err();
        assert!(format!("{err}").contains("RATE_LIMITED"));
    }

    /// An authentication error result has its message carried into the returned error.
    #[test]
    fn parse_claude_json_output_auth_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;
        let err = parse_claude_json_output(output).unwrap_err();
        assert!(format!("{err}").contains("authentication failed"));
    }

    /// Exercises the scan helper that the dry-run path relies on; no process is spawned.
    #[test]
    fn dry_run_emits_preview_without_calling_llm() {
        // This test validates the dry-run NDJSON contract without spawning any process.
        // The scan_operation function requires a DB; we build one in-memory but cannot
        // call run() directly because it needs AppPaths (disk). Instead we test the
        // lower-level helpers that the dry-run path relies on.
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'dry-mem', 'tiny')",
            [],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 1000, None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "dry-mem");
        // If scan finds the item and dry_run is set, no LLM would be called.
        // The NDJSON emission is tested via integration tests with a fake binary.
    }

    /// Persisting a description stores the given text on the entity row.
    #[test]
    fn persist_entity_description_updates_db() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type) VALUES ('global', 'tokio-runtime', 'tool')",
            [],
        )
        .unwrap();
        let eid: i64 = conn
            .query_row(
                "SELECT id FROM entities WHERE name='tokio-runtime'",
                [],
                |r| r.get(0),
            )
            .unwrap();

        persist_entity_description(&conn, eid, "Async runtime for Rust applications").unwrap();

        let desc: String = conn
            .query_row(
                "SELECT description FROM entities WHERE id=?1",
                rusqlite::params![eid],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(desc, "Async runtime for Rust applications");
    }

    /// `BINDINGS_SCHEMA` must parse as JSON.
    #[test]
    fn bindings_schema_is_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(BINDINGS_SCHEMA).expect("BINDINGS_SCHEMA must be valid JSON");
    }

    /// `ENTITY_DESCRIPTION_SCHEMA` must parse as JSON.
    #[test]
    fn entity_description_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(ENTITY_DESCRIPTION_SCHEMA)
            .expect("ENTITY_DESCRIPTION_SCHEMA must be valid JSON");
    }

    /// `BODY_ENRICH_SCHEMA` must parse as JSON.
    #[test]
    fn body_enrich_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(BODY_ENRICH_SCHEMA)
            .expect("BODY_ENRICH_SCHEMA must be valid JSON");
    }
}
2112    fn body_enrich_schema_is_valid_json() {
2113        let _: serde_json::Value = serde_json::from_str(BODY_ENRICH_SCHEMA)
2114            .expect("BODY_ENRICH_SCHEMA must be valid JSON");
2115    }
2116}