Skip to main content

sqlite_graphrag/commands/
enrich.rs

// TODO v1.0.89: this file is 4116 lines long — modularization is planned.
// See ADR-0046, section "Known Tech Debt (v1.0.89+)".
3
4//! Handler for the `enrich` CLI subcommand (GAP-14 + GAP-18).
5//!
6//! Enriches the knowledge graph by running LLM-powered analysis over memories
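//! and entities that are missing key structural data. The core operations are:
//!
//! - `memory-bindings`: memories without `memory_entities` rows get entity extraction
//! - `entity-descriptions`: entities with NULL/empty descriptions get LLM descriptions
//! - `body-enrich`: memories with short bodies get expanded by the LLM (GAP-18)
//! - `re-embed`: memories without a vector row get re-embedded without rewriting the body
//!
//! The remaining operations (`weight-calibrate`, `relation-reclassify`,
//! `entity-connect`, `entity-type-validate`, `description-enrich`,
//! `cross-domain-bridges`, `domain-classify`, `graph-audit`,
//! `deep-research-synth`, `body-extract`) are currently scan-only, as noted on
//! `EnrichOperation`.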
13//!
14//! Architecture mirrors `ingest_claude.rs`: SCAN → JUDGE (LLM) → PERSIST, with a
15//! SQLite queue DB (`.enrich-queue.sqlite`) for resume/retry support.
//!
//! Workload: subprocess I/O-bound (`claude`/`codex` CLI calls that wait on the network).
17//!
18//! # DRY opportunity
19//!
//! In this file, Claude calls already go through the shared `claude_runner`
//! module and NDJSON output through `crate::output::emit_json_line`. The main
//! remaining duplication is `open_queue_db`, whose queue schema is a
//! near-verbatim copy of the one in `ingest_claude.rs`. `ingest_claude.rs` also
//! keeps its own private helpers (`extract_with_claude`, `parse_claude_output`,
//! `emit_json`, `open_queue_db`). These could move into a shared
//! `src/commands/llm_runner.rs` (or `src/llm_runner.rs`) module without
//! changing any public API. Because that refactor requires editing
//! `ingest_claude.rs`, it is deferred to the Integration stream.
26
27use crate::commands::ingest_claude::find_claude_binary;
28use crate::constants::MAX_MEMORY_BODY_LEN;
29use crate::entity_type::EntityType;
30use crate::errors::AppError;
31use crate::paths::AppPaths;
32use crate::storage::connection::{ensure_db_ready, open_rw};
33use crate::storage::entities::{self, NewEntity, NewRelationship};
34use crate::storage::memories;
35
36use rusqlite::Connection;
37use serde::{Deserialize, Serialize};
38use std::io::Write;
39use std::path::{Path, PathBuf};
40use std::time::Instant;
41
42// ---------------------------------------------------------------------------
43// Constants
44// ---------------------------------------------------------------------------
45
// Body-enrich output bounds; these back the `--min-output-chars` and
// `--max-output-chars` CLI defaults.
const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;
const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2_000;

/// File name of the resume/retry queue database. Deliberately different from
/// the `ingest_claude` queue so the two commands never share a file.
const DEFAULT_QUEUE_DB: &str = ".enrich-queue.sqlite";

/// Default rate-limit wait.
// NOTE(review): unit is presumably seconds — confirm at the use site.
const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
50
51// ---------------------------------------------------------------------------
52// JSON schema used for memory-bindings and body-enrich extraction
53// ---------------------------------------------------------------------------
54
/// JSON Schema for the entity/relationship extraction response.
///
/// Constrains the LLM to return `entities` (a name plus one of ten entity
/// types) and `relationships` (source, target, one of eleven canonical
/// relations, and a strength in `[0, 1]`). Every object is closed with
/// `"additionalProperties": false`.
// NOTE(review): the `entity_type` and `relation` enums must stay in sync with
// `BINDINGS_PROMPT` and with the crate's `EntityType` — confirm in `entity_type.rs`.
const BINDINGS_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["entities","relationships"],
  "additionalProperties": false
}"#;
94
/// JSON Schema for the `entity-descriptions` response: a single required
/// `description` string and no other keys.
const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" }
  },
  "required": ["description"],
  "additionalProperties": false
}"#;

/// JSON Schema for the `body-enrich` response: a single required
/// `enriched_body` string and no other keys.
const BODY_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "enriched_body": { "type": "string" }
  },
  "required": ["enriched_body"],
  "additionalProperties": false
}"#;
112
// G27 P1: weight-calibrate
/// Prompt for `weight-calibrate`. The 0.9 / 0.7 / 0.5 / 0.3 scale is the same
/// one `BINDINGS_PROMPT` uses when strengths are first assigned.
const WEIGHT_CALIBRATE_PROMPT: &str = "You are a knowledge graph quality auditor. Evaluate whether this relationship weight is correctly calibrated.\n\n\
Scale:\n\
- 0.9 = vital hard dependency (A cannot function without B)\n\
- 0.7 = important design relationship (A strongly supports/enables B)\n\
- 0.5 = useful contextual link (A and B share relevant context)\n\
- 0.3 = weak reference (A mentions B without strong coupling)\n\n\
Respond with the calibrated weight and brief reasoning.";

/// JSON Schema for the `weight-calibrate` response: a weight in `[0.0, 1.0]`
/// plus free-text reasoning.
const WEIGHT_CALIBRATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "calibrated_weight": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["calibrated_weight", "reasoning"],
  "additionalProperties": false
}"#;
131
// G27 P1: relation-reclassify
/// Prompt for `relation-reclassify`: asks the LLM to replace a generic
/// relation with exactly one of the canonical relations listed below.
const RELATION_RECLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. The relationship between these entities uses a generic type. Determine the REAL semantic relationship.\n\n\
Valid canonical relations (pick exactly one):\n\
- depends-on: A cannot function without B\n\
- uses: A utilizes B but could substitute it\n\
- supports: A reinforces or enables B\n\
- causes: A triggers or produces B\n\
- fixes: A resolves a problem in B\n\
- contradicts: A conflicts with or invalidates B\n\
- applies-to: A is relevant to or scoped within B\n\
- follows: A comes after B in sequence\n\
- replaces: A substitutes B\n\
- tracked-in: A is monitored in B\n\
- related: A and B share context (use sparingly)\n\n\
Respond with the correct relation, strength, and reasoning.";

/// JSON Schema for the `relation-reclassify` response.
///
/// `relation` is restricted by `enum` to the canonical relations the prompt
/// lists, so a structured-output provider cannot return an off-list value
/// such as `mentions`. Keep this list in sync with the prompt.
const RELATION_RECLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": {
      "type": "string",
      "enum": ["depends-on","uses","supports","causes","fixes","contradicts","applies-to","follows","replaces","tracked-in","related"]
    },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
158
// G27 P2: entity-connect — suggest relationships between isolated entities
/// Prompt for `entity-connect`: asks whether two unlinked entities share a
/// meaningful relation, with `"none"` as the explicit "no link" answer.
const ENTITY_CONNECT_PROMPT: &str = "You are a knowledge graph quality auditor. Two entities exist in the same graph but have no relationship between them. Determine if a meaningful relationship exists.\n\n\
Valid canonical relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, replaces, tracked-in, related.\n\n\
If NO meaningful relationship exists, set relation to \"none\".\n\
Respond with the relation (or \"none\"), strength, and reasoning.";

/// JSON Schema for the `entity-connect` response.
///
/// `relation` is restricted by `enum` to the canonical relations named in the
/// prompt plus the `"none"` sentinel, so callers only ever see a known value.
const ENTITY_CONNECT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": {
      "type": "string",
      "enum": ["depends-on","uses","supports","causes","fixes","contradicts","applies-to","follows","replaces","tracked-in","related","none"]
    },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
175
// G27 P2: entity-type-validate — verify entity type assignments
/// Prompt for `entity-type-validate`: asks the LLM to confirm or correct an
/// entity's type. It requests every field that `ENTITY_TYPE_VALIDATE_SCHEMA`
/// requires, including whether the current type was already correct.
const ENTITY_TYPE_VALIDATE_PROMPT: &str = "You are a knowledge graph quality auditor. Verify whether this entity's type is correct.\n\n\
Valid entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\n\
If the current type is correct, keep it. If wrong, suggest the correct type.\n\
Respond with the validated type, whether the current type was correct, and reasoning.";

/// JSON Schema for the `entity-type-validate` response.
///
/// `validated_type` is restricted by `enum` to the ten entity types the prompt
/// lists (the same set `BINDINGS_SCHEMA` uses), so a suggested type is always
/// one the graph accepts.
const ENTITY_TYPE_VALIDATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "validated_type": {
      "type": "string",
      "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
    },
    "was_correct": { "type": "boolean" },
    "reasoning": { "type": "string" }
  },
  "required": ["validated_type", "was_correct", "reasoning"],
  "additionalProperties": false
}"#;
192
// G27 P2: description-enrich — improve generic memory descriptions
/// Prompt for `description-enrich`. It gives one BAD/GOOD pair to steer the
/// model away from provenance-style descriptions ("ingested from …").
const DESCRIPTION_ENRICH_PROMPT: &str = "You are a knowledge graph quality auditor. This memory has a generic or auto-generated description. Write a concise, semantic description (10-20 words) that captures WHAT this memory is about and WHY it matters.\n\n\
BAD: 'ingested from docs/auth.md'\n\
GOOD: 'JWT token rotation strategy with 15-min expiry and refresh flow'\n\n\
Respond with the improved description and reasoning.";

/// JSON Schema for the `description-enrich` response: the new description
/// plus reasoning, with no other keys.
const DESCRIPTION_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" },
    "reasoning": { "type": "string" }
  },
  "required": ["description", "reasoning"],
  "additionalProperties": false
}"#;
208
// G27 P2: domain-classify — classify memory into domain category
/// Prompt for `domain-classify`. It explicitly asks for the `confidence` score
/// that `DOMAIN_CLASSIFY_SCHEMA` requires. Without that instruction the model
/// would have to invent an uncalibrated number.
const DOMAIN_CLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. Classify this memory into its primary domain category.\n\n\
Respond with the domain name (kebab-case, 2-4 words), a confidence score between 0.0 and 1.0, and reasoning.";

/// JSON Schema for the `domain-classify` response: domain name, confidence in
/// `[0.0, 1.0]`, and reasoning.
const DOMAIN_CLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "domain": { "type": "string" },
    "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["domain", "confidence", "reasoning"],
  "additionalProperties": false
}"#;
223
// G27 P2: graph-audit — audit graph for quality issues
/// Prompt for `graph-audit`. It asks for every field `GRAPH_AUDIT_SCHEMA`
/// requires: the issue list, a score in `[0.0, 1.0]`, and reasoning.
const GRAPH_AUDIT_PROMPT: &str = "You are a knowledge graph quality auditor. Analyze this memory and its entity bindings for quality issues.\n\n\
Check for: missing entities, wrong entity types, redundant relationships, orphaned entities, generic descriptions, low-signal relationships.\n\n\
Respond with a list of issues found (or an empty list if none), an overall quality score between 0.0 and 1.0, and brief reasoning.";

/// JSON Schema for the `graph-audit` response.
///
/// Every object, including each `issues` item, is closed with
/// `"additionalProperties": false`, matching `BINDINGS_SCHEMA`. Strict
/// structured-output modes reject object schemas that leave it open.
const GRAPH_AUDIT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "quality_score": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "issues": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "kind": { "type": "string" },
          "detail": { "type": "string" }
        },
        "required": ["kind", "detail"],
        "additionalProperties": false
      }
    },
    "reasoning": { "type": "string" }
  },
  "required": ["quality_score", "issues", "reasoning"],
  "additionalProperties": false
}"#;
239
// G27 P2: deep-research-synth — synthesize research findings into graph
/// Prompt for `deep-research-synth`. Its relation list matches the `relation`
/// enum in `DEEP_RESEARCH_SYNTH_SCHEMA`.
const DEEP_RESEARCH_SYNTH_PROMPT: &str = "You are a knowledge graph synthesizer. Given this memory body, extract key findings and synthesize them into structured entities and relationships.\n\n\
Entity names: lowercase kebab-case, domain-specific.\n\
Relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, related, replaces, tracked-in.\n\n\
Respond with extracted entities, relationships, and a synthesis summary.";

/// JSON Schema for the `deep-research-synth` response.
///
/// It mirrors `BINDINGS_SCHEMA`:
/// - `entity_type` and `relation` are limited to the canonical sets;
/// - `strength` is bounded to `[0, 1]`;
/// - every object (including array items) is closed with
///   `"additionalProperties": false`, as strict structured-output modes require.
const DEEP_RESEARCH_SYNTH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["depends-on","uses","supports","causes","fixes","contradicts","applies-to","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source", "target", "relation", "strength"],
        "additionalProperties": false
      }
    },
    "summary": { "type": "string" }
  },
  "required": ["entities", "relationships", "summary"],
  "additionalProperties": false
}"#;
256
// G27 P2: body-extract — extract structured content from unstructured text
/// Prompt for `body-extract`: restructures a raw or unstructured memory body
/// into clean markdown while keeping all factual content.
const BODY_EXTRACT_PROMPT: &str = "You are a structured data extractor. Given this memory body (which may be unstructured text, raw notes, or a transcript), extract and restructure the content into a clean, well-organized markdown body.\n\n\
Preserve all factual content. Remove noise, fix formatting, add section headers where appropriate.\n\
Respond with the restructured body and a brief summary of changes.";

/// JSON Schema for the `body-extract` response: the restructured body plus a
/// short change summary, with no other keys.
const BODY_EXTRACT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "restructured_body": { "type": "string" },
    "changes_summary": { "type": "string" }
  },
  "required": ["restructured_body", "changes_summary"],
  "additionalProperties": false
}"#;
271
272// ---------------------------------------------------------------------------
273// Prompts
274// ---------------------------------------------------------------------------
275
/// System prompt for `memory-bindings` entity/relationship extraction.
///
/// The relation list and the 0.9 / 0.7 / 0.5 / 0.3 strength scale must match
/// the `relation` enum in `BINDINGS_SCHEMA` and `WEIGHT_CALIBRATE_PROMPT`. The
/// prompt explicitly bans `mentions` as a relation type.
const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. Given a memory body, extract:\n\
1. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
2. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones";
286
/// Leading text of the `entity-descriptions` prompt. It ends with
/// `"Entity name: "`, so the caller is expected to append the entity name
/// (and presumably its type) directly after it.
const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. Given an entity name and type, write a concise one-sentence description (10-20 words) that explains what this entity IS and WHY it matters in the context of software/system design.\n\nEntity name: ";

/// Leading text of the `body-enrich` prompt. It ends with
/// `"Memory body to enrich:\n\n"`, so the original body is appended after it.
// NOTE(review): the prompt says the target length comes from "the system
// context" — confirm the caller actually passes the min/max char bounds.
const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. Given a short or sparse memory body, expand it into a richer, more complete and useful description. Preserve all existing facts. Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\nConstraints:\n- Output only the enriched body text (no metadata, no headers)\n- Preserve the original meaning exactly\n- Target length is provided in the system context\n\nMemory body to enrich:\n\n";
290
291// ---------------------------------------------------------------------------
292// CLI args
293// ---------------------------------------------------------------------------
294
/// Operation to perform in the `enrich` command.
// Serde uses kebab-case, which is also clap's default `ValueEnum` spelling, so
// the serialized name equals the `--operation` value (e.g. `memory-bindings`).
// The `///` docs on each variant are rendered by clap as possible-value help
// text, so treat them as user-facing strings.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum EnrichOperation {
    // --- Core operations ---
    /// Add missing entity/relationship bindings to memories (fully implemented).
    MemoryBindings,
    /// Fill NULL/empty entity descriptions with LLM-generated summaries (fully implemented).
    EntityDescriptions,
    /// Expand short memory bodies into richer content (fully implemented, GAP-18).
    BodyEnrich,
    /// Rebuild missing memory embeddings without rewriting the memory body.
    ReEmbed,
    // --- Quality-audit operations: currently scan-only, per each variant's help ---
    /// Calibrate relationship weights using LLM analysis (scan only).
    WeightCalibrate,
    /// Reclassify relationship types using LLM judgment (scan only).
    RelationReclassify,
    /// Connect isolated entities by suggesting new relationships (scan only).
    EntityConnect,
    /// Validate entity type assignments using LLM judgment (scan only).
    EntityTypeValidate,
    /// Enrich memory descriptions that are generic/auto-generated (scan only).
    DescriptionEnrich,
    /// Identify cross-domain bridges between disconnected subgraphs (scan only).
    CrossDomainBridges,
    /// Classify memories into domain categories (scan only).
    DomainClassify,
    /// Audit the graph for quality issues (scan only).
    GraphAudit,
    /// Synthesize deep-research findings into graph memories (scan only).
    DeepResearchSynth,
    /// Extract structured body from unstructured text (scan only).
    BodyExtract,
}
328
/// LLM provider for enrichment.
// Used by both `--mode` and `--fallback-mode`. The `///` variant docs below
// are clap help text. CLI spellings are `claude-code`, `codex` and `opencode`
// (the last pinned explicitly); the `Display` impl must emit the same strings.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
pub enum EnrichMode {
    /// Use locally installed Claude Code CLI (OAuth-first).
    ClaudeCode,
    /// Use locally installed OpenAI Codex CLI.
    Codex,
    /// Use locally installed OpenCode CLI.
    #[value(name = "opencode")]
    Opencode,
}
340
341impl std::fmt::Display for EnrichMode {
342    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
343        match self {
344            EnrichMode::ClaudeCode => write!(f, "claude-code"),
345            EnrichMode::Codex => write!(f, "codex"),
346            EnrichMode::Opencode => write!(f, "opencode"),
347        }
348    }
349}
350
351/// Arguments for the `enrich` subcommand.
352#[derive(clap::Args)]
353#[command(
354    about = "Enrich graph memories and entities using an LLM provider",
355    after_long_help = "EXAMPLES:\n  \
356    # Add missing entity bindings to all unbound memories\n  \
357    sqlite-graphrag enrich --operation memory-bindings --mode claude-code\n\n  \
358    # Fill entity descriptions (dry-run preview, no tokens spent)\n  \
359    sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n  \
360    # Expand short memory bodies (GAP-18)\n  \
361    sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n  \
362    # Rebuild only missing memory embeddings without rewriting bodies\n  \
363    sqlite-graphrag enrich --operation re-embed --limit 100\n\n  \
364    # Resume an interrupted body-enrich run\n  \
365    sqlite-graphrag enrich --operation body-enrich --resume --json\n\n  \
366    # Retry only failed items from a previous run\n  \
367    sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n\
368    EXIT CODES:\n  \
369    0  success\n  \
370    1  validation error (bad args, binary not found)\n  \
371    14 I/O error"
372)]
373pub struct EnrichArgs {
374    /// Enrichment operation to run.
375    #[arg(long, short = 'o', value_enum, value_name = "OPERATION")]
376    pub operation: EnrichOperation,
377
378    /// LLM provider to use. Default: claude-code (OAuth-first).
379    #[arg(long, value_enum, default_value = "claude-code")]
380    pub mode: EnrichMode,
381
382    /// Maximum number of items to process in this run. Omit for all.
383    #[arg(long, value_name = "N")]
384    pub limit: Option<usize>,
385
386    /// Preview items without calling the LLM (zero tokens consumed).
387    #[arg(long)]
388    pub dry_run: bool,
389
390    /// Namespace to operate on. Default: global.
391    #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
392    pub namespace: Option<String>,
393
394    // -- Provider flags (Claude) --
395    /// Path to the Claude Code binary. Default: auto-detect from PATH.
396    #[arg(long, value_name = "PATH")]
397    pub claude_binary: Option<PathBuf>,
398
399    /// Claude model to use (e.g. claude-sonnet-4-6).
400    #[arg(long, value_name = "MODEL")]
401    pub claude_model: Option<String>,
402
403    /// Timeout per item in seconds when using Claude Code. Default: 300.
404    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
405    pub claude_timeout: u64,
406
407    // -- Provider flags (Codex) --
408    /// Path to the Codex CLI binary. Default: auto-detect from PATH.
409    #[arg(long, value_name = "PATH")]
410    pub codex_binary: Option<PathBuf>,
411
412    /// Codex model to use (e.g. o4-mini).
413    #[arg(long, value_name = "MODEL")]
414    pub codex_model: Option<String>,
415
416    /// Timeout per item in seconds when using Codex. Default: 300.
417    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
418    pub codex_timeout: u64,
419
420    // -- Provider flags (OpenCode) --
421    /// Path to the OpenCode binary. Default: auto-detect from PATH.
422    #[arg(long, value_name = "PATH", env = "SQLITE_GRAPHRAG_OPENCODE_BINARY")]
423    pub opencode_binary: Option<PathBuf>,
424
425    /// OpenCode model to use.
426    #[arg(long, value_name = "MODEL", env = "SQLITE_GRAPHRAG_OPENCODE_MODEL")]
427    pub opencode_model: Option<String>,
428
429    /// Timeout per item in seconds when using OpenCode. Default: 300.
430    #[arg(
431        long,
432        value_name = "SECONDS",
433        env = "SQLITE_GRAPHRAG_OPENCODE_TIMEOUT",
434        default_value_t = 300
435    )]
436    pub opencode_timeout: u64,
437
438    // -- Cost controls --
439    /// Abort when cumulative cost exceeds this USD budget (API key only; ignored for OAuth).
440    #[arg(long, value_name = "USD")]
441    pub max_cost_usd: Option<f64>,
442
443    // -- Queue controls --
444    /// Resume a previously interrupted run (skip already-done items).
445    #[arg(long)]
446    pub resume: bool,
447
448    /// Retry only items that failed in a previous run.
449    #[arg(long)]
450    pub retry_failed: bool,
451
452    // -- body-enrich specific flags (GAP-18) --
453    /// Minimum output character count for body-enrich. Default: 500.
454    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
455    pub min_output_chars: usize,
456
457    /// Maximum output character count for body-enrich. Default: 2000.
458    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
459    pub max_output_chars: usize,
460
461    /// Check that enriched body preserves all facts from the original (LLM judge). Default: true.
462    #[arg(long, default_value_t = true)]
463    pub preserve_check: bool,
464
465    /// Path to a custom prompt template file for body-enrich.
466    #[arg(long, value_name = "PATH")]
467    pub prompt_template: Option<PathBuf>,
468
469    /// Number of parallel LLM workers (default 1 = serial).
470    /// Each worker claims items atomically from the queue DB via UPDATE...RETURNING.
471    /// Range: 1–32. For 2321 entities, --llm-parallelism 4 reduces wall time ~4×.
472    #[arg(long, default_value_t = 1, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=32))]
473    pub llm_parallelism: u32,
474
475    // -- Output / infra --
476    /// Emit NDJSON output. Always true; flag accepted for compatibility.
477    #[arg(long)]
478    pub json: bool,
479
480    /// Database path override.
481    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
482    pub db: Option<String>,
483
484    /// G30: poll for the job singleton every second for up to N seconds
485    /// when another invocation holds the lock. Default: 0 (fail fast).
486    #[arg(long, value_name = "SECONDS")]
487    pub wait_job_singleton: Option<u64>,
488
489    /// G30: force acquisition of the singleton lock by removing a stale
490    /// lock file from a previously crashed invocation. Use only when you
491    /// are certain no other `enrich`/`ingest` is running.
492    #[arg(long, default_value_t = false)]
493    pub force_job_singleton: bool,
494
495    /// G37: select a specific subset of memory names to enrich instead of
496    /// the full candidate set. Comma-separated, e.g. `--names a,b,c`.
497    /// Empty when omitted (processes all candidates).
498    #[arg(long, value_name = "NAMES", value_delimiter = ',')]
499    pub names: Vec<String>,
500
501    /// G37: read the subset of memory names from a file (one per line).
502    /// Lines starting with `#` and empty lines are ignored. Combined with
503    /// `--names` (union) when both are set.
504    #[arg(long, value_name = "PATH")]
505    pub names_file: Option<PathBuf>,
506
507    /// G35: probe the LLM provider with a 1-turn ping before processing
508    /// the batch. Aborts with a clear error if the rate-limit window is
509    /// closed (avoids burning N turns only to fail on item 1).
510    #[arg(long, default_value_t = false)]
511    pub preflight_check: bool,
512
513    /// G35: if a preflight probe or in-flight call hits the Claude rate
514    /// limit, fall back to `--fallback-mode` (typically `codex`) instead
515    /// of failing the batch. Ignored when `--mode` is already `codex`.
516    #[arg(long, value_enum)]
517    pub fallback_mode: Option<EnrichMode>,
518
519    /// G35: number of seconds before the OAuth rate-limit reset at which
520    /// the preflight probe should refuse to start. Default 300 (5 min).
521    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
522    pub rate_limit_buffer: u64,
523
524    /// G28-D: refuse to start when the 1-minute load average exceeds
525    /// `2 × ncpus` (or `SQLITE_GRAPHRAG_MAX_LOAD_PER_NCPU` if set).
526    /// Set to false to skip the check on contended CI runners.
527    #[arg(long, default_value_t = true)]
528    pub max_load_check: bool,
529
530    /// G28-D: when the system is saturated, abort the job after this
531    /// many consecutive HardFailure outcomes. Default 5.
532    #[arg(long, value_name = "N", default_value_t = 5)]
533    pub circuit_breaker_threshold: u32,
534
535    /// G29 Passo 4: minimum trigram-Jaccard similarity between the
536    /// original body and the LLM-rewritten body for the rewrite to be
537    /// accepted. Scores below the threshold are rejected and emitted as
538    /// `EnrichItemResult::PreservationFailed`. Default 0.7 (per the G29
539    /// gap specification). Ignored when `--operation` is not
540    /// `body-enrich`.
541    #[arg(long, value_name = "FLOAT", default_value_t = 0.7)]
542    pub preserve_threshold: f64,
543
544    /// G33 Passo 3: when set, validate `--codex-model` against the
545    /// ChatGPT Pro OAuth accepted-model list and abort with a
546    /// suggestion when the value is unknown. Default true (fail fast
547    /// to avoid burning OAuth turns). Set to false to opt out.
548    #[arg(long, default_value_t = true)]
549    pub codex_model_validate: bool,
550
551    /// G33 Passo 3: when set together with an invalid `--codex-model`,
552    /// automatically substitute the supplied default (e.g. `gpt-5.5`)
553    /// instead of aborting. The substitution is recorded in the NDJSON
554    /// stream as `provider_substituted: true` for traceability.
555    #[arg(long, value_name = "MODEL")]
556    pub codex_model_fallback: Option<String>,
557}
558
// ---------------------------------------------------------------------------
// Internal types — raw LLM output structs
// (currently none: `call_claude` returns an untyped `serde_json::Value`)
// ---------------------------------------------------------------------------
562
563// ---------------------------------------------------------------------------
564// NDJSON event types emitted to stdout
565// ---------------------------------------------------------------------------
566
/// NDJSON event announcing a pipeline phase (for example the "scan" phase),
/// written to stdout as one JSON object per line.
///
/// Each `Option` field is omitted from the JSON when `None`, so a phase event
/// carries only the fields relevant to that phase. Field names and order are
/// the wire format.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Phase name.
    phase: &'a str,
    // NOTE(review): presumably the resolved provider binary path and the
    // version it reports — confirm at the emit sites.
    #[serde(skip_serializing_if = "Option::is_none")]
    binary_path: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Total number of candidate items, when this phase knows it.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_total: Option<usize>,
    /// Number of items still pending, when this phase knows it.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_pending: Option<usize>,
    /// Active parallel LLM worker count (1 = serial). Present only on the "scan" phase event.
    #[serde(skip_serializing_if = "Option::is_none")]
    llm_parallelism: Option<u32>,
}
582
/// NDJSON event reporting the outcome of one enrichment item, written to
/// stdout as one JSON object per line.
///
/// `item`, `status`, `index` and `total` are always present. Every `Option`
/// field is omitted when `None`. Field names and order are the wire format.
#[derive(Debug, Serialize)]
struct ItemEvent<'a> {
    /// Item identifier (memory name or entity name).
    item: &'a str,
    /// Outcome label for this item.
    status: &'a str,
    /// Memory row id, for memory-based operations.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Entity row id, for entity-based operations.
    #[serde(skip_serializing_if = "Option::is_none")]
    entity_id: Option<i64>,
    /// Number of entities produced for this item (binding-style operations).
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Number of relationships produced for this item.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Body length before and after a rewrite (body-style operations).
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_before: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_after: Option<usize>,
    /// LLM cost attributed to this item, in USD.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    /// Wall-clock time spent on this item, in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Error message when the item failed.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    // NOTE(review): presumably the item's position in the run and the run
    // size, for progress display — confirm the base (0 or 1) at the emit site.
    index: usize,
    total: usize,
}
609
/// Final NDJSON summary line for an `enrich` run.
#[derive(Debug, Serialize)]
struct EnrichSummary {
    // NOTE(review): looks like a constant `true` marker that lets consumers
    // tell this line apart from per-item events — confirm at the emit site.
    summary: bool,
    /// Name of the operation that ran.
    operation: String,
    /// Item counts for the run.
    items_total: usize,
    completed: usize,
    failed: usize,
    skipped: usize,
    /// Cumulative LLM cost for the run, in USD.
    cost_usd: f64,
    /// Total wall-clock time for the run, in milliseconds.
    elapsed_ms: u64,
    /// v1.0.84 (ADR-0042): discriminator for the LLM backend that actually
    /// performed the re-embedding during enrich. `"claude" | "codex" | "none"`.
    /// Absent on the wire when `None` (kept for happy-path envelope cleanliness,
    /// or when the operation did not involve re-embed).
    #[serde(skip_serializing_if = "Option::is_none")]
    backend_invoked: Option<&'static str>,
}
627
628use crate::output::emit_json_line as emit_json;
629
630// ---------------------------------------------------------------------------
631// Queue DB
632// ---------------------------------------------------------------------------
633
634/// Opens or creates the enrichment queue database.
635///
636/// The queue schema mirrors `ingest_claude` for resume/retry parity.
637/// Uses a different filename (`.enrich-queue.sqlite`) to avoid collision.
638///
639/// # DRY note
640///
641/// This is a near-verbatim copy of `open_queue_db` in `ingest_claude.rs`.
642/// Both should be unified in a shared `llm_runner.rs` module by the
643/// Integration stream.
644fn open_queue_db(path: &str) -> Result<Connection, AppError> {
645    let conn = Connection::open(path)?;
646    conn.pragma_update(None, "journal_mode", "wal")?;
647    conn.execute_batch(
648        "CREATE TABLE IF NOT EXISTS queue (
649            id          INTEGER PRIMARY KEY AUTOINCREMENT,
650            item_key    TEXT NOT NULL UNIQUE,
651            item_type   TEXT NOT NULL DEFAULT 'memory',
652            status      TEXT NOT NULL DEFAULT 'pending',
653            memory_id   INTEGER,
654            entity_id   INTEGER,
655            entities    INTEGER DEFAULT 0,
656            rels        INTEGER DEFAULT 0,
657            error       TEXT,
658            cost_usd    REAL DEFAULT 0.0,
659            attempt     INTEGER DEFAULT 0,
660            elapsed_ms  INTEGER,
661            created_at  TEXT DEFAULT (datetime('now')),
662            done_at     TEXT
663        );
664        CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
665    )?;
666    Ok(conn)
667}
668
669// ---------------------------------------------------------------------------
670// LLM invocation — Claude Code
671// ---------------------------------------------------------------------------
672
673/// Calls `claude -p` via the shared `claude_runner` module (G02).
674///
675/// Returns `(output_value, cost_usd, is_oauth)`.
676fn call_claude(
677    binary: &Path,
678    prompt: &str,
679    json_schema: &str,
680    input_text: &str,
681    model: Option<&str>,
682    timeout_secs: u64,
683) -> Result<(serde_json::Value, f64, bool), AppError> {
684    let result = crate::commands::claude_runner::run_claude(
685        binary,
686        prompt,
687        json_schema,
688        input_text,
689        model,
690        timeout_secs,
691        7,
692    )?;
693    Ok((result.value, result.cost_usd, result.is_oauth))
694}
695
696// ---------------------------------------------------------------------------
697// Preflight probe (G35) — single-turn ping to verify the LLM provider
698// ---------------------------------------------------------------------------
699
700/// Result of a single preflight ping (G35).
701enum PreflightOutcome {
702    /// The provider accepted the ping without rate-limit or other errors.
703    Healthy,
704    /// The provider rejected the ping due to OAuth rate limit. The
705    /// `suggestion` field is a human hint that callers can embed in the
706    /// user-facing error.
707    RateLimited {
708        reason: String,
709        suggestion: &'static str,
710    },
711    /// Any other provider error (binary missing, auth failure, etc.).
712    Error(AppError),
713}
714
715/// Probes the configured LLM provider with a 1-turn ping.
716///
717/// - Claude: `claude -p "ping" --max-turns 1 --strict-mcp-config --mcp-config '{}'`
718/// - Codex:  `codex exec -c mcp_servers='{}' "ping" --json`
719///
720/// The probe intentionally avoids spawning any MCP server children (G28-A)
721/// to keep its own process footprint at the minimum.
722fn run_preflight_probe(args: &EnrichArgs) -> PreflightOutcome {
723    let timeout = std::time::Duration::from_secs(args.rate_limit_buffer.max(60));
724
725    match args.mode {
726        EnrichMode::ClaudeCode => {
727            let bin = match find_claude_binary(args.claude_binary.as_deref()) {
728                Ok(b) => b,
729                Err(e) => return PreflightOutcome::Error(e),
730            };
731            // v1.0.88 (BUG-3 fix, ADR-0046): write the empty MCP config to a
732            // tempfile (Claude Code 2.1.177 rejects the inline `{}`
733            // form) and run the preflight gate before spawn, mirroring
734            // the canonical pattern in `claude_runner::build_claude_command`.
735            let mcp_config_path = match crate::spawn::preflight::write_empty_mcp_config_tempfile() {
736                Ok(p) => p,
737                Err(e) => {
738                    return PreflightOutcome::Error(AppError::Io(e));
739                }
740            };
741            let mut cmd = std::process::Command::new(&bin);
742            cmd.env_clear();
743            for var in &["PATH", "HOME", "USER"] {
744                if let Ok(val) = std::env::var(var) {
745                    cmd.env(var, val);
746                }
747            }
748            cmd.arg("-p")
749                .arg("ping")
750                .arg("--max-turns")
751                .arg("1")
752                .arg("--strict-mcp-config")
753                .arg("--mcp-config")
754                .arg(mcp_config_path.as_os_str())
755                .arg("--dangerously-skip-permissions")
756                .arg("--settings")
757                .arg("{\"hooks\":{}}")
758                .arg("--output-format")
759                .arg("json")
760                .stdin(std::process::Stdio::null())
761                .stdout(std::process::Stdio::piped())
762                .stderr(std::process::Stdio::piped());
763
764            let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
765                Ok(c) => c,
766                Err(e) => {
767                    return PreflightOutcome::Error(AppError::Io(e));
768                }
769            };
770            let output = match wait_with_timeout(child, timeout) {
771                Ok(out) => out,
772                Err(e) => return PreflightOutcome::Error(e),
773            };
774            if !output.status.success() {
775                let stderr = String::from_utf8_lossy(&output.stderr);
776                if stderr.contains("hit your session limit")
777                    || stderr.contains("rate_limit")
778                    || stderr.contains("429")
779                {
780                    return PreflightOutcome::RateLimited {
781                        reason: stderr.trim().to_string(),
782                        suggestion:
783                            "wait for the OAuth window to reset or use --fallback-mode codex",
784                    };
785                }
786                return PreflightOutcome::Error(AppError::Validation(format!(
787                    "preflight probe failed: {stderr}",
788                    stderr = stderr.trim()
789                )));
790            }
791            PreflightOutcome::Healthy
792        }
793        EnrichMode::Codex => {
794            let bin = match find_codex_binary(args.codex_binary.as_deref()) {
795                Ok(b) => b,
796                Err(e) => return PreflightOutcome::Error(e),
797            };
798            super::codex_spawn::validate_codex_model(args.codex_model.as_deref())
799                .map_err(PreflightOutcome::Error)
800                .ok();
801            let schema = "{}";
802            let schema_path = match super::codex_spawn::trusted_schema_path() {
803                Ok(p) => p,
804                Err(e) => return PreflightOutcome::Error(e),
805            };
806            let spawn_args = super::codex_spawn::CodexSpawnArgs {
807                binary: &bin,
808                prompt: "ping",
809                json_schema: schema,
810                input_text: "",
811                model: args.codex_model.as_deref(),
812                timeout_secs: args.rate_limit_buffer.max(60),
813                schema_path: schema_path.clone(),
814            };
815            let mut cmd = match super::codex_spawn::build_codex_command(&spawn_args) {
816                Ok(c) => c,
817                Err(e) => return PreflightOutcome::Error(e),
818            };
819            let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
820                Ok(c) => c,
821                Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
822            };
823            let output = match wait_with_timeout(child, timeout) {
824                Ok(out) => out,
825                Err(e) => return PreflightOutcome::Error(e),
826            };
827            let _ = std::fs::remove_file(&schema_path);
828            if !output.status.success() {
829                let stderr = String::from_utf8_lossy(&output.stderr);
830                if stderr.contains("rate_limit")
831                    || stderr.contains("429")
832                    || stderr.contains("Too Many Requests")
833                {
834                    return PreflightOutcome::RateLimited {
835                        reason: stderr.trim().to_string(),
836                        suggestion: "wait for the rate-limit window to reset",
837                    };
838                }
839                return PreflightOutcome::Error(AppError::Validation(format!(
840                    "preflight probe failed: {stderr}",
841                    stderr = stderr.trim()
842                )));
843            }
844            PreflightOutcome::Healthy
845        }
846        EnrichMode::Opencode => {
847            let bin = match super::opencode_runner::find_opencode_binary_with_override(
848                args.opencode_binary.as_deref(),
849            ) {
850                Ok(b) => b,
851                Err(e) => return PreflightOutcome::Error(e),
852            };
853            let model =
854                super::opencode_runner::resolve_opencode_model(args.opencode_model.as_deref());
855            let mut cmd =
856                super::opencode_runner::build_opencode_command_sync(&bin, &model, "ping", "");
857            let child = match super::opencode_runner::spawn_opencode(&mut cmd) {
858                Ok(c) => c,
859                Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
860            };
861            let output = match wait_with_timeout(child, timeout) {
862                Ok(out) => out,
863                Err(e) => return PreflightOutcome::Error(e),
864            };
865            if !output.status.success() {
866                let stderr = String::from_utf8_lossy(&output.stderr);
867                if stderr.contains("rate_limit")
868                    || stderr.contains("429")
869                    || stderr.contains("Too Many Requests")
870                {
871                    return PreflightOutcome::RateLimited {
872                        reason: stderr.trim().to_string(),
873                        suggestion: "wait for the rate-limit window to reset",
874                    };
875                }
876                return PreflightOutcome::Error(AppError::Validation(format!(
877                    "preflight probe failed: {stderr}",
878                    stderr = stderr.trim()
879                )));
880            }
881            PreflightOutcome::Healthy
882        }
883    }
884}
885
886/// Cross-platform wait with timeout (no extra crate dependency).
887fn wait_with_timeout(
888    mut child: std::process::Child,
889    timeout: std::time::Duration,
890) -> Result<std::process::Output, AppError> {
891    use wait_timeout::ChildExt;
892    let start = std::time::Instant::now();
893    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
894    if status.is_none() {
895        let _ = child.kill();
896        let _ = child.wait();
897        return Err(AppError::Validation(format!(
898            "preflight probe timed out after {}s",
899            start.elapsed().as_secs()
900        )));
901    }
902    let mut stdout = Vec::new();
903    if let Some(mut out) = child.stdout.take() {
904        std::io::Read::read_to_end(&mut out, &mut stdout).map_err(AppError::Io)?;
905    }
906    let mut stderr = Vec::new();
907    if let Some(mut err) = child.stderr.take() {
908        std::io::Read::read_to_end(&mut err, &mut stderr).map_err(AppError::Io)?;
909    }
910    let exit = status.unwrap();
911    Ok(std::process::Output {
912        status: exit,
913        stdout,
914        stderr,
915    })
916}
917
918// ---------------------------------------------------------------------------
919// SCAN helpers — SQL queries that find items needing enrichment
920// ---------------------------------------------------------------------------
921
922/// Returns memories without any `memory_entities` binding.
923///
924/// These are the targets for `memory-bindings` enrichment. When `name_filter`
925/// is non-empty, restricts the scan to the given names (G37); unknown names
926/// are silently skipped (the caller can detect them by comparing
927/// requested vs. returned).
928fn scan_unbound_memories(
929    conn: &Connection,
930    namespace: &str,
931    limit: Option<usize>,
932    name_filter: &[String],
933) -> Result<Vec<(i64, String, String)>, AppError> {
934    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
935
936    if name_filter.is_empty() {
937        let sql = format!(
938            "SELECT m.id, m.name, m.body
939             FROM memories m
940             WHERE m.namespace = ?1
941               AND m.deleted_at IS NULL
942               AND NOT EXISTS (
943                   SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
944               )
945             ORDER BY m.id
946             {limit_clause}"
947        );
948        let mut stmt = conn.prepare(&sql)?;
949        let rows = stmt
950            .query_map(rusqlite::params![namespace], |r| {
951                Ok((
952                    r.get::<_, i64>(0)?,
953                    r.get::<_, String>(1)?,
954                    r.get::<_, String>(2)?,
955                ))
956            })?
957            .collect::<Result<Vec<_>, _>>()?;
958        Ok(rows)
959    } else {
960        // Build a parameterised IN clause: ?2, ?3, ..., ?{1+n}
961        let placeholders: Vec<String> = (2..=name_filter.len() + 1)
962            .map(|i| format!("?{i}"))
963            .collect();
964        let in_clause = placeholders.join(", ");
965        let sql = format!(
966            "SELECT m.id, m.name, m.body
967             FROM memories m
968             WHERE m.namespace = ?1
969               AND m.deleted_at IS NULL
970               AND m.name IN ({in_clause})
971               AND NOT EXISTS (
972                   SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
973               )
974             ORDER BY m.id
975             {limit_clause}"
976        );
977        let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
978        params_vec.push(&namespace);
979        for n in name_filter {
980            params_vec.push(n);
981        }
982        let mut stmt = conn.prepare(&sql)?;
983        let rows = stmt
984            .query_map(
985                rusqlite::params_from_iter(params_vec.iter().copied()),
986                |r| {
987                    Ok((
988                        r.get::<_, i64>(0)?,
989                        r.get::<_, String>(1)?,
990                        r.get::<_, String>(2)?,
991                    ))
992                },
993            )?
994            .collect::<Result<Vec<_>, _>>()?;
995        Ok(rows)
996    }
997}
998
999/// Reads a list of memory names from a UTF-8 text file (G37).
1000///
1001/// Empty lines and lines beginning with `#` are skipped. Returns a
1002/// de-duplicated, order-preserving list of trimmed names.
1003fn read_names_file(path: &Path) -> Result<Vec<String>, AppError> {
1004    let content = std::fs::read_to_string(path).map_err(|e| {
1005        AppError::Validation(format!("failed to read names file {}: {e}", path.display()))
1006    })?;
1007    let mut seen = std::collections::HashSet::new();
1008    let mut out = Vec::new();
1009    for line in content.lines() {
1010        let trimmed = line.trim();
1011        if trimmed.is_empty() || trimmed.starts_with('#') {
1012            continue;
1013        }
1014        if seen.insert(trimmed.to_string()) {
1015            out.push(trimmed.to_string());
1016        }
1017    }
1018    Ok(out)
1019}
1020
1021/// Resolves the union of `--names` and `--names-file` (G37).
1022fn resolve_name_filter(args: &EnrichArgs) -> Result<Vec<String>, AppError> {
1023    let mut combined: Vec<String> = args.names.clone();
1024    if let Some(p) = &args.names_file {
1025        let from_file = read_names_file(p)?;
1026        for n in from_file {
1027            if !combined.contains(&n) {
1028                combined.push(n);
1029            }
1030        }
1031    }
1032    Ok(combined)
1033}
1034
1035/// Returns entities with NULL or empty description.
1036///
1037/// These are the targets for `entity-descriptions` enrichment.
1038fn scan_entities_without_description(
1039    conn: &Connection,
1040    namespace: &str,
1041    limit: Option<usize>,
1042    name_filter: &[String],
1043) -> Result<Vec<(i64, String, String)>, AppError> {
1044    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1045
1046    if name_filter.is_empty() {
1047        let sql = format!(
1048            "SELECT id, name, type
1049             FROM entities
1050             WHERE namespace = ?1
1051               AND (description IS NULL OR description = '')
1052             ORDER BY id
1053             {limit_clause}"
1054        );
1055        let mut stmt = conn.prepare(&sql)?;
1056        let rows = stmt
1057            .query_map(rusqlite::params![namespace], |r| {
1058                Ok((
1059                    r.get::<_, i64>(0)?,
1060                    r.get::<_, String>(1)?,
1061                    r.get::<_, String>(2)?,
1062                ))
1063            })?
1064            .collect::<Result<Vec<_>, _>>()?;
1065        Ok(rows)
1066    } else {
1067        let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1068            .map(|i| format!("?{i}"))
1069            .collect();
1070        let in_clause = placeholders.join(", ");
1071        let sql = format!(
1072            "SELECT id, name, type
1073             FROM entities
1074             WHERE namespace = ?1
1075               AND name IN ({in_clause})
1076               AND (description IS NULL OR description = '')
1077             ORDER BY id
1078             {limit_clause}"
1079        );
1080        let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1081        params_vec.push(&namespace);
1082        for n in name_filter {
1083            params_vec.push(n);
1084        }
1085        let mut stmt = conn.prepare(&sql)?;
1086        let rows = stmt
1087            .query_map(
1088                rusqlite::params_from_iter(params_vec.iter().copied()),
1089                |r| {
1090                    Ok((
1091                        r.get::<_, i64>(0)?,
1092                        r.get::<_, String>(1)?,
1093                        r.get::<_, String>(2)?,
1094                    ))
1095                },
1096            )?
1097            .collect::<Result<Vec<_>, _>>()?;
1098        Ok(rows)
1099    }
1100}
1101
1102/// Returns memories whose body length is below the configured minimum.
1103///
1104/// These are the targets for `body-enrich` (GAP-18).
1105fn scan_short_body_memories(
1106    conn: &Connection,
1107    namespace: &str,
1108    min_chars: usize,
1109    limit: Option<usize>,
1110    name_filter: &[String],
1111) -> Result<Vec<(i64, String, String)>, AppError> {
1112    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1113
1114    if name_filter.is_empty() {
1115        let sql = format!(
1116            "SELECT m.id, m.name, m.body
1117             FROM memories m
1118             WHERE m.namespace = ?1
1119               AND m.deleted_at IS NULL
1120               AND LENGTH(COALESCE(m.body,'')) < ?2
1121             ORDER BY m.id
1122             {limit_clause}"
1123        );
1124        let mut stmt = conn.prepare(&sql)?;
1125        let rows = stmt
1126            .query_map(rusqlite::params![namespace, min_chars as i64], |r| {
1127                Ok((
1128                    r.get::<_, i64>(0)?,
1129                    r.get::<_, String>(1)?,
1130                    r.get::<_, String>(2)?,
1131                ))
1132            })?
1133            .collect::<Result<Vec<_>, _>>()?;
1134        Ok(rows)
1135    } else {
1136        let placeholders: Vec<String> = (3..=name_filter.len() + 2)
1137            .map(|i| format!("?{i}"))
1138            .collect();
1139        let in_clause = placeholders.join(", ");
1140        let sql = format!(
1141            "SELECT m.id, m.name, m.body
1142             FROM memories m
1143             WHERE m.namespace = ?1
1144               AND m.deleted_at IS NULL
1145               AND m.name IN ({in_clause})
1146               AND LENGTH(COALESCE(m.body,'')) < ?2
1147             ORDER BY m.id
1148             {limit_clause}"
1149        );
1150        let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(2 + name_filter.len());
1151        let min_chars_i64 = min_chars as i64;
1152        params_vec.push(&namespace);
1153        params_vec.push(&min_chars_i64);
1154        for n in name_filter {
1155            params_vec.push(n);
1156        }
1157        let mut stmt = conn.prepare(&sql)?;
1158        let rows = stmt
1159            .query_map(
1160                rusqlite::params_from_iter(params_vec.iter().copied()),
1161                |r| {
1162                    Ok((
1163                        r.get::<_, i64>(0)?,
1164                        r.get::<_, String>(1)?,
1165                        r.get::<_, String>(2)?,
1166                    ))
1167                },
1168            )?
1169            .collect::<Result<Vec<_>, _>>()?;
1170        Ok(rows)
1171    }
1172}
1173
1174/// Returns live memories that still have no row in `memory_embeddings`.
1175///
1176/// These are the targets for `re-embed`.
1177fn scan_memories_without_embeddings(
1178    conn: &Connection,
1179    namespace: &str,
1180    limit: Option<usize>,
1181    name_filter: &[String],
1182) -> Result<Vec<(i64, String, String)>, AppError> {
1183    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1184
1185    if name_filter.is_empty() {
1186        let sql = format!(
1187            "SELECT m.id, m.name, COALESCE(m.body,'')
1188             FROM memories m
1189             LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1190             WHERE m.namespace = ?1
1191               AND m.deleted_at IS NULL
1192               AND me.memory_id IS NULL
1193             ORDER BY m.id
1194             {limit_clause}"
1195        );
1196        let mut stmt = conn.prepare(&sql)?;
1197        let rows = stmt
1198            .query_map(rusqlite::params![namespace], |r| {
1199                Ok((
1200                    r.get::<_, i64>(0)?,
1201                    r.get::<_, String>(1)?,
1202                    r.get::<_, String>(2)?,
1203                ))
1204            })?
1205            .collect::<Result<Vec<_>, _>>()?;
1206        Ok(rows)
1207    } else {
1208        let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1209            .map(|i| format!("?{i}"))
1210            .collect();
1211        let in_clause = placeholders.join(", ");
1212        let sql = format!(
1213            "SELECT m.id, m.name, COALESCE(m.body,'')
1214             FROM memories m
1215             LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1216             WHERE m.namespace = ?1
1217               AND m.deleted_at IS NULL
1218               AND m.name IN ({in_clause})
1219               AND me.memory_id IS NULL
1220             ORDER BY m.id
1221             {limit_clause}"
1222        );
1223        let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1224        params_vec.push(&namespace);
1225        for n in name_filter {
1226            params_vec.push(n);
1227        }
1228        let mut stmt = conn.prepare(&sql)?;
1229        let rows = stmt
1230            .query_map(
1231                rusqlite::params_from_iter(params_vec.iter().copied()),
1232                |r| {
1233                    Ok((
1234                        r.get::<_, i64>(0)?,
1235                        r.get::<_, String>(1)?,
1236                        r.get::<_, String>(2)?,
1237                    ))
1238                },
1239            )?
1240            .collect::<Result<Vec<_>, _>>()?;
1241        Ok(rows)
1242    }
1243}
1244
1245/// G27: Returns relationships with weight >= 0.7 that may need recalibration.
1246#[allow(clippy::type_complexity)]
1247fn scan_weight_candidates(
1248    conn: &Connection,
1249    namespace: &str,
1250    limit: Option<usize>,
1251) -> Result<Vec<(i64, String, String, String, f64)>, AppError> {
1252    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1253    let sql = format!(
1254        "SELECT r.id, e1.name, e2.name, r.relation, r.weight \
1255         FROM relationships r \
1256         JOIN entities e1 ON e1.id = r.source_id \
1257         JOIN entities e2 ON e2.id = r.target_id \
1258         WHERE r.weight >= 0.7 AND e1.namespace = ?1 \
1259         ORDER BY r.weight DESC {limit_clause}"
1260    );
1261    let mut stmt = conn.prepare(&sql)?;
1262    let rows = stmt
1263        .query_map(rusqlite::params![namespace], |r| {
1264            Ok((
1265                r.get::<_, i64>(0)?,
1266                r.get::<_, String>(1)?,
1267                r.get::<_, String>(2)?,
1268                r.get::<_, String>(3)?,
1269                r.get::<_, f64>(4)?,
1270            ))
1271        })?
1272        .collect::<Result<Vec<_>, _>>()?;
1273    Ok(rows)
1274}
1275
1276/// G27: Returns relationships with generic relation types (applies_to).
1277fn scan_generic_relations(
1278    conn: &Connection,
1279    namespace: &str,
1280    limit: Option<usize>,
1281) -> Result<Vec<(i64, String, String, String)>, AppError> {
1282    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1283    let sql = format!(
1284        "SELECT r.id, e1.name, e2.name, r.relation \
1285         FROM relationships r \
1286         JOIN entities e1 ON e1.id = r.source_id \
1287         JOIN entities e2 ON e2.id = r.target_id \
1288         WHERE r.relation = 'applies_to' AND e1.namespace = ?1 \
1289         ORDER BY r.id {limit_clause}"
1290    );
1291    let mut stmt = conn.prepare(&sql)?;
1292    let rows = stmt
1293        .query_map(rusqlite::params![namespace], |r| {
1294            Ok((
1295                r.get::<_, i64>(0)?,
1296                r.get::<_, String>(1)?,
1297                r.get::<_, String>(2)?,
1298                r.get::<_, String>(3)?,
1299            ))
1300        })?
1301        .collect::<Result<Vec<_>, _>>()?;
1302    Ok(rows)
1303}
1304
1305// ---------------------------------------------------------------------------
1306// PERSIST helpers for fully-implemented operations
1307// ---------------------------------------------------------------------------
1308
1309/// Persists entity bindings extracted by the LLM for a memory.
1310///
1311/// Creates entities via `upsert_entity`, links them to the memory via
1312/// `link_memory_entity`, and upserts relationships found between entities.
1313fn persist_memory_bindings(
1314    conn: &Connection,
1315    namespace: &str,
1316    memory_id: i64,
1317    entities_json: &serde_json::Value,
1318    rels_json: &serde_json::Value,
1319) -> Result<(usize, usize), AppError> {
1320    #[derive(Deserialize)]
1321    struct EntityItem {
1322        name: String,
1323        entity_type: String,
1324    }
1325    #[derive(Deserialize)]
1326    struct RelItem {
1327        source: String,
1328        target: String,
1329        relation: String,
1330        strength: f64,
1331    }
1332
1333    let extracted_entities: Vec<EntityItem> = serde_json::from_value(entities_json.clone())
1334        .map_err(|e| AppError::Validation(format!("failed to parse entities array: {e}")))?;
1335
1336    let extracted_rels: Vec<RelItem> = serde_json::from_value(rels_json.clone())
1337        .map_err(|e| AppError::Validation(format!("failed to parse relationships array: {e}")))?;
1338
1339    let mut ent_count = 0usize;
1340    let mut rel_count = 0usize;
1341
1342    for item in &extracted_entities {
1343        let entity_type = match item.entity_type.parse::<EntityType>() {
1344            Ok(et) => et,
1345            Err(_) => {
1346                tracing::warn!(
1347                    target: "enrich",
1348                    entity = %item.name,
1349                    entity_type = %item.entity_type,
1350                    "entity type not recognized, skipping"
1351                );
1352                continue;
1353            }
1354        };
1355        match entities::upsert_entity(
1356            conn,
1357            namespace,
1358            &NewEntity {
1359                name: item.name.clone(),
1360                entity_type,
1361                description: None,
1362            },
1363        ) {
1364            Ok(eid) => {
1365                let _ = entities::link_memory_entity(conn, memory_id, eid);
1366                ent_count += 1;
1367            }
1368            Err(e) => {
1369                tracing::warn!(
1370                    target: "enrich",
1371                    entity = %item.name,
1372                    error = %e,
1373                    "entity upsert skipped"
1374                );
1375            }
1376        }
1377    }
1378
1379    for rel in &extracted_rels {
1380        let normalized = crate::parsers::normalize_relation(&rel.relation);
1381        crate::parsers::warn_if_non_canonical(&normalized);
1382
1383        // Normalize entity names before lookup: upsert_entity normalizes on write,
1384        // so the lookup must use the same normalized form to find the row.
1385        let src_name = crate::parsers::normalize_entity_name(&rel.source);
1386        let tgt_name = crate::parsers::normalize_entity_name(&rel.target);
1387        let src_id = entities::find_entity_id(conn, namespace, &src_name);
1388        let tgt_id = entities::find_entity_id(conn, namespace, &tgt_name);
1389        if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1390            let new_rel = NewRelationship {
1391                source: rel.source.clone(),
1392                target: rel.target.clone(),
1393                relation: normalized,
1394                strength: rel.strength,
1395                description: None,
1396            };
1397            if entities::upsert_relationship(conn, namespace, sid, tid, &new_rel).is_ok() {
1398                rel_count += 1;
1399            }
1400        }
1401    }
1402
1403    Ok((ent_count, rel_count))
1404}
1405
1406/// Updates an entity's description directly in the `entities` table.
1407fn persist_entity_description(
1408    conn: &Connection,
1409    entity_id: i64,
1410    description: &str,
1411) -> Result<(), AppError> {
1412    conn.execute(
1413        "UPDATE entities SET description = ?1, updated_at = unixepoch() WHERE id = ?2",
1414        rusqlite::params![description, entity_id],
1415    )?;
1416    Ok(())
1417}
1418
1419/// v1.0.84 (ADR-0042): on successful re-embed, records the active backend
1420/// into the shared accumulator (`ENRICH_LAST_BACKEND`) so the final
1421/// `EnrichSummary` can expose `backend_invoked` without changing every
1422/// caller's signature. Best-effort observability — concurrent enrich runs
1423/// may race, but `Mutex` keeps the mutation safe.
1424#[allow(clippy::too_many_arguments)]
1425fn reembed_memory_vector(
1426    conn: &Connection,
1427    namespace: &str,
1428    memory_id: i64,
1429    memory_name: &str,
1430    memory_type: &str,
1431    body: &str,
1432    paths: &crate::paths::AppPaths,
1433    llm_backend: crate::cli::LlmBackendChoice,
1434) -> Result<(), AppError> {
1435    let snippet: String = body.chars().take(200).collect();
1436    // v1.0.82 (GAP-003): forward --llm-backend to embed_with_fallback.
1437    // v1.0.84 (ADR-0042): tuple (Vec<f32>, LlmBackendKind) — extrai o
1438    // backend que efetivamente rodou e popula o accumulator para o
1439    // EnrichSummary agregado.
1440    let (embedding, backend_kind) =
1441        crate::embedder::embed_passage_with_choice(&paths.models, body, Some(llm_backend))?;
1442    record_enrich_backend(backend_kind.as_str());
1443    memories::upsert_vec(
1444        conn,
1445        memory_id,
1446        namespace,
1447        memory_type,
1448        &embedding,
1449        memory_name,
1450        &snippet,
1451    )?;
1452    Ok(())
1453}
1454
/// v1.0.84 (ADR-0042): remembers `backend` as the most recent LLM backend that
/// produced a re-embed during the current enrich invocation.
///
/// The value goes into the process-wide `ENRICH_LAST_BACKEND` slot. `run` reads
/// and clears it once, via `take_enrich_backend`, when the summary is emitted.
/// Cross-process enrichment is gated by the per-namespace singleton, so there
/// is no hazard across DBs. If the lock is poisoned, the call is a silent no-op.
fn record_enrich_backend(backend: &'static str) {
    if let Ok(mut slot) = ENRICH_LAST_BACKEND.lock() {
        *slot = Some(backend);
    }
}

/// Moves the recorded backend out of the accumulator, leaving it empty.
///
/// Returns `None` when nothing was recorded since the last take, or when the
/// lock is poisoned.
fn take_enrich_backend() -> Option<&'static str> {
    let mut slot = ENRICH_LAST_BACKEND.lock().ok()?;
    slot.take()
}

/// Process-local slot shared by `record_enrich_backend` and `take_enrich_backend`.
static ENRICH_LAST_BACKEND: std::sync::Mutex<Option<&'static str>> = std::sync::Mutex::new(None);
1471
1472/// Persists an enriched memory body (body-enrich, GAP-18).
1473///
1474/// Uses `memories::update` to set the new body and `sync_fts_after_update`
1475/// to keep FTS5 in sync. Also re-embeds the memory for recall accuracy.
1476fn persist_enriched_body(
1477    conn: &Connection,
1478    namespace: &str,
1479    memory_id: i64,
1480    memory_name: &str,
1481    new_body: &str,
1482    paths: &crate::paths::AppPaths,
1483    llm_backend: crate::cli::LlmBackendChoice,
1484) -> Result<(), AppError> {
1485    // Read current values for FTS sync
1486    let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
1487        "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
1488        rusqlite::params![memory_id],
1489        |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1490    )?;
1491
1492    let memory_type: String = conn.query_row(
1493        "SELECT type FROM memories WHERE id=?1",
1494        rusqlite::params![memory_id],
1495        |r| r.get(0),
1496    )?;
1497
1498    let description: String = conn.query_row(
1499        "SELECT COALESCE(description,'') FROM memories WHERE id=?1",
1500        rusqlite::params![memory_id],
1501        |r| r.get(0),
1502    )?;
1503
1504    let body_hash = blake3::hash(new_body.as_bytes()).to_hex().to_string();
1505
1506    let new_memory = memories::NewMemory {
1507        namespace: namespace.to_string(),
1508        name: memory_name.to_string(),
1509        memory_type: memory_type.clone(),
1510        description: description.clone(),
1511        body: new_body.to_string(),
1512        body_hash,
1513        session_id: None,
1514        source: "agent".to_string(),
1515        metadata: serde_json::json!({
1516            "operation": "body-enrich",
1517            "orig_chars": old_body.chars().count(),
1518            "new_chars": new_body.chars().count(),
1519        }),
1520    };
1521
1522    // G29 audit: insert a new immutable version BEFORE the update so the
1523    // enriched body is reachable through `history --name <X>` and
1524    // `restore --version N` can roll back to the pre-enrich state.
1525    let next_version = crate::storage::versions::next_version(conn, memory_id)?;
1526    let version_metadata = serde_json::json!({
1527        "operation": "body-enrich",
1528        "orig_chars": old_body.chars().count(),
1529        "new_chars": new_body.chars().count(),
1530    })
1531    .to_string();
1532    crate::storage::versions::insert_version(
1533        conn,
1534        memory_id,
1535        next_version,
1536        memory_name,
1537        &memory_type,
1538        &description,
1539        new_body,
1540        &version_metadata,
1541        Some("enrich"),
1542        "edit",
1543    )?;
1544
1545    memories::update(conn, memory_id, &new_memory, None)?;
1546    memories::sync_fts_after_update(
1547        conn,
1548        memory_id,
1549        &old_name,
1550        &old_desc,
1551        &old_body,
1552        &new_memory.name,
1553        &new_memory.description,
1554        &new_memory.body,
1555    )?;
1556
1557    // Re-embed for recall accuracy
1558    if let Err(e) = reembed_memory_vector(
1559        conn,
1560        namespace,
1561        memory_id,
1562        memory_name,
1563        &memory_type,
1564        new_body,
1565        paths,
1566        llm_backend,
1567    ) {
1568        tracing::warn!(target: "enrich", memory = %memory_name, error = %e, "vec upsert failed after body-enrich");
1569    }
1570
1571    Ok(())
1572}
1573
1574// ---------------------------------------------------------------------------
// Main entry point (`run`) and its up-front flag validation helpers
1576// ---------------------------------------------------------------------------
1577
1578// ---------------------------------------------------------------------------
1579// G20: mode-conditional flag validation
1580// ---------------------------------------------------------------------------
1581
/// Returns `true` when `value` equals `default`.
///
/// Clap fills `default_value_t` flags with their default. Comparing against
/// that default is therefore how callers tell "the operator passed an
/// explicit override" apart from "the flag was left at its default". An
/// explicit value that happens to equal the default is indistinguishable
/// and counts as the default.
fn is_at_default<T: PartialEq>(value: T, default: T) -> bool {
    PartialEq::eq(&value, &default)
}
1588
1589/// G20: validate that flags for one LLM provider were not passed when
1590/// the operator selected a different provider. Flags silently discarded
1591/// by the wrong mode are surfaced as AppError::Validation BEFORE any
1592/// DB work, so the operator gets an actionable error instead of a
1593/// surprise at runtime.
1594///
1595/// Detection rules:
1596/// - For Option<PathBuf> / Option<String>: is_some() means explicit
1597/// - For scalar fields with default_value_t: value != default means explicit
1598/// - For boolean fields: true means explicit (default is false)
1599///
1600/// Mode-specific matrices:
1601/// - mode=claude-code rejects: codex_binary, codex_model, codex_timeout != 300
1602/// - mode=codex rejects: claude_binary, claude_model, claude_timeout != 300, max_cost_usd
1603fn validate_mode_conditional_flags_enrich(args: &EnrichArgs) -> Result<(), AppError> {
1604    const DEFAULT_TIMEOUT: u64 = 300;
1605
1606    let mut conflicts: Vec<String> = Vec::new();
1607
1608    match args.mode {
1609        EnrichMode::ClaudeCode => {
1610            if args.codex_binary.is_some() {
1611                conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1612            }
1613            if args.codex_model.is_some() {
1614                conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1615            }
1616            if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1617                conflicts.push(format!(
1618                    "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1619                    args.codex_timeout
1620                ));
1621            }
1622        }
1623        EnrichMode::Codex => {
1624            if args.claude_binary.is_some() {
1625                conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1626            }
1627            if args.claude_model.is_some() {
1628                conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1629            }
1630            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1631                conflicts.push(format!(
1632                    "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1633                    args.claude_timeout
1634                ));
1635            }
1636            if args.max_cost_usd.is_some() {
1637                conflicts.push(
1638                    "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription, not the call)"
1639                        .to_string(),
1640                );
1641            }
1642        }
1643        EnrichMode::Opencode => {
1644            if args.claude_binary.is_some() {
1645                conflicts.push("--claude-binary is ignored when --mode=opencode".to_string());
1646            }
1647            if args.claude_model.is_some() {
1648                conflicts.push("--claude-model is ignored when --mode=opencode".to_string());
1649            }
1650            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1651                conflicts.push(format!(
1652                    "--claude-timeout={} is ignored when --mode=opencode (remove the flag to use the default 300s)",
1653                    args.claude_timeout
1654                ));
1655            }
1656            if args.max_cost_usd.is_some() {
1657                conflicts.push(
1658                    "--max-cost-usd is ignored when --mode=opencode (OAuth-first; cost is metered by your subscription, not the call)"
1659                        .to_string(),
1660                );
1661            }
1662        }
1663    }
1664
1665    if !conflicts.is_empty() {
1666        return Err(AppError::Validation(format!(
1667            "G20: mode-conditional flag conflicts detected for --mode={}:\n  - {}",
1668            args.mode,
1669            conflicts.join("\n  - ")
1670        )));
1671    }
1672
1673    Ok(())
1674}
1675
1676// ---------------------------------------------------------------------------
1677
1678/// Main entry point for the `enrich` command.
1679pub fn run(args: &EnrichArgs, llm_backend: crate::cli::LlmBackendChoice) -> Result<(), AppError> {
1680    // G20: mode-conditional flag validation BEFORE any DB access.
1681    // Surfaces flags that the wrong mode would silently discard.
1682    validate_mode_conditional_flags_enrich(args)?;
1683    let started = Instant::now();
1684
1685    let paths = AppPaths::resolve(args.db.as_deref())?;
1686    ensure_db_ready(&paths)?;
1687    let conn = open_rw(&paths.db)?;
1688    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1689
1690    // G28-B (v1.0.68) + G30 (v1.0.69): enforce singleton per
1691    // (job_type, namespace, db_hash) so two parallel `enrich` invocations
1692    // on the same DB cannot co-exist, but concurrent enrich on different
1693    // databases works as expected. The force flag (--force) breaks a
1694    // stale lock from a previously crashed invocation.
1695    let wait_secs = args.wait_job_singleton;
1696    let force_flag = args.force_job_singleton;
1697    let _singleton = crate::lock::acquire_job_singleton(
1698        crate::lock::JobType::Enrich,
1699        &namespace,
1700        &paths.db,
1701        wait_secs,
1702        force_flag,
1703    )?;
1704
1705    // Validate provider binary upfront only for LLM-backed operations.
1706    let provider_binary = if matches!(args.operation, EnrichOperation::ReEmbed) {
1707        None
1708    } else {
1709        Some(match args.mode {
1710            EnrichMode::ClaudeCode => {
1711                let bin = find_claude_binary(args.claude_binary.as_deref())?;
1712                let version = super::claude_runner::validate_claude_version(&bin)?;
1713                tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
1714                emit_json(&PhaseEvent {
1715                    phase: "validate",
1716                    binary_path: bin.to_str(),
1717                    version: Some(&version),
1718                    items_total: None,
1719                    items_pending: None,
1720                    llm_parallelism: None,
1721                });
1722                bin
1723            }
1724            EnrichMode::Codex => {
1725                let bin = find_codex_binary(args.codex_binary.as_deref())?;
1726                emit_json(&PhaseEvent {
1727                    phase: "validate",
1728                    binary_path: bin.to_str(),
1729                    version: None,
1730                    items_total: None,
1731                    items_pending: None,
1732                    llm_parallelism: None,
1733                });
1734                bin
1735            }
1736            EnrichMode::Opencode => {
1737                let bin = super::opencode_runner::find_opencode_binary_with_override(
1738                    args.opencode_binary.as_deref(),
1739                )?;
1740                emit_json(&PhaseEvent {
1741                    phase: "validate",
1742                    binary_path: bin.to_str(),
1743                    version: None,
1744                    items_total: None,
1745                    items_pending: None,
1746                    llm_parallelism: None,
1747                });
1748                bin
1749            }
1750        })
1751    };
1752
1753    // G28-D: refuse to start when the system is saturated. This check
1754    // is BEFORE preflight so we never spend an OAuth turn on a host
1755    // that is already at the limit.
1756    if args.max_load_check && !args.dry_run && crate::system_load::is_system_saturated() {
1757        let load = crate::system_load::load_average_one();
1758        let n = crate::system_load::ncpus();
1759        return Err(AppError::Validation(format!(
1760            "system load average {load:.2} exceeds 2x ncpus ({n}); \
1761             pass --no-max-load-check to override (not recommended)"
1762        )));
1763    }
1764
1765    // G35: preflight probe — issue a single ping turn to verify the
1766    // provider is healthy before scanning N candidates. If the probe
1767    // fails with a rate-limit error, optionally fall back to a
1768    // different mode (typically codex) instead of failing the entire
1769    // batch. The probe itself consumes 1 OAuth turn, so it stays
1770    // opt-in (default off) to keep --dry-run and CI flows zero-cost.
1771    if args.preflight_check && !args.dry_run && !matches!(args.operation, EnrichOperation::ReEmbed)
1772    {
1773        let preflight_result = run_preflight_probe(args);
1774        match preflight_result {
1775            PreflightOutcome::Healthy => {
1776                tracing::info!(target: "enrich", mode = ?args.mode, "preflight probe healthy");
1777            }
1778            PreflightOutcome::RateLimited { reason, suggestion } => {
1779                if let Some(fallback) = args.fallback_mode.clone() {
1780                    if fallback != args.mode {
1781                        // G35 (v1.0.69): the mid-batch mode switch is
1782                        // intentionally NOT applied because it would
1783                        // desynchronise the per-item rate-limit wait
1784                        // state (rate-limited items in the worker are
1785                        // timed against the original provider). Instead
1786                        // we abort cleanly so the operator can re-invoke
1787                        // with `--mode {fallback:?}`. This guarantees no
1788                        // OAuth window is wasted and no partial state
1789                        // is left in the queue.
1790                        return Err(AppError::Validation(format!(
1791                            "preflight detected rate limit on {mode:?}: {reason}; \
1792                             re-invoke with `--mode {fallback:?}` to use the fallback provider",
1793                            mode = args.mode
1794                        )));
1795                    }
1796                    return Err(AppError::Validation(format!(
1797                        "preflight detected rate limit on {mode:?}: {reason}; \
1798                         --fallback-mode matches --mode, no recovery possible",
1799                        mode = args.mode
1800                    )));
1801                }
1802                return Err(AppError::Validation(format!(
1803                    "preflight detected rate limit on {mode:?}: {reason}; \
1804                     {suggestion}; pass --fallback-mode codex to recover",
1805                    mode = args.mode
1806                )));
1807            }
1808            PreflightOutcome::Error(e) => {
1809                return Err(e);
1810            }
1811        }
1812    }
1813
1814    // SCAN phase
1815    let scan_result = scan_operation(&conn, &namespace, args)?;
1816    let total = scan_result.len();
1817
1818    emit_json(&PhaseEvent {
1819        phase: "scan",
1820        binary_path: None,
1821        version: None,
1822        items_total: Some(total),
1823        items_pending: Some(total),
1824        llm_parallelism: Some(args.llm_parallelism),
1825    });
1826
1827    // Dry-run: emit preview events and summary without calling LLM
1828    if args.dry_run {
1829        for (idx, key) in scan_result.iter().enumerate() {
1830            emit_json(&ItemEvent {
1831                item: key,
1832                status: "preview",
1833                memory_id: None,
1834                entity_id: None,
1835                entities: None,
1836                rels: None,
1837                chars_before: None,
1838                chars_after: None,
1839                cost_usd: None,
1840                elapsed_ms: None,
1841                error: None,
1842                index: idx,
1843                total,
1844            });
1845        }
1846        emit_json(&EnrichSummary {
1847            summary: true,
1848            operation: format!("{:?}", args.operation),
1849            items_total: total,
1850            completed: 0,
1851            failed: 0,
1852            skipped: 0,
1853            cost_usd: 0.0,
1854            elapsed_ms: started.elapsed().as_millis() as u64,
1855            backend_invoked: take_enrich_backend(),
1856        });
1857        return Ok(());
1858    }
1859
1860    // All operations in this enum have an execution path.
1861
1862    // Queue setup for resume/retry
1863    let queue_conn = open_queue_db(DEFAULT_QUEUE_DB)?;
1864
1865    if args.resume {
1866        let reset = queue_conn
1867            .execute(
1868                "UPDATE queue SET status='pending' WHERE status='processing'",
1869                [],
1870            )
1871            .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
1872        if reset > 0 {
1873            tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
1874        }
1875    }
1876
1877    if args.retry_failed {
1878        let count = queue_conn
1879            .execute(
1880                "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
1881                [],
1882            )
1883            .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
1884        tracing::info!(target: "enrich", count, "retrying failed items");
1885    }
1886
1887    if !args.resume && !args.retry_failed {
1888        queue_conn
1889            .execute("DELETE FROM queue", [])
1890            .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
1891    }
1892
1893    // Populate queue
1894    for (idx, key) in scan_result.iter().enumerate() {
1895        let item_type = match args.operation {
1896            EnrichOperation::EntityDescriptions => "entity",
1897            _ => "memory",
1898        };
1899        if let Err(e) = queue_conn.execute(
1900            "INSERT OR IGNORE INTO queue (item_key, item_type, status) VALUES (?1, ?2, 'pending')",
1901            rusqlite::params![key, item_type],
1902        ) {
1903            tracing::warn!(target: "enrich", error = %e, "queue insert failed");
1904        }
1905        let _ = idx; // suppress unused warning
1906    }
1907
1908    // G19: parallel LLM processing via std::thread::scope when parallelism > 1.
1909    // Clamp enforces the range even if the caller bypasses clap validation.
1910    let parallelism = args.llm_parallelism.clamp(1, 32) as usize;
1911    if parallelism > 1 {
1912        tracing::info!(
1913            target: "enrich",
1914            llm_parallelism = parallelism,
1915            "parallel LLM processing with bounded thread pool"
1916        );
1917    }
1918    // G28-D (v1.0.68) + G34 (v1.0.69): warn above the recommended parallelism
1919    // ceiling. The threshold and message depend on the LLM mode because
1920    // Claude Code spawns MCP children (G28-A) while Codex does not.
1921    if parallelism > 4 {
1922        match args.mode {
1923            EnrichMode::ClaudeCode => {
1924                tracing::warn!(
1925                    target: "enrich",
1926                    llm_parallelism = parallelism,
1927                    recommended_max = 4,
1928                    mode = "claude-code",
1929                    "llm_parallelism above 4 multiplies Claude Code subprocess fan-out; \
1930                     consider combining with SQLITE_GRAPHRAG_CLAUDE_EMPTY_CONFIG_DIR \
1931                     to cut MCP children (G28-A)"
1932                );
1933            }
1934            EnrichMode::Codex if parallelism > 16 => {
1935                tracing::warn!(
1936                    target: "enrich",
1937                    llm_parallelism = parallelism,
1938                    recommended_max = 16,
1939                    mode = "codex",
1940                    "llm_parallelism above 16 risks OAuth rate-limit on Codex; \
1941                     consider --llm-parallelism 8 for safer concurrency"
1942                );
1943            }
1944            EnrichMode::Codex => {
1945                // No warning: codex does not spawn MCP children and was
1946                // validated at parallelism 8 in production (1161 items,
1947                // 0 failures) per the 2026-06-04 session audit.
1948            }
1949            EnrichMode::Opencode if parallelism > 16 => {
1950                tracing::warn!(
1951                    target: "enrich",
1952                    llm_parallelism = parallelism,
1953                    recommended_max = 16,
1954                    mode = "opencode",
1955                    "llm_parallelism above 16 risks OAuth rate-limit on OpenCode; \
1956                     consider --llm-parallelism 8 for safer concurrency"
1957                );
1958            }
1959            EnrichMode::Opencode => {
1960                // No warning: opencode does not spawn MCP children.
1961            }
1962        }
1963    }
1964
1965    let mut completed = 0usize;
1966    let mut failed = 0usize;
1967    let mut skipped = 0usize;
1968    let mut cost_total = 0.0f64;
1969    let mut oauth_detected = false;
1970    let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1971    let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1972    let enrich_started = std::time::Instant::now();
1973
1974    let provider_timeout = match args.mode {
1975        EnrichMode::ClaudeCode => args.claude_timeout,
1976        EnrichMode::Codex => args.codex_timeout,
1977        EnrichMode::Opencode => args.opencode_timeout,
1978    };
1979
1980    let provider_model: Option<&str> = match args.mode {
1981        EnrichMode::ClaudeCode => args.claude_model.as_deref(),
1982        EnrichMode::Codex => args.codex_model.as_deref(),
1983        EnrichMode::Opencode => args.opencode_model.as_deref(),
1984    };
1985
1986    // G19: when parallelism > 1, spawn bounded worker threads.
1987    // Each worker opens its own DB connections (WAL supports concurrent readers + serialized writers).
1988    // The queue DB claim is atomic via UPDATE...RETURNING — no external lock needed.
1989    if parallelism > 1 {
1990        let stdout_mu = parking_lot::Mutex::new(());
1991        let budget = args.max_cost_usd;
1992        let operation = args.operation.clone();
1993        let mode = args.mode.clone();
1994        let min_oc = args.min_output_chars;
1995        let max_oc = args.max_output_chars;
1996        let prompt_tpl = args.prompt_template.as_deref().map(|p| p.to_path_buf());
1997
1998        struct WorkerResult {
1999            completed: usize,
2000            failed: usize,
2001            skipped: usize,
2002            cost: f64,
2003            oauth: bool,
2004        }
2005
2006        let results: Vec<WorkerResult> = std::thread::scope(|s| {
2007            let handles: Vec<_> = (0..parallelism)
2008                .map(|worker_id| {
2009                    let stdout_mu = &stdout_mu;
2010                    let paths = &paths;
2011                    let namespace = &namespace;
2012                    let provider_binary = provider_binary.as_deref();
2013                    let operation = &operation;
2014                    let mode = &mode;
2015                    let prompt_tpl = prompt_tpl.as_deref();
2016                    s.spawn(move || {
2017                        let w_conn = match open_rw(&paths.db) {
2018                            Ok(c) => c,
2019                            Err(e) => {
2020                                tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open DB");
2021                                return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
2022                            }
2023                        };
2024                        let w_queue = match open_queue_db(DEFAULT_QUEUE_DB) {
2025                            Ok(c) => c,
2026                            Err(e) => {
2027                                tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open queue DB");
2028                                return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
2029                            }
2030                        };
2031                        let mut w_completed = 0usize;
2032                        let mut w_failed = 0usize;
2033                        let mut w_skipped = 0usize;
2034                        let mut w_cost = 0.0f64;
2035                        let mut w_oauth = false;
2036                        let mut w_backoff = DEFAULT_RATE_LIMIT_WAIT;
2037                        let w_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
2038                        // G28-D: per-worker circuit breaker that aborts the
2039                        // loop after `circuit_breaker_threshold` consecutive
2040                        // HardFailure outcomes (transient/rate-limited errors
2041                        // do NOT count, so a recovering provider is not
2042                        // penalised).
2043                        let mut w_breaker = crate::retry::CircuitBreaker::new(
2044                            args.circuit_breaker_threshold.max(1),
2045                            std::time::Duration::from_secs(60),
2046                        );
2047
2048                        loop {
2049                            if crate::shutdown_requested() {
2050                                tracing::info!(target: "enrich", "shutdown requested, worker stopping");
2051                                break;
2052                            }
2053                            if let Some(b) = budget {
2054                                if !w_oauth && w_cost >= b {
2055                                    break;
2056                                }
2057                            }
2058                            let pending: Option<(i64, String, String)> = w_queue
2059                                .query_row(
2060                                    "UPDATE queue SET status='processing', attempt=attempt+1 \
2061                                     WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
2062                                     RETURNING id, item_key, item_type",
2063                                    [],
2064                                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2065                                )
2066                                .ok();
2067                            let (queue_id, item_key, _item_type) = match pending {
2068                                Some(p) => p,
2069                                None => break,
2070                            };
2071                            let item_started = Instant::now();
2072                            let current_index = w_completed + w_failed + w_skipped;
2073
2074                            let call_result = match operation {
2075                                EnrichOperation::MemoryBindings => call_memory_bindings(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2076                                EnrichOperation::EntityDescriptions => call_entity_description(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2077                                EnrichOperation::BodyEnrich => call_body_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode, min_oc, max_oc, prompt_tpl, args.preserve_threshold, paths, llm_backend),
2078                                EnrichOperation::ReEmbed => call_reembed(&w_conn, namespace, &item_key, paths, llm_backend),
2079                                EnrichOperation::WeightCalibrate => call_weight_calibrate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2080                                EnrichOperation::RelationReclassify => call_relation_reclassify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2081                                EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => call_entity_connect(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2082                                EnrichOperation::EntityTypeValidate => call_entity_type_validate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2083                                EnrichOperation::DescriptionEnrich => call_description_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2084                                EnrichOperation::DomainClassify => call_domain_classify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2085                                EnrichOperation::GraphAudit => call_graph_audit(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2086                                EnrichOperation::DeepResearchSynth => call_deep_research_synth(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2087                                EnrichOperation::BodyExtract => call_body_extract(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2088                            };
2089
2090                            match call_result {
2091                                Ok(EnrichItemResult::Done { cost, is_oauth, memory_id, entity_id, entities, rels, chars_before, chars_after }) => {
2092                                    if is_oauth { w_oauth = true; }
2093                                    w_backoff = DEFAULT_RATE_LIMIT_WAIT;
2094                                    let _ = w_queue.execute(
2095                                        "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2096                                        rusqlite::params![memory_id, entity_id, entities as i64, rels as i64, cost, item_started.elapsed().as_millis() as i64, queue_id],
2097                                    );
2098                                    w_completed += 1;
2099                                    if !is_oauth { w_cost += cost; }
2100                                    // G28-D: count success; resets breaker.
2101                                    let _ = w_breaker
2102                                        .record(crate::retry::AttemptOutcome::Success);
2103                                    let _guard = stdout_mu.lock();
2104                                    emit_json(&ItemEvent { item: &item_key, status: "done", memory_id, entity_id, entities: Some(entities), rels: Some(rels), chars_before, chars_after, cost_usd: if is_oauth { None } else { Some(cost) }, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
2105                                }
2106                                Ok(EnrichItemResult::Skipped { reason }) => {
2107                                    w_skipped += 1;
2108                                    let _ = w_queue.execute("UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![reason, queue_id]);
2109                                    let _guard = stdout_mu.lock();
2110                                    emit_json(&ItemEvent { item: &item_key, status: "skipped", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
2111                                }
2112                                Ok(EnrichItemResult::PreservationFailed { score, threshold, chars_before, chars_after }) => {
2113                                    // G29 Passo 4: worker mirror of the
2114                                    // serial path. Counted as a soft
2115                                    // skip so the queue surface shows
2116                                    // a quality issue rather than a
2117                                    // transport failure.
2118                                    w_skipped += 1;
2119                                    let reason = format!(
2120                                        "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2121                                    );
2122                                    let _ = w_queue.execute(
2123                                        "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2124                                        rusqlite::params![reason, queue_id],
2125                                    );
2126                                    let _guard = stdout_mu.lock();
2127                                    emit_json(&ItemEvent {
2128                                        item: &item_key,
2129                                        status: "preservation_failed",
2130                                        memory_id: None,
2131                                        entity_id: None,
2132                                        entities: None,
2133                                        rels: None,
2134                                        chars_before: Some(chars_before),
2135                                        chars_after: Some(chars_after),
2136                                        cost_usd: None,
2137                                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2138                                        error: Some(reason),
2139                                        index: current_index,
2140                                        total,
2141                                    });
2142                                }
2143                                Err(e) => {
2144                                    let err_str = format!("{e}");
2145                                    if matches!(e, AppError::RateLimited { .. }) {
2146                                        if crate::retry::is_kill_switch_active() {
2147                                            tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2148                                        } else if std::time::Instant::now() >= w_deadline {
2149                                            tracing::error!(target: "enrich", "rate-limit retry deadline (1h) exhausted in worker");
2150                                        } else {
2151                                            let half = w_backoff / 2;
2152                                            let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2153                                            let actual_wait = half + jitter;
2154                                            tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited in worker, backing off");
2155                                            let _ = w_queue.execute("UPDATE queue SET status='pending' WHERE id=?1", rusqlite::params![queue_id]);
2156                                            std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2157                                            w_backoff = (w_backoff * 2).min(900);
2158                                            continue;
2159                                        }
2160                                    }
2161                                    w_failed += 1;
2162                                    let _ = w_queue.execute("UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![err_str, queue_id]);
2163                                    let _guard = stdout_mu.lock();
2164                                    emit_json(&ItemEvent { item: &item_key, status: "failed", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: Some(err_str), index: current_index, total });
2165                                    // G28-D: count hard failure against breaker.
2166                                    let breaker_opened = w_breaker
2167                                        .record(crate::retry::AttemptOutcome::HardFailure);
2168                                    if breaker_opened {
2169                                        tracing::error!(target: "enrich",
2170                                            consecutive_failures = w_breaker.consecutive_failures(),
2171                                            "circuit breaker opened — aborting worker"
2172                                        );
2173                                        break;
2174                                    }
2175                                }
2176                            }
2177                        }
2178                        WorkerResult { completed: w_completed, failed: w_failed, skipped: w_skipped, cost: w_cost, oauth: w_oauth }
2179                    })
2180                })
2181                .collect();
2182            handles
2183                .into_iter()
2184                .map(|h| {
2185                    h.join().unwrap_or(WorkerResult {
2186                        completed: 0,
2187                        failed: 0,
2188                        skipped: 0,
2189                        cost: 0.0,
2190                        oauth: false,
2191                    })
2192                })
2193                .collect()
2194        });
2195
2196        for r in &results {
2197            completed += r.completed;
2198            failed += r.failed;
2199            skipped += r.skipped;
2200            cost_total += r.cost;
2201            if r.oauth && !oauth_detected {
2202                oauth_detected = true;
2203            }
2204        }
2205    } else {
2206        // Serial path (parallelism == 1) — original loop
2207        loop {
2208            if crate::shutdown_requested() {
2209                tracing::info!(target: "enrich", "shutdown requested, stopping enrichment");
2210                break;
2211            }
2212
2213            // Budget check
2214            if let Some(budget) = args.max_cost_usd {
2215                if !oauth_detected && cost_total >= budget {
2216                    tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
2217                    break;
2218                }
2219            }
2220
2221            // Dequeue next pending item
2222            let pending: Option<(i64, String, String)> = queue_conn
2223                .query_row(
2224                    "UPDATE queue SET status='processing', attempt=attempt+1 \
2225                 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
2226                 RETURNING id, item_key, item_type",
2227                    [],
2228                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2229                )
2230                .ok();
2231
2232            let (queue_id, item_key, item_type) = match pending {
2233                Some(p) => p,
2234                None => break,
2235            };
2236
2237            let item_started = Instant::now();
2238            let current_index = completed + failed + skipped;
2239
2240            let call_result = match args.operation {
2241                EnrichOperation::MemoryBindings => call_memory_bindings(
2242                    &conn,
2243                    &namespace,
2244                    &item_key,
2245                    provider_binary
2246                        .as_deref()
2247                        .expect("provider binary required"),
2248                    provider_model,
2249                    provider_timeout,
2250                    &args.mode,
2251                ),
2252                EnrichOperation::EntityDescriptions => call_entity_description(
2253                    &conn,
2254                    &namespace,
2255                    &item_key,
2256                    provider_binary
2257                        .as_deref()
2258                        .expect("provider binary required"),
2259                    provider_model,
2260                    provider_timeout,
2261                    &args.mode,
2262                ),
2263                EnrichOperation::BodyEnrich => call_body_enrich(
2264                    &conn,
2265                    &namespace,
2266                    &item_key,
2267                    provider_binary
2268                        .as_deref()
2269                        .expect("provider binary required"),
2270                    provider_model,
2271                    provider_timeout,
2272                    &args.mode,
2273                    args.min_output_chars,
2274                    args.max_output_chars,
2275                    args.prompt_template.as_deref(),
2276                    args.preserve_threshold,
2277                    &paths,
2278                    llm_backend,
2279                ),
2280                EnrichOperation::ReEmbed => {
2281                    call_reembed(&conn, &namespace, &item_key, &paths, llm_backend)
2282                }
2283                EnrichOperation::WeightCalibrate => call_weight_calibrate(
2284                    &conn,
2285                    &namespace,
2286                    &item_key,
2287                    provider_binary
2288                        .as_deref()
2289                        .expect("provider binary required"),
2290                    provider_model,
2291                    provider_timeout,
2292                    &args.mode,
2293                ),
2294                EnrichOperation::RelationReclassify => call_relation_reclassify(
2295                    &conn,
2296                    &namespace,
2297                    &item_key,
2298                    provider_binary
2299                        .as_deref()
2300                        .expect("provider binary required"),
2301                    provider_model,
2302                    provider_timeout,
2303                    &args.mode,
2304                ),
2305                EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2306                    call_entity_connect(
2307                        &conn,
2308                        &namespace,
2309                        &item_key,
2310                        provider_binary
2311                            .as_deref()
2312                            .expect("provider binary required"),
2313                        provider_model,
2314                        provider_timeout,
2315                        &args.mode,
2316                    )
2317                }
2318                EnrichOperation::EntityTypeValidate => call_entity_type_validate(
2319                    &conn,
2320                    &namespace,
2321                    &item_key,
2322                    provider_binary
2323                        .as_deref()
2324                        .expect("provider binary required"),
2325                    provider_model,
2326                    provider_timeout,
2327                    &args.mode,
2328                ),
2329                EnrichOperation::DescriptionEnrich => call_description_enrich(
2330                    &conn,
2331                    &namespace,
2332                    &item_key,
2333                    provider_binary
2334                        .as_deref()
2335                        .expect("provider binary required"),
2336                    provider_model,
2337                    provider_timeout,
2338                    &args.mode,
2339                ),
2340                EnrichOperation::DomainClassify => call_domain_classify(
2341                    &conn,
2342                    &namespace,
2343                    &item_key,
2344                    provider_binary
2345                        .as_deref()
2346                        .expect("provider binary required"),
2347                    provider_model,
2348                    provider_timeout,
2349                    &args.mode,
2350                ),
2351                EnrichOperation::GraphAudit => call_graph_audit(
2352                    &conn,
2353                    &namespace,
2354                    &item_key,
2355                    provider_binary
2356                        .as_deref()
2357                        .expect("provider binary required"),
2358                    provider_model,
2359                    provider_timeout,
2360                    &args.mode,
2361                ),
2362                EnrichOperation::DeepResearchSynth => call_deep_research_synth(
2363                    &conn,
2364                    &namespace,
2365                    &item_key,
2366                    provider_binary
2367                        .as_deref()
2368                        .expect("provider binary required"),
2369                    provider_model,
2370                    provider_timeout,
2371                    &args.mode,
2372                ),
2373                EnrichOperation::BodyExtract => call_body_extract(
2374                    &conn,
2375                    &namespace,
2376                    &item_key,
2377                    provider_binary
2378                        .as_deref()
2379                        .expect("provider binary required"),
2380                    provider_model,
2381                    provider_timeout,
2382                    &args.mode,
2383                ),
2384            };
2385
2386            match call_result {
2387                Ok(EnrichItemResult::Done {
2388                    memory_id,
2389                    entity_id,
2390                    entities,
2391                    rels,
2392                    chars_before,
2393                    chars_after,
2394                    cost,
2395                    is_oauth,
2396                }) => {
2397                    if is_oauth && !oauth_detected {
2398                        oauth_detected = true;
2399                        tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
2400                    }
2401                    backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
2402
2403                    // Persist depends on the operation
2404                    let persist_err: Option<String> = match args.operation {
2405                        EnrichOperation::MemoryBindings => {
2406                            // Bindings already persisted inside call_memory_bindings
2407                            None
2408                        }
2409                        EnrichOperation::EntityDescriptions => {
2410                            // Description already persisted inside call_entity_description
2411                            None
2412                        }
2413                        EnrichOperation::BodyEnrich => {
2414                            // Body already persisted inside call_body_enrich
2415                            None
2416                        }
2417                        _ => {
2418                            // All G27 operations persist inside their call_* function
2419                            None
2420                        }
2421                    };
2422
2423                    if let Err(e) = queue_conn.execute(
2424                    "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2425                    rusqlite::params![
2426                        memory_id,
2427                        entity_id,
2428                        entities as i64,
2429                        rels as i64,
2430                        cost,
2431                        item_started.elapsed().as_millis() as i64,
2432                        queue_id
2433                    ],
2434                ) {
2435                        tracing::warn!(target: "enrich", error = %e, "queue done update failed");
2436                    }
2437
2438                    if persist_err.is_none() {
2439                        completed += 1;
2440                        if !is_oauth {
2441                            cost_total += cost;
2442                        }
2443                        emit_json(&ItemEvent {
2444                            item: &item_key,
2445                            status: "done",
2446                            memory_id,
2447                            entity_id,
2448                            entities: Some(entities),
2449                            rels: Some(rels),
2450                            chars_before,
2451                            chars_after,
2452                            cost_usd: if is_oauth { None } else { Some(cost) },
2453                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2454                            error: None,
2455                            index: current_index,
2456                            total,
2457                        });
2458                    } else {
2459                        failed += 1;
2460                        emit_json(&ItemEvent {
2461                            item: &item_key,
2462                            status: "failed",
2463                            memory_id: None,
2464                            entity_id: None,
2465                            entities: None,
2466                            rels: None,
2467                            chars_before: None,
2468                            chars_after: None,
2469                            cost_usd: None,
2470                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2471                            error: persist_err,
2472                            index: current_index,
2473                            total,
2474                        });
2475                    }
2476                }
2477                Ok(EnrichItemResult::Skipped { reason }) => {
2478                    skipped += 1;
2479                    if let Err(e) = queue_conn.execute(
2480                    "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2481                    rusqlite::params![reason, queue_id],
2482                ) {
2483                        tracing::warn!(target: "enrich", error = %e, "queue skipped update failed");
2484                    }
2485                    emit_json(&ItemEvent {
2486                        item: &item_key,
2487                        status: "skipped",
2488                        memory_id: None,
2489                        entity_id: None,
2490                        entities: None,
2491                        rels: None,
2492                        chars_before: None,
2493                        chars_after: None,
2494                        cost_usd: None,
2495                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2496                        error: None,
2497                        index: current_index,
2498                        total,
2499                    });
2500                }
2501                Ok(EnrichItemResult::PreservationFailed {
2502                    score,
2503                    threshold,
2504                    chars_before,
2505                    chars_after,
2506                }) => {
2507                    // G29 Passo 4: the LLM rewrite diverged too far from
2508                    // the original body. Count as a soft failure (not
2509                    // `failed`) so the queue surfaces it as a quality
2510                    // issue, not a transport error. The reason is
2511                    // structured so the operator can audit why a body
2512                    // was rejected.
2513                    skipped += 1;
2514                    let reason = format!(
2515                        "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2516                    );
2517                    if let Err(qe) = queue_conn.execute(
2518                        "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2519                        rusqlite::params![reason, queue_id],
2520                    ) {
2521                        tracing::warn!(target: "enrich", error = %qe, "queue preservation_failed update failed");
2522                    }
2523                    emit_json(&ItemEvent {
2524                        item: &item_key,
2525                        status: "preservation_failed",
2526                        memory_id: None,
2527                        entity_id: None,
2528                        entities: None,
2529                        rels: None,
2530                        chars_before: Some(chars_before),
2531                        chars_after: Some(chars_after),
2532                        cost_usd: None,
2533                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2534                        error: Some(reason),
2535                        index: current_index,
2536                        total,
2537                    });
2538                }
2539                Err(e) => {
2540                    let err_str = format!("{e}");
2541                    if matches!(e, AppError::RateLimited { .. }) {
2542                        if crate::retry::is_kill_switch_active() {
2543                            tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2544                        } else if std::time::Instant::now() >= rate_limit_deadline {
2545                            tracing::error!(target: "enrich", total_elapsed_secs = enrich_started.elapsed().as_secs(), "rate-limit retry deadline (1h) exhausted");
2546                        } else {
2547                            let half = backoff_secs / 2;
2548                            let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2549                            let actual_wait = half + jitter;
2550                            tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
2551                            if let Err(qe) = queue_conn.execute(
2552                                "UPDATE queue SET status='pending' WHERE id=?1",
2553                                rusqlite::params![queue_id],
2554                            ) {
2555                                tracing::warn!(target: "enrich", error = %qe, "queue pending update failed");
2556                            }
2557                            std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2558                            backoff_secs = (backoff_secs * 2).min(900);
2559                            continue;
2560                        }
2561                    }
2562
2563                    failed += 1;
2564                    if let Err(qe) = queue_conn.execute(
2565                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
2566                    rusqlite::params![err_str, queue_id],
2567                ) {
2568                        tracing::warn!(target: "enrich", error = %qe, "queue failed update failed");
2569                    }
2570                    emit_json(&ItemEvent {
2571                        item: &item_key,
2572                        status: "failed",
2573                        memory_id: None,
2574                        entity_id: None,
2575                        entities: None,
2576                        rels: None,
2577                        chars_before: None,
2578                        chars_after: None,
2579                        cost_usd: None,
2580                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2581                        error: Some(err_str),
2582                        index: current_index,
2583                        total,
2584                    });
2585                }
2586            }
2587
2588            let _ = item_type; // used via queue schema only
2589        }
2590    } // end else (serial path)
2591
2592    let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2593    let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2594
2595    emit_json(&EnrichSummary {
2596        summary: true,
2597        operation: format!("{:?}", args.operation),
2598        items_total: total,
2599        completed,
2600        failed,
2601        skipped,
2602        cost_usd: cost_total,
2603        elapsed_ms: started.elapsed().as_millis() as u64,
2604        backend_invoked: take_enrich_backend(),
2605    });
2606
2607    if failed == 0 {
2608        let _ = std::fs::remove_file(DEFAULT_QUEUE_DB);
2609    }
2610
2611    Ok(())
2612}
2613
2614// ---------------------------------------------------------------------------
2615// Internal result type for a single item call
2616// ---------------------------------------------------------------------------
2617
/// Outcome of running one queue item through a per-operation `call_*` helper.
///
/// The dispatch loop turns each variant into a queue-row status plus an NDJSON
/// event: `Done` becomes `done`, `Skipped` becomes `skipped`, and
/// `PreservationFailed` becomes a `skipped` queue row with a
/// `preservation_failed` event. Provider/transport errors are not modelled here;
/// they travel on the `Err` side of the surrounding `Result`.
enum EnrichItemResult {
    /// The helper finished the item and already persisted its changes.
    Done {
        /// Memory row the item refers to, for memory-centric operations.
        memory_id: Option<i64>,
        /// Entity row the item refers to, for entity-centric operations.
        entity_id: Option<i64>,
        /// Entity count reported by the helper for this item.
        entities: usize,
        /// Relationship count reported by the helper for this item.
        rels: usize,
        /// Body length (in chars) before a rewrite, when the operation measures it.
        chars_before: Option<usize>,
        /// Body length (in chars) after a rewrite, when the operation measures it.
        chars_after: Option<usize>,
        /// LLM cost in USD reported for this item.
        cost: f64,
        /// Set when the provider ran under an OAuth subscription; the caller then
        /// leaves `cost` out of both the event and the running total.
        is_oauth: bool,
    },
    /// The item was intentionally not processed.
    Skipped {
        /// Human-readable explanation, stored in the queue row's `error` column.
        reason: String,
    },
    /// G29 Passo 4 (v1.0.69): the LLM rewrite drifted from the original body
    /// past the configured `--preserve-threshold` and was discarded before
    /// anything was persisted. Score and threshold are surfaced in the NDJSON
    /// stream so an operator can audit the rejection.
    PreservationFailed {
        /// Trigram-Jaccard similarity between the original and rewritten body.
        score: f64,
        /// Minimum similarity that would have been accepted.
        threshold: f64,
        /// Length of the original body, in chars.
        chars_before: usize,
        /// Length of the rejected rewrite, in chars.
        chars_after: usize,
    },
}
2643
2644// ---------------------------------------------------------------------------
2645// Per-operation call helpers (SCAN + JUDGE + PERSIST in one unit)
2646// ---------------------------------------------------------------------------
2647
2648fn call_memory_bindings(
2649    conn: &Connection,
2650    namespace: &str,
2651    memory_name: &str,
2652    binary: &Path,
2653    model: Option<&str>,
2654    timeout: u64,
2655    mode: &EnrichMode,
2656) -> Result<EnrichItemResult, AppError> {
2657    // Look up the memory
2658    let (memory_id, body): (i64, String) = conn.query_row(
2659        "SELECT id, COALESCE(body,'') FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2660        rusqlite::params![namespace, memory_name],
2661        |r| Ok((r.get(0)?, r.get(1)?)),
2662    ).map_err(|e| match e {
2663        rusqlite::Error::QueryReturnedNoRows => AppError::NotFound(format!("memory '{memory_name}' not found")),
2664        other => AppError::Database(other),
2665    })?;
2666
2667    if body.trim().is_empty() {
2668        return Ok(EnrichItemResult::Skipped {
2669            reason: "body is empty".to_string(),
2670        });
2671    }
2672
2673    let (value, cost, is_oauth) = match mode {
2674        EnrichMode::ClaudeCode => call_claude(
2675            binary,
2676            BINDINGS_PROMPT,
2677            BINDINGS_SCHEMA,
2678            &body,
2679            model,
2680            timeout,
2681        )?,
2682        EnrichMode::Codex => call_codex(
2683            binary,
2684            BINDINGS_PROMPT,
2685            BINDINGS_SCHEMA,
2686            &body,
2687            model,
2688            timeout,
2689        )?,
2690        EnrichMode::Opencode => call_opencode(
2691            binary,
2692            BINDINGS_PROMPT,
2693            BINDINGS_SCHEMA,
2694            &body,
2695            model,
2696            timeout,
2697        )?,
2698    };
2699
2700    let empty_arr = serde_json::Value::Array(vec![]);
2701    let entities_val = value.get("entities").unwrap_or(&empty_arr);
2702    let rels_val = value.get("relationships").unwrap_or(&empty_arr);
2703
2704    let (ent_count, rel_count) =
2705        persist_memory_bindings(conn, namespace, memory_id, entities_val, rels_val)?;
2706
2707    Ok(EnrichItemResult::Done {
2708        memory_id: Some(memory_id),
2709        entity_id: None,
2710        entities: ent_count,
2711        rels: rel_count,
2712        chars_before: None,
2713        chars_after: None,
2714        cost,
2715        is_oauth,
2716    })
2717}
2718
2719fn call_entity_description(
2720    conn: &Connection,
2721    namespace: &str,
2722    entity_name: &str,
2723    binary: &Path,
2724    model: Option<&str>,
2725    timeout: u64,
2726    mode: &EnrichMode,
2727) -> Result<EnrichItemResult, AppError> {
2728    let (entity_id, entity_type): (i64, String) = conn
2729        .query_row(
2730            "SELECT id, type FROM entities WHERE namespace=?1 AND name=?2",
2731            rusqlite::params![namespace, entity_name],
2732            |r| Ok((r.get(0)?, r.get(1)?)),
2733        )
2734        .map_err(|e| match e {
2735            rusqlite::Error::QueryReturnedNoRows => {
2736                AppError::NotFound(format!("entity '{entity_name}' not found"))
2737            }
2738            other => AppError::Database(other),
2739        })?;
2740
2741    let prompt = format!(
2742        "{ENTITY_DESCRIPTION_PROMPT_PREFIX}{entity_name}\nEntity type: {entity_type}\n\nGenerate a description:"
2743    );
2744
2745    let (value, cost, is_oauth) = match mode {
2746        EnrichMode::ClaudeCode => call_claude(
2747            binary,
2748            &prompt,
2749            ENTITY_DESCRIPTION_SCHEMA,
2750            "",
2751            model,
2752            timeout,
2753        )?,
2754        EnrichMode::Codex => call_codex(
2755            binary,
2756            &prompt,
2757            ENTITY_DESCRIPTION_SCHEMA,
2758            "",
2759            model,
2760            timeout,
2761        )?,
2762        EnrichMode::Opencode => call_opencode(
2763            binary,
2764            &prompt,
2765            ENTITY_DESCRIPTION_SCHEMA,
2766            "",
2767            model,
2768            timeout,
2769        )?,
2770    };
2771
2772    let description = value
2773        .get("description")
2774        .and_then(|v| v.as_str())
2775        .ok_or_else(|| AppError::Validation("LLM result missing 'description' field".into()))?;
2776
2777    persist_entity_description(conn, entity_id, description)?;
2778
2779    Ok(EnrichItemResult::Done {
2780        memory_id: None,
2781        entity_id: Some(entity_id),
2782        entities: 0,
2783        rels: 0,
2784        chars_before: None,
2785        chars_after: None,
2786        cost,
2787        is_oauth,
2788    })
2789}
2790
2791#[allow(clippy::too_many_arguments)]
2792fn call_body_enrich(
2793    conn: &Connection,
2794    namespace: &str,
2795    memory_name: &str,
2796    binary: &Path,
2797    model: Option<&str>,
2798    timeout: u64,
2799    mode: &EnrichMode,
2800    min_output_chars: usize,
2801    max_output_chars: usize,
2802    prompt_template: Option<&Path>,
2803    preserve_threshold: f64,
2804    paths: &crate::paths::AppPaths,
2805    llm_backend: crate::cli::LlmBackendChoice,
2806) -> Result<EnrichItemResult, AppError> {
2807    let (memory_id, body, description, memory_type): (i64, String, String, String) = conn
2808        .query_row(
2809            "SELECT id, COALESCE(body,''), COALESCE(description,''), COALESCE(type,'note') \
2810         FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2811            rusqlite::params![namespace, memory_name],
2812            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2813        )
2814        .map_err(|e| match e {
2815            rusqlite::Error::QueryReturnedNoRows => {
2816                AppError::NotFound(format!("memory '{memory_name}' not found"))
2817            }
2818            other => AppError::Database(other),
2819        })?;
2820
2821    let chars_before = body.chars().count();
2822
2823    // G26: gather graph context for contextualized enrichment
2824    let linked_entities: Vec<String> = {
2825        let mut stmt = conn.prepare_cached(
2826            "SELECT e.name FROM memory_entities me \
2827             JOIN entities e ON e.id = me.entity_id \
2828             WHERE me.memory_id = ?1 LIMIT 10",
2829        )?;
2830        let result: Vec<String> = stmt
2831            .query_map(rusqlite::params![memory_id], |r| r.get::<_, String>(0))?
2832            .filter_map(|r| r.ok())
2833            .collect();
2834        drop(stmt);
2835        result
2836    };
2837
2838    // Load custom prompt template if provided
2839    let prompt_prefix = if let Some(tmpl_path) = prompt_template {
2840        let file_size = std::fs::metadata(tmpl_path)
2841            .map_err(|e| {
2842                AppError::Io(std::io::Error::new(
2843                    e.kind(),
2844                    format!("failed to stat prompt template: {e}"),
2845                ))
2846            })?
2847            .len();
2848        if file_size > MAX_MEMORY_BODY_LEN as u64 {
2849            return Err(AppError::LimitExceeded(
2850                crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
2851            ));
2852        }
2853        std::fs::read_to_string(tmpl_path).map_err(|e| {
2854            AppError::Io(std::io::Error::new(
2855                e.kind(),
2856                format!("failed to read prompt template: {e}"),
2857            ))
2858        })?
2859    } else {
2860        BODY_ENRICH_PROMPT_PREFIX.to_string()
2861    };
2862
2863    // G26: build contextualized prompt with graph data
2864    let context_section = if !linked_entities.is_empty() || !description.is_empty() {
2865        let mut ctx = String::new();
2866        ctx.push_str(&format!(
2867            "\nContext:\n- Memory name: {memory_name}\n- Type: {memory_type}\n"
2868        ));
2869        if !description.is_empty() {
2870            ctx.push_str(&format!("- Description: {description}\n"));
2871        }
2872        ctx.push_str(&format!("- Domain: {namespace}\n"));
2873        if !linked_entities.is_empty() {
2874            ctx.push_str(&format!(
2875                "- Linked entities: {}\n",
2876                linked_entities.join(", ")
2877            ));
2878        }
2879        ctx
2880    } else {
2881        String::new()
2882    };
2883
2884    let prompt = format!(
2885        "{prompt_prefix}{context_section}\nTarget minimum length: {min_output_chars} characters. Maximum: {max_output_chars} characters."
2886    );
2887
2888    // The body schema uses a free-form enriched_body field
2889    let (value, cost, is_oauth) = match mode {
2890        EnrichMode::ClaudeCode => {
2891            call_claude(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2892        }
2893        EnrichMode::Codex => {
2894            call_codex(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2895        }
2896        EnrichMode::Opencode => {
2897            call_opencode(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2898        }
2899    };
2900
2901    let enriched_body = value
2902        .get("enriched_body")
2903        .and_then(|v| v.as_str())
2904        .ok_or_else(|| AppError::Validation("LLM result missing 'enriched_body' field".into()))?;
2905
2906    let chars_after = enriched_body.chars().count();
2907
2908    // G29 Passo 4 (v1.0.69): preservation check. Before persisting, run
2909    // a trigram-Jaccard similarity between the original body and the
2910    // LLM-rewritten body. When the score falls below
2911    // `args.preserve_threshold` (default 0.7 per the G29 gap), reject the
2912    // rewrite as a likely hallucination. The result is recorded in the
2913    // NDJSON stream so operators can audit what the LLM tried to do.
2914    let threshold = preserve_threshold;
2915    let verdict =
2916        crate::preservation::PreservationVerdict::evaluate(&body, enriched_body, threshold);
2917    if !verdict.is_accepted() {
2918        return Ok(EnrichItemResult::PreservationFailed {
2919            score: match verdict {
2920                crate::preservation::PreservationVerdict::Preserved { score, .. } => score,
2921                crate::preservation::PreservationVerdict::Rejected { score, .. } => score,
2922                crate::preservation::PreservationVerdict::Unchanged { .. } => 1.0,
2923            },
2924            threshold,
2925            chars_before,
2926            chars_after,
2927        });
2928    }
2929
2930    // G29 Passo 5 (v1.0.69): idempotency via blake3 hash. Before persisting,
2931    // compare the hash of the original body against the hash of the enriched
2932    // body. Identical hashes mean the LLM produced a byte-for-byte identical
2933    // body (rare but possible) — treat as `Skipped` so re-running the batch
2934    // is safe and the queue does not get re-persisted entries.
2935    let old_hash = blake3::hash(body.as_bytes()).to_hex().to_string();
2936    let new_hash = blake3::hash(enriched_body.as_bytes()).to_hex().to_string();
2937    if old_hash == new_hash {
2938        return Ok(EnrichItemResult::Skipped {
2939            reason: format!(
2940                "enriched body hash matches original (blake3:{old_hash}); idempotency skip"
2941            ),
2942        });
2943    }
2944
2945    // Only persist if the enriched body is genuinely longer
2946    if chars_after <= chars_before {
2947        return Ok(EnrichItemResult::Skipped {
2948            reason: format!(
2949                "enriched body ({chars_after} chars) not longer than original ({chars_before} chars)"
2950            ),
2951        });
2952    }
2953
2954    persist_enriched_body(
2955        conn,
2956        namespace,
2957        memory_id,
2958        memory_name,
2959        enriched_body,
2960        paths,
2961        llm_backend,
2962    )?;
2963
2964    Ok(EnrichItemResult::Done {
2965        memory_id: Some(memory_id),
2966        entity_id: None,
2967        entities: 0,
2968        rels: 0,
2969        chars_before: Some(chars_before),
2970        chars_after: Some(chars_after),
2971        cost,
2972        is_oauth,
2973    })
2974}
2975
2976fn call_reembed(
2977    conn: &Connection,
2978    namespace: &str,
2979    memory_name: &str,
2980    paths: &crate::paths::AppPaths,
2981    llm_backend: crate::cli::LlmBackendChoice,
2982) -> Result<EnrichItemResult, AppError> {
2983    let (memory_id, body, memory_type): (i64, String, String) = conn
2984        .query_row(
2985            "SELECT id, COALESCE(body,''), COALESCE(type,'note')
2986             FROM memories
2987             WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2988            rusqlite::params![namespace, memory_name],
2989            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2990        )
2991        .map_err(|e| match e {
2992            rusqlite::Error::QueryReturnedNoRows => {
2993                AppError::NotFound(format!("memory '{memory_name}' not found"))
2994            }
2995            other => AppError::Database(other),
2996        })?;
2997
2998    if body.trim().is_empty() {
2999        return Ok(EnrichItemResult::Skipped {
3000            reason: "body is empty".to_string(),
3001        });
3002    }
3003
3004    reembed_memory_vector(
3005        conn,
3006        namespace,
3007        memory_id,
3008        memory_name,
3009        &memory_type,
3010        &body,
3011        paths,
3012        llm_backend,
3013    )?;
3014
3015    Ok(EnrichItemResult::Done {
3016        memory_id: Some(memory_id),
3017        entity_id: None,
3018        entities: 0,
3019        rels: 0,
3020        chars_before: Some(body.chars().count()),
3021        chars_after: Some(body.chars().count()),
3022        cost: 0.0,
3023        is_oauth: true,
3024    })
3025}
3026
3027// ---------------------------------------------------------------------------
3028// Scan dispatcher — maps operation to scan query result (item keys)
3029// ---------------------------------------------------------------------------
3030
3031fn scan_operation(
3032    conn: &Connection,
3033    namespace: &str,
3034    args: &EnrichArgs,
3035) -> Result<Vec<String>, AppError> {
3036    // G37: resolve --names + --names-file once and apply to every scan path.
3037    let name_filter = resolve_name_filter(args)?;
3038    match args.operation {
3039        EnrichOperation::MemoryBindings => {
3040            let rows = scan_unbound_memories(conn, namespace, args.limit, &name_filter)?;
3041            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3042        }
3043        EnrichOperation::EntityDescriptions => {
3044            let rows =
3045                scan_entities_without_description(conn, namespace, args.limit, &name_filter)?;
3046            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3047        }
3048        EnrichOperation::BodyEnrich => {
3049            let rows = scan_short_body_memories(
3050                conn,
3051                namespace,
3052                args.min_output_chars,
3053                args.limit,
3054                &name_filter,
3055            )?;
3056            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3057        }
3058        EnrichOperation::ReEmbed => {
3059            let rows = scan_memories_without_embeddings(conn, namespace, args.limit, &name_filter)?;
3060            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3061        }
3062        EnrichOperation::WeightCalibrate => {
3063            let rows = scan_weight_candidates(conn, namespace, args.limit)?;
3064            Ok(rows
3065                .into_iter()
3066                .map(|(id, _, _, _, _)| id.to_string())
3067                .collect())
3068        }
3069        EnrichOperation::RelationReclassify => {
3070            let rows = scan_generic_relations(conn, namespace, args.limit)?;
3071            Ok(rows
3072                .into_iter()
3073                .map(|(id, _, _, _)| id.to_string())
3074                .collect())
3075        }
3076        EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
3077            let pairs = scan_isolated_entity_pairs(conn, namespace, args.limit)?;
3078            Ok(pairs.into_iter().map(|(_, name, _, _)| name).collect())
3079        }
3080        EnrichOperation::EntityTypeValidate => {
3081            let rows = scan_entities_for_type_validation(conn, namespace, args.limit)?;
3082            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3083        }
3084        EnrichOperation::DescriptionEnrich => {
3085            let rows = scan_generic_descriptions(conn, namespace, args.limit)?;
3086            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3087        }
3088        EnrichOperation::DomainClassify
3089        | EnrichOperation::GraphAudit
3090        | EnrichOperation::DeepResearchSynth
3091        | EnrichOperation::BodyExtract => {
3092            let limit_clause = args.limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3093            let sql = format!(
3094                "SELECT name FROM memories WHERE namespace=?1 AND deleted_at IS NULL ORDER BY id {limit_clause}"
3095            );
3096            let mut stmt = conn.prepare(&sql)?;
3097            let names = stmt
3098                .query_map(rusqlite::params![namespace], |r| r.get::<_, String>(0))?
3099                .collect::<Result<Vec<_>, _>>()?;
3100            Ok(names)
3101        }
3102    }
3103}
3104
3105// ---------------------------------------------------------------------------
// Codex CLI binary discovery
3107// ---------------------------------------------------------------------------
3108
3109/// Locates the Codex CLI binary.
3110fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
3111    if let Some(p) = explicit {
3112        if p.exists() {
3113            return Ok(p.to_path_buf());
3114        }
3115        return Err(AppError::Validation(format!(
3116            "Codex binary not found at explicit path: {}",
3117            p.display()
3118        )));
3119    }
3120
3121    if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
3122        let p = PathBuf::from(&env_path);
3123        if p.exists() {
3124            return Ok(p);
3125        }
3126    }
3127
3128    let name = if cfg!(windows) { "codex.exe" } else { "codex" };
3129    if let Some(path_var) = std::env::var_os("PATH") {
3130        for dir in std::env::split_paths(&path_var) {
3131            let candidate = dir.join(name);
3132            if candidate.exists() {
3133                return Ok(crate::extract::llm_embedding::resolve_real_binary(
3134                    &candidate,
3135                ));
3136            }
3137        }
3138    }
3139
3140    Err(AppError::Validation(
3141        "Codex CLI binary not found in PATH. Install it or specify --codex-binary".to_string(),
3142    ))
3143}
3144
3145/// G27: Calibrate weight of a single relationship via LLM.
3146fn call_weight_calibrate(
3147    conn: &Connection,
3148    _namespace: &str,
3149    item_key: &str,
3150    binary: &Path,
3151    model: Option<&str>,
3152    timeout: u64,
3153    mode: &EnrichMode,
3154) -> Result<EnrichItemResult, AppError> {
3155    let rel_id: i64 = item_key
3156        .parse()
3157        .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
3158    let (source_name, target_name, relation, current_weight): (String, String, String, f64) = conn
3159        .query_row(
3160            "SELECT e1.name, e2.name, r.relation, r.weight \
3161             FROM relationships r \
3162             JOIN entities e1 ON e1.id = r.source_id \
3163             JOIN entities e2 ON e2.id = r.target_id \
3164             WHERE r.id = ?1",
3165            rusqlite::params![rel_id],
3166            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
3167        )
3168        .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
3169
3170    let input_text = format!(
3171        "Source: {source_name}\nTarget: {target_name}\nRelation: {relation}\nCurrent weight: {current_weight}"
3172    );
3173    let (value, cost, is_oauth) = match mode {
3174        EnrichMode::ClaudeCode => call_claude(
3175            binary,
3176            WEIGHT_CALIBRATE_PROMPT,
3177            WEIGHT_CALIBRATE_SCHEMA,
3178            &input_text,
3179            model,
3180            timeout,
3181        )?,
3182        EnrichMode::Codex => call_codex(
3183            binary,
3184            WEIGHT_CALIBRATE_PROMPT,
3185            WEIGHT_CALIBRATE_SCHEMA,
3186            &input_text,
3187            model,
3188            timeout,
3189        )?,
3190        EnrichMode::Opencode => call_opencode(
3191            binary,
3192            WEIGHT_CALIBRATE_PROMPT,
3193            WEIGHT_CALIBRATE_SCHEMA,
3194            &input_text,
3195            model,
3196            timeout,
3197        )?,
3198    };
3199
3200    let calibrated = value
3201        .get("calibrated_weight")
3202        .and_then(|v| v.as_f64())
3203        .ok_or_else(|| AppError::Validation("LLM result missing 'calibrated_weight'".into()))?;
3204
3205    conn.execute(
3206        "UPDATE relationships SET weight = ?1 WHERE id = ?2",
3207        rusqlite::params![calibrated, rel_id],
3208    )?;
3209
3210    Ok(EnrichItemResult::Done {
3211        memory_id: None,
3212        entity_id: None,
3213        entities: 0,
3214        rels: 1,
3215        chars_before: None,
3216        chars_after: None,
3217        cost,
3218        is_oauth,
3219    })
3220}
3221
3222/// G27: Reclassify a generic relationship type via LLM.
3223fn call_relation_reclassify(
3224    conn: &Connection,
3225    _namespace: &str,
3226    item_key: &str,
3227    binary: &Path,
3228    model: Option<&str>,
3229    timeout: u64,
3230    mode: &EnrichMode,
3231) -> Result<EnrichItemResult, AppError> {
3232    let rel_id: i64 = item_key
3233        .parse()
3234        .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
3235    let (source_name, target_name, current_relation): (String, String, String) = conn
3236        .query_row(
3237            "SELECT e1.name, e2.name, r.relation \
3238             FROM relationships r \
3239             JOIN entities e1 ON e1.id = r.source_id \
3240             JOIN entities e2 ON e2.id = r.target_id \
3241             WHERE r.id = ?1",
3242            rusqlite::params![rel_id],
3243            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3244        )
3245        .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
3246
3247    let input_text = format!(
3248        "Source entity: {source_name}\nTarget entity: {target_name}\nCurrent relation: {current_relation}"
3249    );
3250    let (value, cost, is_oauth) = match mode {
3251        EnrichMode::ClaudeCode => call_claude(
3252            binary,
3253            RELATION_RECLASSIFY_PROMPT,
3254            RELATION_RECLASSIFY_SCHEMA,
3255            &input_text,
3256            model,
3257            timeout,
3258        )?,
3259        EnrichMode::Codex => call_codex(
3260            binary,
3261            RELATION_RECLASSIFY_PROMPT,
3262            RELATION_RECLASSIFY_SCHEMA,
3263            &input_text,
3264            model,
3265            timeout,
3266        )?,
3267        EnrichMode::Opencode => call_opencode(
3268            binary,
3269            RELATION_RECLASSIFY_PROMPT,
3270            RELATION_RECLASSIFY_SCHEMA,
3271            &input_text,
3272            model,
3273            timeout,
3274        )?,
3275    };
3276
3277    let new_relation = value
3278        .get("relation")
3279        .and_then(|v| v.as_str())
3280        .ok_or_else(|| AppError::Validation("LLM result missing 'relation'".into()))?;
3281    let new_strength = value
3282        .get("strength")
3283        .and_then(|v| v.as_f64())
3284        .unwrap_or(0.5);
3285
3286    conn.execute(
3287        "UPDATE relationships SET relation = ?1, weight = ?2 WHERE id = ?3",
3288        rusqlite::params![new_relation, new_strength, rel_id],
3289    )?;
3290
3291    Ok(EnrichItemResult::Done {
3292        memory_id: None,
3293        entity_id: None,
3294        entities: 0,
3295        rels: 1,
3296        chars_before: None,
3297        chars_after: None,
3298        cost,
3299        is_oauth,
3300    })
3301}
3302
3303/// G27 P2: Connect isolated entities via LLM-suggested relationship.
3304fn call_entity_connect(
3305    conn: &Connection,
3306    namespace: &str,
3307    item_key: &str,
3308    binary: &Path,
3309    model: Option<&str>,
3310    timeout: u64,
3311    mode: &EnrichMode,
3312) -> Result<EnrichItemResult, AppError> {
3313    let pairs = scan_isolated_entity_pairs(conn, namespace, Some(1))?;
3314    let (e1_id, e1_name, e2_id, e2_name) =
3315        match pairs.into_iter().find(|(_, n, _, _)| n == item_key) {
3316            Some(p) => p,
3317            None => {
3318                return Ok(EnrichItemResult::Skipped {
3319                    reason: "pair no longer isolated".into(),
3320                })
3321            }
3322        };
3323    let input_text = format!("Entity A: {e1_name}\nEntity B: {e2_name}");
3324    let (value, cost, is_oauth) = match mode {
3325        EnrichMode::ClaudeCode => call_claude(
3326            binary,
3327            ENTITY_CONNECT_PROMPT,
3328            ENTITY_CONNECT_SCHEMA,
3329            &input_text,
3330            model,
3331            timeout,
3332        )?,
3333        EnrichMode::Codex => call_codex(
3334            binary,
3335            ENTITY_CONNECT_PROMPT,
3336            ENTITY_CONNECT_SCHEMA,
3337            &input_text,
3338            model,
3339            timeout,
3340        )?,
3341        EnrichMode::Opencode => call_opencode(
3342            binary,
3343            ENTITY_CONNECT_PROMPT,
3344            ENTITY_CONNECT_SCHEMA,
3345            &input_text,
3346            model,
3347            timeout,
3348        )?,
3349    };
3350    let relation = value
3351        .get("relation")
3352        .and_then(|v| v.as_str())
3353        .unwrap_or("none");
3354    if relation == "none" {
3355        return Ok(EnrichItemResult::Skipped {
3356            reason: "LLM determined no relationship".into(),
3357        });
3358    }
3359    let strength = value
3360        .get("strength")
3361        .and_then(|v| v.as_f64())
3362        .unwrap_or(0.5);
3363    conn.execute(
3364        "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
3365        rusqlite::params![namespace, e1_id, e2_id, relation, strength],
3366    )?;
3367    Ok(EnrichItemResult::Done {
3368        memory_id: None,
3369        entity_id: None,
3370        entities: 0,
3371        rels: 1,
3372        chars_before: None,
3373        chars_after: None,
3374        cost,
3375        is_oauth,
3376    })
3377}
3378
3379/// G27 P2: Validate entity type assignment via LLM.
3380fn call_entity_type_validate(
3381    conn: &Connection,
3382    _namespace: &str,
3383    item_key: &str,
3384    binary: &Path,
3385    model: Option<&str>,
3386    timeout: u64,
3387    mode: &EnrichMode,
3388) -> Result<EnrichItemResult, AppError> {
3389    let (ent_id, ent_name, ent_type): (i64, String, String) = conn
3390        .query_row(
3391            "SELECT id, name, type FROM entities WHERE name = ?1",
3392            rusqlite::params![item_key],
3393            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3394        )
3395        .map_err(|_| AppError::NotFound(format!("entity '{item_key}' not found")))?;
3396    let input_text = format!("Entity: {ent_name}\nCurrent type: {ent_type}");
3397    let (value, cost, is_oauth) = match mode {
3398        EnrichMode::ClaudeCode => call_claude(
3399            binary,
3400            ENTITY_TYPE_VALIDATE_PROMPT,
3401            ENTITY_TYPE_VALIDATE_SCHEMA,
3402            &input_text,
3403            model,
3404            timeout,
3405        )?,
3406        EnrichMode::Codex => call_codex(
3407            binary,
3408            ENTITY_TYPE_VALIDATE_PROMPT,
3409            ENTITY_TYPE_VALIDATE_SCHEMA,
3410            &input_text,
3411            model,
3412            timeout,
3413        )?,
3414        EnrichMode::Opencode => call_opencode(
3415            binary,
3416            ENTITY_TYPE_VALIDATE_PROMPT,
3417            ENTITY_TYPE_VALIDATE_SCHEMA,
3418            &input_text,
3419            model,
3420            timeout,
3421        )?,
3422    };
3423    let validated_type = value
3424        .get("validated_type")
3425        .and_then(|v| v.as_str())
3426        .unwrap_or(&ent_type);
3427    let was_correct = value
3428        .get("was_correct")
3429        .and_then(|v| v.as_bool())
3430        .unwrap_or(true);
3431    if !was_correct {
3432        conn.execute(
3433            "UPDATE entities SET type = ?1 WHERE id = ?2",
3434            rusqlite::params![validated_type, ent_id],
3435        )?;
3436    }
3437    Ok(EnrichItemResult::Done {
3438        memory_id: None,
3439        entity_id: Some(ent_id),
3440        entities: 1,
3441        rels: 0,
3442        chars_before: None,
3443        chars_after: None,
3444        cost,
3445        is_oauth,
3446    })
3447}
3448
3449/// G27 P2: Enrich generic memory description via LLM.
3450fn call_description_enrich(
3451    conn: &Connection,
3452    _namespace: &str,
3453    item_key: &str,
3454    binary: &Path,
3455    model: Option<&str>,
3456    timeout: u64,
3457    mode: &EnrichMode,
3458) -> Result<EnrichItemResult, AppError> {
3459    let (mem_id, body, old_desc): (i64, String, String) = conn
3460        .query_row(
3461            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3462            rusqlite::params![item_key],
3463            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3464        )
3465        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3466    let snippet: String = body.chars().take(500).collect();
3467    let input_text = format!(
3468        "Memory name: {item_key}\nCurrent description: {old_desc}\nBody preview: {snippet}"
3469    );
3470    let (value, cost, is_oauth) = match mode {
3471        EnrichMode::ClaudeCode => call_claude(
3472            binary,
3473            DESCRIPTION_ENRICH_PROMPT,
3474            DESCRIPTION_ENRICH_SCHEMA,
3475            &input_text,
3476            model,
3477            timeout,
3478        )?,
3479        EnrichMode::Codex => call_codex(
3480            binary,
3481            DESCRIPTION_ENRICH_PROMPT,
3482            DESCRIPTION_ENRICH_SCHEMA,
3483            &input_text,
3484            model,
3485            timeout,
3486        )?,
3487        EnrichMode::Opencode => call_opencode(
3488            binary,
3489            DESCRIPTION_ENRICH_PROMPT,
3490            DESCRIPTION_ENRICH_SCHEMA,
3491            &input_text,
3492            model,
3493            timeout,
3494        )?,
3495    };
3496    let new_desc = value
3497        .get("description")
3498        .and_then(|v| v.as_str())
3499        .unwrap_or(&old_desc);
3500    let old_name: String = conn.query_row(
3501        "SELECT name FROM memories WHERE id = ?1",
3502        rusqlite::params![mem_id],
3503        |r| r.get(0),
3504    )?;
3505    conn.execute(
3506        "UPDATE memories SET description = ?1 WHERE id = ?2",
3507        rusqlite::params![new_desc, mem_id],
3508    )?;
3509    memories::sync_fts_after_update(
3510        conn, mem_id, &old_name, &old_desc, &body, &old_name, new_desc, &body,
3511    )?;
3512    Ok(EnrichItemResult::Done {
3513        memory_id: Some(mem_id),
3514        entity_id: None,
3515        entities: 0,
3516        rels: 0,
3517        chars_before: Some(old_desc.len()),
3518        chars_after: Some(new_desc.len()),
3519        cost,
3520        is_oauth,
3521    })
3522}
3523
3524/// G27 P2: Classify memory into domain category via LLM.
3525fn call_domain_classify(
3526    conn: &Connection,
3527    _namespace: &str,
3528    item_key: &str,
3529    binary: &Path,
3530    model: Option<&str>,
3531    timeout: u64,
3532    mode: &EnrichMode,
3533) -> Result<EnrichItemResult, AppError> {
3534    let (mem_id, body, desc): (i64, String, String) = conn
3535        .query_row(
3536            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3537            rusqlite::params![item_key],
3538            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3539        )
3540        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3541    let snippet: String = body.chars().take(500).collect();
3542    let input_text = format!("Memory: {item_key}\nDescription: {desc}\nBody preview: {snippet}");
3543    let (value, cost, is_oauth) = match mode {
3544        EnrichMode::ClaudeCode => call_claude(
3545            binary,
3546            DOMAIN_CLASSIFY_PROMPT,
3547            DOMAIN_CLASSIFY_SCHEMA,
3548            &input_text,
3549            model,
3550            timeout,
3551        )?,
3552        EnrichMode::Codex => call_codex(
3553            binary,
3554            DOMAIN_CLASSIFY_PROMPT,
3555            DOMAIN_CLASSIFY_SCHEMA,
3556            &input_text,
3557            model,
3558            timeout,
3559        )?,
3560        EnrichMode::Opencode => call_opencode(
3561            binary,
3562            DOMAIN_CLASSIFY_PROMPT,
3563            DOMAIN_CLASSIFY_SCHEMA,
3564            &input_text,
3565            model,
3566            timeout,
3567        )?,
3568    };
3569    let domain = value
3570        .get("domain")
3571        .and_then(|v| v.as_str())
3572        .unwrap_or("uncategorized");
3573    let metadata = format!(r#"{{"domain":"{}"}}"#, domain.replace('"', "\\\""));
3574    conn.execute(
3575        "UPDATE memories SET metadata = ?1 WHERE id = ?2",
3576        rusqlite::params![metadata, mem_id],
3577    )?;
3578    Ok(EnrichItemResult::Done {
3579        memory_id: Some(mem_id),
3580        entity_id: None,
3581        entities: 0,
3582        rels: 0,
3583        chars_before: None,
3584        chars_after: None,
3585        cost,
3586        is_oauth,
3587    })
3588}
3589
3590/// G27 P2: Audit memory graph quality via LLM.
3591fn call_graph_audit(
3592    conn: &Connection,
3593    _namespace: &str,
3594    item_key: &str,
3595    binary: &Path,
3596    model: Option<&str>,
3597    timeout: u64,
3598    mode: &EnrichMode,
3599) -> Result<EnrichItemResult, AppError> {
3600    let (mem_id, body, desc): (i64, String, String) = conn
3601        .query_row(
3602            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3603            rusqlite::params![item_key],
3604            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3605        )
3606        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3607    let snippet: String = body.chars().take(500).collect();
3608    let ent_count: i64 = conn
3609        .query_row(
3610            "SELECT COUNT(*) FROM memory_entities WHERE memory_id = ?1",
3611            rusqlite::params![mem_id],
3612            |r| r.get(0),
3613        )
3614        .unwrap_or(0);
3615    let input_text = format!("Memory: {item_key}\nDescription: {desc}\nEntity bindings: {ent_count}\nBody preview: {snippet}");
3616    let (value, cost, is_oauth) = match mode {
3617        EnrichMode::ClaudeCode => call_claude(
3618            binary,
3619            GRAPH_AUDIT_PROMPT,
3620            GRAPH_AUDIT_SCHEMA,
3621            &input_text,
3622            model,
3623            timeout,
3624        )?,
3625        EnrichMode::Codex => call_codex(
3626            binary,
3627            GRAPH_AUDIT_PROMPT,
3628            GRAPH_AUDIT_SCHEMA,
3629            &input_text,
3630            model,
3631            timeout,
3632        )?,
3633        EnrichMode::Opencode => call_opencode(
3634            binary,
3635            GRAPH_AUDIT_PROMPT,
3636            GRAPH_AUDIT_SCHEMA,
3637            &input_text,
3638            model,
3639            timeout,
3640        )?,
3641    };
3642    let issues = value
3643        .get("issues")
3644        .and_then(|v| v.as_array())
3645        .map(|a| a.len())
3646        .unwrap_or(0);
3647    Ok(EnrichItemResult::Done {
3648        memory_id: Some(mem_id),
3649        entity_id: None,
3650        entities: 0,
3651        rels: issues,
3652        chars_before: None,
3653        chars_after: None,
3654        cost,
3655        is_oauth,
3656    })
3657}
3658
3659/// G27 P2: Synthesize research findings into graph entities/relationships via LLM.
3660fn call_deep_research_synth(
3661    conn: &Connection,
3662    namespace: &str,
3663    item_key: &str,
3664    binary: &Path,
3665    model: Option<&str>,
3666    timeout: u64,
3667    mode: &EnrichMode,
3668) -> Result<EnrichItemResult, AppError> {
3669    let (mem_id, body): (i64, String) = conn
3670        .query_row(
3671            "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3672            rusqlite::params![item_key],
3673            |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
3674        )
3675        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3676    let snippet: String = body.chars().take(2000).collect();
3677    let input_text = format!("Memory: {item_key}\nBody:\n{snippet}");
3678    let (value, cost, is_oauth) = match mode {
3679        EnrichMode::ClaudeCode => call_claude(
3680            binary,
3681            DEEP_RESEARCH_SYNTH_PROMPT,
3682            DEEP_RESEARCH_SYNTH_SCHEMA,
3683            &input_text,
3684            model,
3685            timeout,
3686        )?,
3687        EnrichMode::Codex => call_codex(
3688            binary,
3689            DEEP_RESEARCH_SYNTH_PROMPT,
3690            DEEP_RESEARCH_SYNTH_SCHEMA,
3691            &input_text,
3692            model,
3693            timeout,
3694        )?,
3695        EnrichMode::Opencode => call_opencode(
3696            binary,
3697            DEEP_RESEARCH_SYNTH_PROMPT,
3698            DEEP_RESEARCH_SYNTH_SCHEMA,
3699            &input_text,
3700            model,
3701            timeout,
3702        )?,
3703    };
3704    let mut ent_count = 0usize;
3705    let mut rel_count = 0usize;
3706    if let Some(ents) = value.get("entities").and_then(|v| v.as_array()) {
3707        for e in ents {
3708            let name = e.get("name").and_then(|v| v.as_str()).unwrap_or_default();
3709            let etype_str = e
3710                .get("entity_type")
3711                .and_then(|v| v.as_str())
3712                .unwrap_or("concept");
3713            let etype: EntityType = etype_str.parse().unwrap_or(EntityType::Concept);
3714            if name.len() >= 2 {
3715                let ne = NewEntity {
3716                    name: name.to_string(),
3717                    entity_type: etype,
3718                    description: None,
3719                };
3720                let _ = entities::upsert_entity(conn, namespace, &ne);
3721                ent_count += 1;
3722            }
3723        }
3724    }
3725    if let Some(rels) = value.get("relationships").and_then(|v| v.as_array()) {
3726        for r in rels {
3727            let src = r.get("source").and_then(|v| v.as_str()).unwrap_or_default();
3728            let tgt = r.get("target").and_then(|v| v.as_str()).unwrap_or_default();
3729            if src.is_empty() || tgt.is_empty() {
3730                continue;
3731            }
3732            let rel = r
3733                .get("relation")
3734                .and_then(|v| v.as_str())
3735                .unwrap_or("related");
3736            let str_ = r.get("strength").and_then(|v| v.as_f64()).unwrap_or(0.5);
3737            if let (Some(sid), Some(tid)) = (
3738                entities::find_entity_id(conn, namespace, src)?,
3739                entities::find_entity_id(conn, namespace, tgt)?,
3740            ) {
3741                let _ = entities::create_or_fetch_relationship(
3742                    conn, namespace, sid, tid, rel, str_, None,
3743                );
3744                rel_count += 1;
3745            }
3746        }
3747    }
3748    Ok(EnrichItemResult::Done {
3749        memory_id: Some(mem_id),
3750        entity_id: None,
3751        entities: ent_count,
3752        rels: rel_count,
3753        chars_before: None,
3754        chars_after: None,
3755        cost,
3756        is_oauth,
3757    })
3758}
3759
3760/// G27 P2: Extract structured body from unstructured text via LLM.
3761fn call_body_extract(
3762    conn: &Connection,
3763    _namespace: &str,
3764    item_key: &str,
3765    binary: &Path,
3766    model: Option<&str>,
3767    timeout: u64,
3768    mode: &EnrichMode,
3769) -> Result<EnrichItemResult, AppError> {
3770    let (mem_id, body, old_desc): (i64, String, String) = conn
3771        .query_row(
3772            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3773            rusqlite::params![item_key],
3774            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3775        )
3776        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3777    let old_name: String = conn.query_row(
3778        "SELECT name FROM memories WHERE id = ?1",
3779        rusqlite::params![mem_id],
3780        |r| r.get(0),
3781    )?;
3782    let input_text = format!("Memory: {item_key}\nBody:\n{body}");
3783    let (value, cost, is_oauth) = match mode {
3784        EnrichMode::ClaudeCode => call_claude(
3785            binary,
3786            BODY_EXTRACT_PROMPT,
3787            BODY_EXTRACT_SCHEMA,
3788            &input_text,
3789            model,
3790            timeout,
3791        )?,
3792        EnrichMode::Codex => call_codex(
3793            binary,
3794            BODY_EXTRACT_PROMPT,
3795            BODY_EXTRACT_SCHEMA,
3796            &input_text,
3797            model,
3798            timeout,
3799        )?,
3800        EnrichMode::Opencode => call_opencode(
3801            binary,
3802            BODY_EXTRACT_PROMPT,
3803            BODY_EXTRACT_SCHEMA,
3804            &input_text,
3805            model,
3806            timeout,
3807        )?,
3808    };
3809    let restructured = value
3810        .get("restructured_body")
3811        .and_then(|v| v.as_str())
3812        .unwrap_or(&body);
3813    let chars_before = body.len();
3814    let chars_after = restructured.len();
3815    let new_hash = blake3::hash(restructured.as_bytes()).to_hex().to_string();
3816    conn.execute(
3817        "UPDATE memories SET body = ?1, body_hash = ?2, updated_at = unixepoch() WHERE id = ?3",
3818        rusqlite::params![restructured, new_hash, mem_id],
3819    )?;
3820    memories::sync_fts_after_update(
3821        conn,
3822        mem_id,
3823        &old_name,
3824        &old_desc,
3825        &body,
3826        &old_name,
3827        &old_desc,
3828        restructured,
3829    )?;
3830    Ok(EnrichItemResult::Done {
3831        memory_id: Some(mem_id),
3832        entity_id: None,
3833        entities: 0,
3834        rels: 0,
3835        chars_before: Some(chars_before),
3836        chars_after: Some(chars_after),
3837        cost,
3838        is_oauth,
3839    })
3840}
3841
3842/// Scan for pairs of entities that share no direct relationship.
3843#[allow(clippy::type_complexity)]
3844fn scan_isolated_entity_pairs(
3845    conn: &Connection,
3846    namespace: &str,
3847    limit: Option<usize>,
3848) -> Result<Vec<(i64, String, i64, String)>, AppError> {
3849    let limit_val = limit.unwrap_or(50) as i64;
3850    let mut stmt = conn.prepare_cached(
3851        "SELECT e1.id, e1.name, e2.id, e2.name FROM entities e1, entities e2 \
3852         WHERE e1.namespace = ?1 AND e2.namespace = ?1 AND e1.id < e2.id \
3853         AND NOT EXISTS (SELECT 1 FROM relationships r WHERE \
3854           (r.source_id = e1.id AND r.target_id = e2.id) OR \
3855           (r.source_id = e2.id AND r.target_id = e1.id)) \
3856         LIMIT ?2",
3857    )?;
3858    let rows = stmt
3859        .query_map(rusqlite::params![namespace, limit_val], |r| {
3860            Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?))
3861        })?
3862        .collect::<Result<Vec<_>, _>>()?;
3863    Ok(rows)
3864}
3865
3866/// Scan for entities with non-validated types (all entities for type audit).
3867fn scan_entities_for_type_validation(
3868    conn: &Connection,
3869    namespace: &str,
3870    limit: Option<usize>,
3871) -> Result<Vec<(i64, String, String)>, AppError> {
3872    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3873    let sql = format!(
3874        "SELECT id, name, type FROM entities WHERE namespace = ?1 ORDER BY id {limit_clause}"
3875    );
3876    let mut stmt = conn.prepare(&sql)?;
3877    let rows = stmt
3878        .query_map(rusqlite::params![namespace], |r| {
3879            Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3880        })?
3881        .collect::<Result<Vec<_>, _>>()?;
3882    Ok(rows)
3883}
3884
3885/// Scan for memories with generic descriptions (ingested, imported, etc).
3886fn scan_generic_descriptions(
3887    conn: &Connection,
3888    namespace: &str,
3889    limit: Option<usize>,
3890) -> Result<Vec<(i64, String, String)>, AppError> {
3891    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3892    let sql = format!(
3893        "SELECT id, name, description FROM memories WHERE namespace = ?1 AND deleted_at IS NULL \
3894         AND (description LIKE '%ingested%' OR description LIKE '%imported%' OR description LIKE '%added%' OR length(description) < 30) \
3895         ORDER BY id {limit_clause}"
3896    );
3897    let mut stmt = conn.prepare(&sql)?;
3898    let rows = stmt
3899        .query_map(rusqlite::params![namespace], |r| {
3900            Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3901        })?
3902        .collect::<Result<Vec<_>, _>>()?;
3903    Ok(rows)
3904}
3905
3906/// Calls the Codex CLI for a single enrichment item.
3907///
3908/// Follows the same contract as `call_claude`: returns `(value, cost_usd, is_oauth=false)`.
3909fn call_codex(
3910    binary: &Path,
3911    prompt: &str,
3912    json_schema: &str,
3913    input_text: &str,
3914    model: Option<&str>,
3915    timeout_secs: u64,
3916) -> Result<(serde_json::Value, f64, bool), AppError> {
3917    use wait_timeout::ChildExt;
3918
3919    // G31+G32+G33 (v1.0.69): validate the model BEFORE spawn, write the
3920    // schema to a trusted cache path (not /tmp), and reuse the
3921    // consolidated JSONL parser. See `codex_spawn.rs` for the canonical
3922    // hardening rationale.
3923    super::codex_spawn::validate_codex_model(model)?;
3924    let schema_file = super::codex_spawn::trusted_schema_path()?;
3925
3926    let args = super::codex_spawn::CodexSpawnArgs {
3927        binary,
3928        prompt,
3929        json_schema,
3930        input_text,
3931        model,
3932        timeout_secs,
3933        schema_path: schema_file.clone(),
3934    };
3935    let mut cmd = super::codex_spawn::build_codex_command(&args)?;
3936
3937    let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
3938        AppError::Io(std::io::Error::new(
3939            e.kind(),
3940            format!("failed to spawn codex: {e}"),
3941        ))
3942    })?;
3943
3944    let full_prompt = format!("{prompt}\n\n{input_text}");
3945    let stdin_bytes = full_prompt.into_bytes();
3946    let mut child_stdin = child
3947        .stdin
3948        .take()
3949        .ok_or_else(|| AppError::Validation("failed to open codex stdin".into()))?;
3950    let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
3951        child_stdin.write_all(&stdin_bytes)?;
3952        drop(child_stdin);
3953        Ok(())
3954    });
3955
3956    let start = std::time::Instant::now();
3957    let timeout = std::time::Duration::from_secs(timeout_secs);
3958    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
3959    let _ = std::fs::remove_file(&schema_file);
3960
3961    match status {
3962        Some(exit_status) => {
3963            stdin_thread
3964                .join()
3965                .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
3966                .map_err(AppError::Io)?;
3967
3968            tracing::debug!(
3969                target: "process",
3970                exit_code = ?exit_status.code(),
3971                elapsed_ms = start.elapsed().as_millis() as u64,
3972                "external process completed"
3973            );
3974
3975            let mut stdout_buf = Vec::new();
3976            if let Some(mut out) = child.stdout.take() {
3977                std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
3978            }
3979            if !exit_status.success() {
3980                let mut stderr_buf = Vec::new();
3981                if let Some(mut err) = child.stderr.take() {
3982                    std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
3983                }
3984                let stderr_str = String::from_utf8_lossy(&stderr_buf);
3985                tracing::warn!(
3986                    target: "enrich",
3987                    exit_code = ?exit_status.code(),
3988                    stderr = %stderr_str.trim(),
3989                    "codex process failed"
3990                );
3991                return Err(AppError::Validation(format!(
3992                    "codex exited with code {:?}: {}",
3993                    exit_status.code(),
3994                    stderr_str.trim()
3995                )));
3996            }
3997            let stdout_str = String::from_utf8(stdout_buf)
3998                .map_err(|_| AppError::Validation("codex stdout is not valid UTF-8".into()))?;
3999            // G32: use the JSONL parser, NOT serde_json::from_str on the
4000            // entire stdout (codex emits one event per line).
4001            let result = super::codex_spawn::parse_codex_jsonl(&stdout_str)?;
4002            // Return the raw agent_message text parsed as JSON. Different
4003            // operations (memory-bindings, body-enrich) use different
4004            // output schemas, so we let the caller pick which fields to
4005            // extract. The previous implementation hardcoded
4006            // `{entities, urls}` which broke body-enrich.
4007            let value: serde_json::Value =
4008                serde_json::from_str(&result.last_agent_text).map_err(|e| {
4009                    AppError::Validation(format!(
4010                        "codex agent_message is not valid JSON: {e}; raw={}",
4011                        result.last_agent_text
4012                    ))
4013                })?;
4014            Ok((value, 0.0, false))
4015        }
4016        None => {
4017            let _ = child.kill();
4018            let _ = child.wait();
4019            let _ = stdin_thread.join();
4020            Err(AppError::Validation(format!(
4021                "codex timed out after {timeout_secs} seconds"
4022            )))
4023        }
4024    }
4025}
4026
4027fn call_opencode(
4028    binary: &Path,
4029    prompt: &str,
4030    json_schema: &str,
4031    input_text: &str,
4032    model: Option<&str>,
4033    timeout_secs: u64,
4034) -> Result<(serde_json::Value, f64, bool), AppError> {
4035    use wait_timeout::ChildExt;
4036
4037    let resolved_model = super::opencode_runner::resolve_opencode_model(model);
4038
4039    let augmented_prompt = if json_schema.is_empty() {
4040        prompt.to_string()
4041    } else {
4042        format!(
4043            "{prompt}\n\nIMPORTANT: You MUST respond with ONLY valid JSON (no markdown, no explanation, no code fences). \
4044             The JSON MUST match this schema:\n{json_schema}"
4045        )
4046    };
4047
4048    let mut cmd = super::opencode_runner::build_opencode_command_sync(
4049        binary,
4050        &resolved_model,
4051        &augmented_prompt,
4052        input_text,
4053    );
4054
4055    let mut child = super::opencode_runner::spawn_opencode(&mut cmd).map_err(|e| {
4056        AppError::Io(std::io::Error::new(
4057            e.kind(),
4058            format!("failed to spawn opencode: {e}"),
4059        ))
4060    })?;
4061
4062    let start = std::time::Instant::now();
4063    let timeout = std::time::Duration::from_secs(timeout_secs);
4064    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
4065
4066    match status {
4067        Some(exit_status) => {
4068            tracing::debug!(
4069                target: "process",
4070                exit_code = ?exit_status.code(),
4071                elapsed_ms = start.elapsed().as_millis() as u64,
4072                "opencode process completed"
4073            );
4074
4075            let mut stdout_buf = Vec::new();
4076            if let Some(mut out) = child.stdout.take() {
4077                std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
4078            }
4079            if !exit_status.success() {
4080                let mut stderr_buf = Vec::new();
4081                if let Some(mut err) = child.stderr.take() {
4082                    std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
4083                }
4084                let stderr_str = String::from_utf8_lossy(&stderr_buf);
4085                tracing::warn!(
4086                    target: "enrich",
4087                    exit_code = ?exit_status.code(),
4088                    stderr = %stderr_str.trim(),
4089                    "opencode process failed"
4090                );
4091                return Err(AppError::Validation(format!(
4092                    "opencode exited with code {:?}: {}",
4093                    exit_status.code(),
4094                    stderr_str.trim()
4095                )));
4096            }
4097            let stdout_str = String::from_utf8(stdout_buf)
4098                .map_err(|_| AppError::Validation("opencode stdout is not valid UTF-8".into()))?;
4099            let (text, cost, _tokens) = super::opencode_runner::parse_opencode_output(&stdout_str)?;
4100            let value: serde_json::Value =
4101                super::opencode_runner::parse_json_from_opencode_text(&text).map_err(|e| {
4102                    AppError::Validation(format!("opencode response is not valid JSON: {e}"))
4103                })?;
4104            Ok((value, cost, false))
4105        }
4106        None => {
4107            let _ = child.kill();
4108            let _ = child.wait();
4109            Err(AppError::Validation(format!(
4110                "opencode timed out after {timeout_secs} seconds"
4111            )))
4112        }
4113    }
4114}
4115
4116// ---------------------------------------------------------------------------
4117// Tests
4118// ---------------------------------------------------------------------------
4119
#[cfg(test)]
mod tests {
    // Unit tests for the scan helpers, queue DB, output parsers, persistence
    // helpers, and the Codex runner (via a mock binary).
    use super::*;
    use rusqlite::Connection;
    #[cfg(unix)]
    use std::os::unix::fs::PermissionsExt;

    /// Opens an in-memory SQLite database with a minimal schema for unit tests.
    fn open_test_db() -> Connection {
        let conn = Connection::open_in_memory().expect("in-memory db");
        conn.execute_batch(
            "CREATE TABLE memories (
                id          INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace   TEXT NOT NULL DEFAULT 'global',
                name        TEXT NOT NULL,
                type        TEXT NOT NULL DEFAULT 'note',
                description TEXT NOT NULL DEFAULT '',
                body        TEXT NOT NULL DEFAULT '',
                body_hash   TEXT NOT NULL DEFAULT '',
                session_id  TEXT,
                source      TEXT NOT NULL DEFAULT 'agent',
                metadata    TEXT NOT NULL DEFAULT '{}',
                created_at  INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at  INTEGER NOT NULL DEFAULT (unixepoch()),
                deleted_at  INTEGER,
                UNIQUE(namespace, name)
            );
            CREATE TABLE entities (
                id          INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace   TEXT NOT NULL DEFAULT 'global',
                name        TEXT NOT NULL,
                type        TEXT NOT NULL DEFAULT 'concept',
                description TEXT,
                degree      INTEGER NOT NULL DEFAULT 0,
                created_at  INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at  INTEGER NOT NULL DEFAULT (unixepoch()),
                UNIQUE(namespace, name)
            );
            CREATE TABLE memory_entities (
                memory_id  INTEGER NOT NULL,
                entity_id  INTEGER NOT NULL,
                PRIMARY KEY (memory_id, entity_id)
            );
            CREATE TABLE relationships (
                id         INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace  TEXT NOT NULL DEFAULT 'global',
                source_id  INTEGER NOT NULL,
                target_id  INTEGER NOT NULL,
                relation   TEXT NOT NULL,
                weight     REAL NOT NULL DEFAULT 0.5,
                description TEXT,
                UNIQUE(source_id, target_id, relation)
            );
            CREATE TABLE memory_embeddings (
                memory_id   INTEGER PRIMARY KEY,
                namespace   TEXT NOT NULL,
                embedding   BLOB NOT NULL,
                source      TEXT NOT NULL,
                model       TEXT NOT NULL DEFAULT '',
                dim         INTEGER NOT NULL DEFAULT 384,
                created_at  INTEGER NOT NULL DEFAULT (unixepoch())
            );",
        )
        .expect("schema creation must succeed");
        conn
    }

    #[test]
    fn scan_unbound_memories_finds_memories_without_bindings() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'test-mem', 'some body content')",
            [],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "test-mem");
    }

    #[test]
    fn scan_unbound_memories_excludes_bound_memories() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'bound-mem', 'body')",
            [],
        )
        .unwrap();
        let mem_id: i64 = conn
            .query_row("SELECT id FROM memories WHERE name='bound-mem'", [], |r| {
                r.get(0)
            })
            .unwrap();
        conn.execute(
            "INSERT INTO entities (namespace, name) VALUES ('global', 'some-entity')",
            [],
        )
        .unwrap();
        let ent_id: i64 = conn
            .query_row(
                "SELECT id FROM entities WHERE name='some-entity'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        conn.execute(
            "INSERT INTO memory_entities (memory_id, entity_id) VALUES (?1, ?2)",
            rusqlite::params![mem_id, ent_id],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
        assert!(results.is_empty(), "bound memory must not appear in scan");
    }

    #[test]
    fn scan_entities_without_description_finds_null_description() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'my-tool', 'tool', NULL)",
            [],
        )
        .unwrap();

        let results = scan_entities_without_description(&conn, "global", None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "my-tool");
    }

    #[test]
    fn scan_entities_without_description_excludes_entities_with_description() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'described-tool', 'tool', 'Has a description already')",
            [],
        )
        .unwrap();

        let results = scan_entities_without_description(&conn, "global", None, &[]).unwrap();
        assert!(
            results.is_empty(),
            "entity with description must not appear"
        );
    }

    #[test]
    fn scan_short_body_memories_finds_short_bodies() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'short-mem', 'hi')",
            [],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 100, None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "short-mem");
    }

    #[test]
    fn scan_short_body_memories_excludes_long_bodies() {
        let conn = open_test_db();
        let long_body = "a".repeat(1000);
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'long-mem', ?1)",
            rusqlite::params![long_body],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 100, None, &[]).unwrap();
        assert!(results.is_empty(), "long memory must not appear in scan");
    }

    #[test]
    fn scan_respects_limit() {
        let conn = open_test_db();
        for i in 0..5 {
            conn.execute(
                &format!("INSERT INTO memories (namespace, name, body) VALUES ('global', 'mem-{i}', 'short')"),
                [],
            )
            .unwrap();
        }

        let results = scan_short_body_memories(&conn, "global", 1000, Some(3), &[]).unwrap();
        assert_eq!(results.len(), 3, "limit must be respected");
    }

    #[test]
    fn scan_memories_without_embeddings_finds_only_missing_rows() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'missing-vec', 'body one')",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'has-vec', 'body two')",
            [],
        )
        .unwrap();
        let memory_id: i64 = conn
            .query_row(
                "SELECT id FROM memories WHERE namespace='global' AND name='has-vec'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        let embedding = vec![0.0_f32; crate::constants::embedding_dim()];
        memories::upsert_vec(
            &conn, memory_id, "global", "note", &embedding, "has-vec", "body two",
        )
        .unwrap();

        let results = scan_memories_without_embeddings(&conn, "global", None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "missing-vec");
    }

    #[test]
    fn scan_memories_without_embeddings_respects_name_filter() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'match-me', 'body one')",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'skip-me', 'body two')",
            [],
        )
        .unwrap();

        let results =
            scan_memories_without_embeddings(&conn, "global", None, &["match-me".to_string()])
                .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "match-me");
    }

    #[test]
    fn scan_isolated_entity_pairs_skips_pairs_related_in_either_direction() {
        let conn = open_test_db();
        for name in ["alpha", "beta", "gamma"] {
            conn.execute(
                "INSERT INTO entities (namespace, name) VALUES ('global', ?1)",
                rusqlite::params![name],
            )
            .unwrap();
        }
        // beta -> alpha: the (alpha, beta) pair must be excluded even though
        // the scan orders pairs by ascending id (alpha first).
        conn.execute(
            "INSERT INTO relationships (namespace, source_id, target_id, relation) \
             SELECT 'global', s.id, t.id, 'uses' FROM entities s, entities t \
             WHERE s.name = 'beta' AND t.name = 'alpha'",
            [],
        )
        .unwrap();

        let mut pairs: Vec<(String, String)> = scan_isolated_entity_pairs(&conn, "global", None)
            .unwrap()
            .into_iter()
            .map(|(_, first, _, second)| (first, second))
            .collect();
        pairs.sort();
        assert_eq!(
            pairs,
            vec![
                ("alpha".to_string(), "gamma".to_string()),
                ("beta".to_string(), "gamma".to_string()),
            ]
        );

        let limited = scan_isolated_entity_pairs(&conn, "global", Some(1)).unwrap();
        assert_eq!(limited.len(), 1, "limit must be respected");
    }

    #[test]
    fn scan_entities_for_type_validation_orders_by_id_and_limits() {
        let conn = open_test_db();
        for (name, etype) in [("first", "tool"), ("second", "concept"), ("third", "person")] {
            conn.execute(
                "INSERT INTO entities (namespace, name, type) VALUES ('global', ?1, ?2)",
                rusqlite::params![name, etype],
            )
            .unwrap();
        }

        let got: Vec<(String, String)> =
            scan_entities_for_type_validation(&conn, "global", Some(2))
                .unwrap()
                .into_iter()
                .map(|(_, name, etype)| (name, etype))
                .collect();
        assert_eq!(
            got,
            vec![
                ("first".to_string(), "tool".to_string()),
                ("second".to_string(), "concept".to_string()),
            ]
        );
        assert_eq!(
            scan_entities_for_type_validation(&conn, "global", None)
                .unwrap()
                .len(),
            3
        );
    }

    #[test]
    fn scan_generic_descriptions_flags_boilerplate_and_short_descriptions() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, description) VALUES \
             ('global', 'boilerplate', 'Ingested from the legacy wiki export file'), \
             ('global', 'too-short', 'short note'), \
             ('global', 'specific', 'Explains the retry policy used by the HTTP client'), \
             ('other', 'foreign', 'imported')",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memories (namespace, name, description, deleted_at) VALUES ('global', 'gone', 'imported', 1)",
            [],
        )
        .unwrap();

        let names: Vec<String> = scan_generic_descriptions(&conn, "global", None)
            .unwrap()
            .into_iter()
            .map(|(_, name, _)| name)
            .collect();
        assert_eq!(
            names,
            vec!["boilerplate".to_string(), "too-short".to_string()]
        );
    }

    #[test]
    fn queue_db_schema_creates_correctly() {
        // A per-test temp dir is portable (no hard-coded `/tmp`) and is cleaned
        // up even when an assertion below fails.
        let tmp = tempfile::tempdir().expect("tempdir");
        let tmp_path = tmp
            .path()
            .join("test-enrich-queue.sqlite")
            .to_string_lossy()
            .into_owned();
        let conn = open_queue_db(&tmp_path).expect("queue db must open");
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 0);
    }

    #[test]
    fn parse_claude_output_valid_bindings() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.01,
             "structured_output":{"entities":[{"name":"rust-lang","entity_type":"tool"}],"relationships":[]}}
        ]"#;
        let result = crate::commands::claude_runner::parse_claude_output(output)
            .expect("must parse successfully");
        assert!(result.value.get("entities").is_some());
        assert!((result.cost_usd - 0.01).abs() < f64::EPSILON);
        assert!(!result.is_oauth);
    }

    #[test]
    fn parse_claude_output_detects_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"none"},
            {"type":"result","is_error":false,"total_cost_usd":0.0,
             "structured_output":{"entities":[],"relationships":[]}}
        ]"#;
        let result = crate::commands::claude_runner::parse_claude_output(output).unwrap();
        assert!(result.is_oauth);
    }

    #[test]
    fn parse_claude_output_rate_limit_returns_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;
        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
        assert!(matches!(err, AppError::RateLimited { .. }));
    }

    #[test]
    fn parse_claude_output_auth_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;
        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("authentication failed"));
    }

    #[cfg(unix)]
    #[test]
    fn call_codex_returns_raw_json_for_body_enrich_schema() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let binary = tmp.path().join("codex-mock");
        std::fs::write(
            &binary,
            r#"#!/usr/bin/env bash
set -euo pipefail
cat <<'JSONL'
{"type":"thread.started","thread_id":"mock-thread-0"}
{"type":"item.completed","item":{"type":"agent_message","text":"{\"enriched_body\":\"expanded body\"}"}}
{"type":"turn.completed","usage":{"input_tokens":1,"output_tokens":1}}
JSONL
"#,
        )
        .expect("mock codex write");
        let mut perms = std::fs::metadata(&binary).expect("metadata").permissions();
        perms.set_mode(0o755);
        std::fs::set_permissions(&binary, perms).expect("chmod");

        let (value, cost, is_oauth) =
            call_codex(&binary, "prompt", BODY_ENRICH_SCHEMA, "body", None, 5)
                .expect("call_codex must accept body-enrich payload");

        assert_eq!(value["enriched_body"], "expanded body");
        assert_eq!(cost, 0.0);
        assert!(!is_oauth);
    }

    #[test]
    fn dry_run_emits_preview_without_calling_llm() {
        // This test validates the dry-run NDJSON contract without spawning any process.
        // The scan_operation function requires a DB; we build one in-memory but cannot
        // call run() directly because it needs AppPaths (disk). Instead we test the
        // lower-level helpers that the dry-run path relies on.
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'dry-mem', 'tiny')",
            [],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 1000, None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "dry-mem");
        // If scan finds the item and dry_run is set, no LLM would be called.
        // The NDJSON emission is tested via integration tests with a fake binary.
    }

    #[test]
    fn persist_entity_description_updates_db() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type) VALUES ('global', 'tokio-runtime', 'tool')",
            [],
        )
        .unwrap();
        let eid: i64 = conn
            .query_row(
                "SELECT id FROM entities WHERE name='tokio-runtime'",
                [],
                |r| r.get(0),
            )
            .unwrap();

        persist_entity_description(&conn, eid, "Async runtime for Rust applications").unwrap();

        let desc: String = conn
            .query_row(
                "SELECT description FROM entities WHERE id=?1",
                rusqlite::params![eid],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(desc, "Async runtime for Rust applications");
    }

    #[test]
    fn bindings_schema_is_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(BINDINGS_SCHEMA).expect("BINDINGS_SCHEMA must be valid JSON");
    }

    #[test]
    fn entity_description_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(ENTITY_DESCRIPTION_SCHEMA)
            .expect("ENTITY_DESCRIPTION_SCHEMA must be valid JSON");
    }

    #[test]
    fn body_enrich_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(BODY_ENRICH_SCHEMA)
            .expect("BODY_ENRICH_SCHEMA must be valid JSON");
    }
}