Skip to main content

sqlite_graphrag/commands/
enrich.rs

// TODO v1.0.89: this file has 4116 lines — modularization is planned.
// See ADR-0046, section "Known Tech Debt (v1.0.89+)".
3
4//! Handler for the `enrich` CLI subcommand (GAP-14 + GAP-18).
5//!
6//! Enriches the knowledge graph by running LLM-powered analysis over memories
7//! and entities that are missing key structural data. Operations are:
8//!
9//! - `memory-bindings`: memories without `memory_entities` rows get entity extraction
10//! - `entity-descriptions`: entities with NULL/empty descriptions get LLM descriptions
11//! - `body-enrich`: memories with short bodies get expanded by the LLM (GAP-18)
12//! - `re-embed`: memories without a vector row get re-embedded without rewriting body
13//!
14//! Architecture mirrors `ingest_claude.rs`: SCAN → JUDGE (LLM) → PERSIST, with a
15//! SQLite queue DB (`.enrich-queue.sqlite`) for resume/retry support.
//!
//! Workload: subprocess I/O-bound (claude/codex calls that spend most of their time waiting on the network).
17//!
//! # DRY opportunity
//!
//! `extract_with_claude`, `parse_claude_output`, and the `open_queue_db` queue schema
//! in `ingest_claude.rs` are private functions whose patterns are duplicated here.
//! (`emit_json` is already shared via `crate::output::emit_json_line`, and Claude calls
//! already go through `crate::commands::claude_runner::run_claude`.) A future
//! refactoring could extract the remaining duplicates into a shared
//! `src/commands/llm_runner.rs` module (or `src/llm_runner.rs`) without changing any
//! public APIs. That extraction requires editing `ingest_claude.rs`, which is outside
//! this stream's boundary — flagged here for the Integration stream to evaluate.
26
27use crate::commands::ingest_claude::find_claude_binary;
28use crate::constants::MAX_MEMORY_BODY_LEN;
29use crate::entity_type::EntityType;
30use crate::errors::AppError;
31use crate::paths::AppPaths;
32use crate::storage::connection::{ensure_db_ready, open_rw};
33use crate::storage::entities::{self, NewEntity, NewRelationship};
34use crate::storage::memories;
35
36use rusqlite::Connection;
37use serde::{Deserialize, Serialize};
38use std::io::Write;
39use std::path::{Path, PathBuf};
40use std::time::Instant;
41
42// ---------------------------------------------------------------------------
43// Constants
44// ---------------------------------------------------------------------------
45
/// File name of the enrichment queue database that backs `--resume` and
/// `--retry-failed` (kept separate from the `ingest_claude` queue).
const DEFAULT_QUEUE_DB: &str = ".enrich-queue.sqlite";

/// Default wait applied after a provider rate limit.
/// NOTE(review): presumably seconds — the consumer is outside this section.
const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;

// Character bounds for `body-enrich` output; these feed the
// `--min-output-chars` / `--max-output-chars` CLI defaults.
const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;
const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2_000;
50
51// ---------------------------------------------------------------------------
52// JSON schema used for memory-bindings and body-enrich extraction
53// ---------------------------------------------------------------------------
54
/// Structured-output schema for entity/relationship extraction.
///
/// - `entities[]`: `name` plus an `entity_type` restricted to ten types.
/// - `relationships[]`: `source`, `target`, a `relation` restricted to eleven
///   canonical relations, and a `strength` in `[0, 1]`.
///
/// Every object (top level and array items) is closed with
/// `"additionalProperties": false`.
const BINDINGS_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["entities","relationships"],
  "additionalProperties": false
}"#;

/// Structured-output schema for `entity-descriptions`: a single required
/// `description` string.
const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" }
  },
  "required": ["description"],
  "additionalProperties": false
}"#;

/// Structured-output schema for `body-enrich`: a single required
/// `enriched_body` string holding the rewritten memory body.
const BODY_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "enriched_body": { "type": "string" }
  },
  "required": ["enriched_body"],
  "additionalProperties": false
}"#;
112
// G27 P1: weight-calibrate
/// Prompt for `weight-calibrate`: asks the LLM to check one relationship
/// weight against a fixed 0.3 / 0.5 / 0.7 / 0.9 scale. The scale matches the
/// strength guidance given in `BINDINGS_PROMPT`.
const WEIGHT_CALIBRATE_PROMPT: &str = "You are a knowledge graph quality auditor. Evaluate whether this relationship weight is correctly calibrated.\n\n\
Scale:\n\
- 0.9 = vital hard dependency (A cannot function without B)\n\
- 0.7 = important design relationship (A strongly supports/enables B)\n\
- 0.5 = useful contextual link (A and B share relevant context)\n\
- 0.3 = weak reference (A mentions B without strong coupling)\n\n\
Respond with the calibrated weight and brief reasoning.";

/// Response schema for `weight-calibrate`: `calibrated_weight` bounded to
/// `[0.0, 1.0]` plus free-text `reasoning`.
const WEIGHT_CALIBRATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "calibrated_weight": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["calibrated_weight", "reasoning"],
  "additionalProperties": false
}"#;
131
// G27 P1: relation-reclassify
/// Prompt for `relation-reclassify`: asks the LLM to replace a generic
/// relation type with exactly one of the canonical relations listed inline.
const RELATION_RECLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. The relationship between these entities uses a generic type. Determine the REAL semantic relationship.\n\n\
Valid canonical relations (pick exactly one):\n\
- depends-on: A cannot function without B\n\
- uses: A utilizes B but could substitute it\n\
- supports: A reinforces or enables B\n\
- causes: A triggers or produces B\n\
- fixes: A resolves a problem in B\n\
- contradicts: A conflicts with or invalidates B\n\
- applies-to: A is relevant to or scoped within B\n\
- follows: A comes after B in sequence\n\
- replaces: A substitutes B\n\
- tracked-in: A is monitored in B\n\
- related: A and B share context (use sparingly)\n\n\
Respond with the correct relation, strength, and reasoning.";

/// Response schema for `relation-reclassify`.
///
/// `relation` is constrained by an `enum` to the same canonical list the
/// prompt enumerates, in the same way `BINDINGS_SCHEMA` constrains
/// extraction output. A provider therefore cannot return a relation the
/// prompt forbids, and no free-form value reaches the graph. `strength` is
/// bounded to `[0.0, 1.0]`.
const RELATION_RECLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": {
      "type": "string",
      "enum": ["depends-on","uses","supports","causes","fixes","contradicts","applies-to","follows","replaces","tracked-in","related"]
    },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
158
// G27 P2: entity-connect — suggest relationships between isolated entities
/// Prompt for `entity-connect`: asks whether two unconnected entities share a
/// meaningful relationship, allowing the sentinel `"none"` when they do not.
const ENTITY_CONNECT_PROMPT: &str = "You are a knowledge graph quality auditor. Two entities exist in the same graph but have no relationship between them. Determine if a meaningful relationship exists.\n\n\
Valid canonical relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, replaces, tracked-in, related.\n\n\
If NO meaningful relationship exists, set relation to \"none\".\n\
Respond with the relation (or \"none\"), strength, and reasoning.";

/// Response schema for `entity-connect`.
///
/// `relation` is limited by an `enum` to the canonical relations from the
/// prompt plus the `"none"` sentinel, so a provider cannot invent a new
/// relation type. `strength` is bounded to `[0.0, 1.0]`.
const ENTITY_CONNECT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": {
      "type": "string",
      "enum": ["depends-on","uses","supports","causes","fixes","contradicts","applies-to","follows","replaces","tracked-in","related","none"]
    },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
175
// G27 P2: entity-type-validate — verify entity type assignments
/// Prompt for `entity-type-validate`: asks the LLM to confirm or correct an
/// entity's type using the ten valid types listed inline.
const ENTITY_TYPE_VALIDATE_PROMPT: &str = "You are a knowledge graph quality auditor. Verify whether this entity's type is correct.\n\n\
Valid entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\n\
If the current type is correct, keep it. If wrong, suggest the correct type.\n\
Respond with the validated type and reasoning.";

/// Response schema for `entity-type-validate`.
///
/// `validated_type` is limited by an `enum` to the same ten types the prompt
/// lists, which are also the types `BINDINGS_SCHEMA` allows. A suggested
/// correction is therefore always a valid entity type. `was_correct` flags
/// whether the existing type was kept.
const ENTITY_TYPE_VALIDATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "validated_type": {
      "type": "string",
      "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
    },
    "was_correct": { "type": "boolean" },
    "reasoning": { "type": "string" }
  },
  "required": ["validated_type", "was_correct", "reasoning"],
  "additionalProperties": false
}"#;
192
// G27 P2: description-enrich — improve generic memory descriptions
/// Prompt for `description-enrich`: asks for a 10-20 word semantic
/// description to replace a generic or auto-generated one. The inline
/// BAD/GOOD pair serves as a one-shot example.
const DESCRIPTION_ENRICH_PROMPT: &str = "You are a knowledge graph quality auditor. This memory has a generic or auto-generated description. Write a concise, semantic description (10-20 words) that captures WHAT this memory is about and WHY it matters.\n\n\
BAD: 'ingested from docs/auth.md'\n\
GOOD: 'JWT token rotation strategy with 15-min expiry and refresh flow'\n\n\
Respond with the improved description and reasoning.";

/// Response schema for `description-enrich`: the new `description` plus
/// free-text `reasoning`, both required.
const DESCRIPTION_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" },
    "reasoning": { "type": "string" }
  },
  "required": ["description", "reasoning"],
  "additionalProperties": false
}"#;
208
// G27 P2: domain-classify — classify memory into domain category
/// Prompt for `domain-classify`.
///
/// It names every field `DOMAIN_CLASSIFY_SCHEMA` requires, including
/// `confidence` and its `[0.0, 1.0]` scale. Without that, the model must
/// fill a required numeric field it was never told about.
const DOMAIN_CLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. Classify this memory into its primary domain category.\n\n\
Respond with the domain name (kebab-case, 2-4 words), a confidence score between 0.0 and 1.0, and reasoning.";

/// Response schema for `domain-classify`: a `domain` label, a `confidence`
/// bounded to `[0.0, 1.0]`, and free-text `reasoning`.
const DOMAIN_CLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "domain": { "type": "string" },
    "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["domain", "confidence", "reasoning"],
  "additionalProperties": false
}"#;
223
// G27 P2: graph-audit — audit graph for quality issues
/// Prompt for `graph-audit`: asks the LLM to list quality issues in a memory
/// and its entity bindings and to give an overall quality score.
const GRAPH_AUDIT_PROMPT: &str = "You are a knowledge graph quality auditor. Analyze this memory and its entity bindings for quality issues.\n\n\
Check for: missing entities, wrong entity types, redundant relationships, orphaned entities, generic descriptions, low-signal relationships.\n\n\
Respond with a list of issues found (or empty if none) and an overall quality score.";

/// Response schema for `graph-audit`.
///
/// - `quality_score` is bounded to `[0.0, 1.0]`.
/// - Each `issues[]` entry carries a `kind` and a `detail` string.
/// - `reasoning` is free text.
///
/// Every object, including the nested issue object, is closed with
/// `"additionalProperties": false`, as in the other schemas in this file.
/// Strict structured-output modes reject schemas with an open nested object.
const GRAPH_AUDIT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "quality_score": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "issues": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "kind": { "type": "string" },
          "detail": { "type": "string" }
        },
        "required": ["kind", "detail"],
        "additionalProperties": false
      }
    },
    "reasoning": { "type": "string" }
  },
  "required": ["quality_score", "issues", "reasoning"],
  "additionalProperties": false
}"#;
239
// G27 P2: deep-research-synth — synthesize research findings into graph
/// Prompt for `deep-research-synth`: extracts entities and relationships
/// from a memory body and asks for a synthesis summary. The relation list
/// here is the `enum` used by `DEEP_RESEARCH_SYNTH_SCHEMA`.
const DEEP_RESEARCH_SYNTH_PROMPT: &str = "You are a knowledge graph synthesizer. Given this memory body, extract key findings and synthesize them into structured entities and relationships.\n\n\
Entity names: lowercase kebab-case, domain-specific.\n\
Relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, related, replaces, tracked-in.\n\n\
Respond with extracted entities, relationships, and a synthesis summary.";

/// Response schema for `deep-research-synth`.
///
/// This schema mirrors `BINDINGS_SCHEMA`:
/// - every object is closed with `"additionalProperties": false`, which
///   strict structured-output modes require;
/// - `entity_type` and `relation` are constrained by `enum`s;
/// - `strength` is bounded to `[0, 1]`.
///
/// In addition, a required `summary` string carries the synthesis.
const DEEP_RESEARCH_SYNTH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["depends-on","uses","supports","causes","fixes","contradicts","applies-to","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source", "target", "relation", "strength"],
        "additionalProperties": false
      }
    },
    "summary": { "type": "string" }
  },
  "required": ["entities", "relationships", "summary"],
  "additionalProperties": false
}"#;
256
// G27 P2: body-extract — extract structured content from unstructured text
/// Prompt for `body-extract`: asks the LLM to restructure an unstructured
/// memory body into clean markdown while preserving all factual content.
const BODY_EXTRACT_PROMPT: &str = "You are a structured data extractor. Given this memory body (which may be unstructured text, raw notes, or a transcript), extract and restructure the content into a clean, well-organized markdown body.\n\n\
Preserve all factual content. Remove noise, fix formatting, add section headers where appropriate.\n\
Respond with the restructured body and a brief summary of changes.";

/// Response schema for `body-extract`: the `restructured_body` and a
/// `changes_summary`, both required strings.
const BODY_EXTRACT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "restructured_body": { "type": "string" },
    "changes_summary": { "type": "string" }
  },
  "required": ["restructured_body", "changes_summary"],
  "additionalProperties": false
}"#;
271
272// ---------------------------------------------------------------------------
273// Prompts
274// ---------------------------------------------------------------------------
275
/// System prompt for entity/relationship extraction.
///
/// Its list of allowed relation types is the same set as the `relation`
/// `enum` in `BINDINGS_SCHEMA`. It explicitly bans `mentions` as a relation.
const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. Given a memory body, extract:\n\
1. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
2. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones";

/// Leading text of the `entity-descriptions` prompt. It ends with
/// `"Entity name: "`; presumably the caller appends the entity name and type
/// (NOTE(review): confirm at the call site).
const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. Given an entity name and type, write a concise one-sentence description (10-20 words) that explains what this entity IS and WHY it matters in the context of software/system design.\n\nEntity name: ";

/// Leading text of the `body-enrich` prompt. It ends with
/// `"Memory body to enrich:\n\n"`, so the body is expected to be appended
/// directly after it.
/// NOTE(review): the text says the target length comes from "the system
/// context"; confirm the caller actually supplies the min/max char bounds.
const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. Given a short or sparse memory body, expand it into a richer, more complete and useful description. Preserve all existing facts. Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\nConstraints:\n- Output only the enriched body text (no metadata, no headers)\n- Preserve the original meaning exactly\n- Target length is provided in the system context\n\nMemory body to enrich:\n\n";
290
291// ---------------------------------------------------------------------------
292// CLI args
293// ---------------------------------------------------------------------------
294
/// Operation to perform in the `enrich` command.
// The `///` comments on the variants are user-facing text: clap's
// `ValueEnum` derive uses them as possible-value help in `--help`.
// Variants are spelled in kebab-case on the CLI (e.g. `--operation
// memory-bindings`, `--operation re-embed`) and in serialized output
// (`serde(rename_all = "kebab-case")`).
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum EnrichOperation {
    // -- Core operations (also described in the module-level docs) --
    /// Add missing entity/relationship bindings to memories (fully implemented).
    MemoryBindings,
    /// Fill NULL/empty entity descriptions with LLM-generated summaries (fully implemented).
    EntityDescriptions,
    /// Expand short memory bodies into richer content (fully implemented, GAP-18).
    BodyEnrich,
    /// Rebuild missing memory embeddings without rewriting the memory body.
    ReEmbed,
    // -- G27 operations, documented as "scan only". Most have a matching
    // `*_PROMPT` / `*_SCHEMA` constant pair in this file.
    // NOTE(review): no cross-domain-bridges prompt/schema constant sits next
    // to the others — confirm how `CrossDomainBridges` is handled.
    /// Calibrate relationship weights using LLM analysis (scan only).
    WeightCalibrate,
    /// Reclassify relationship types using LLM judgment (scan only).
    RelationReclassify,
    /// Connect isolated entities by suggesting new relationships (scan only).
    EntityConnect,
    /// Validate entity type assignments using LLM judgment (scan only).
    EntityTypeValidate,
    /// Enrich memory descriptions that are generic/auto-generated (scan only).
    DescriptionEnrich,
    /// Identify cross-domain bridges between disconnected subgraphs (scan only).
    CrossDomainBridges,
    /// Classify memories into domain categories (scan only).
    DomainClassify,
    /// Audit the graph for quality issues (scan only).
    GraphAudit,
    /// Synthesize deep-research findings into graph memories (scan only).
    DeepResearchSynth,
    /// Extract structured body from unstructured text (scan only).
    BodyExtract,
}
328
/// LLM provider for enrichment.
// The `///` variant comments are `--help` text (clap `ValueEnum`). The CLI
// values are `claude-code`, `codex` and `opencode`; the `Display` impl for
// this type renders the same spellings.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
pub enum EnrichMode {
    /// Use locally installed Claude Code CLI (OAuth-first).
    ClaudeCode,
    /// Use locally installed OpenAI Codex CLI.
    Codex,
    /// Use locally installed OpenCode CLI.
    // Pins the CLI value name explicitly as `opencode`.
    #[value(name = "opencode")]
    Opencode,
}
340
341impl std::fmt::Display for EnrichMode {
342    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
343        match self {
344            EnrichMode::ClaudeCode => write!(f, "claude-code"),
345            EnrichMode::Codex => write!(f, "codex"),
346            EnrichMode::Opencode => write!(f, "opencode"),
347        }
348    }
349}
350
351/// Arguments for the `enrich` subcommand.
352#[derive(clap::Args)]
353#[command(
354    about = "Enrich graph memories and entities using an LLM provider",
355    after_long_help = "EXAMPLES:\n  \
356    # Add missing entity bindings to all unbound memories\n  \
357    sqlite-graphrag enrich --operation memory-bindings --mode claude-code\n\n  \
358    # Fill entity descriptions (dry-run preview, no tokens spent)\n  \
359    sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n  \
360    # Expand short memory bodies (GAP-18)\n  \
361    sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n  \
362    # Rebuild only missing memory embeddings without rewriting bodies\n  \
363    sqlite-graphrag enrich --operation re-embed --limit 100\n\n  \
364    # Resume an interrupted body-enrich run\n  \
365    sqlite-graphrag enrich --operation body-enrich --resume --json\n\n  \
366    # Retry only failed items from a previous run\n  \
367    sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n\
368    EXIT CODES:\n  \
369    0  success\n  \
370    1  validation error (bad args, binary not found)\n  \
371    14 I/O error"
372)]
373pub struct EnrichArgs {
374    /// Enrichment operation to run.
375    #[arg(long, short = 'o', value_enum, value_name = "OPERATION")]
376    pub operation: EnrichOperation,
377
378    /// LLM provider to use. Default: claude-code (OAuth-first).
379    #[arg(long, value_enum, default_value = "claude-code")]
380    pub mode: EnrichMode,
381
382    /// Maximum number of items to process in this run. Omit for all.
383    #[arg(long, value_name = "N")]
384    pub limit: Option<usize>,
385
386    /// Preview items without calling the LLM (zero tokens consumed).
387    #[arg(long)]
388    pub dry_run: bool,
389
390    /// Namespace to operate on. Default: global.
391    #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
392    pub namespace: Option<String>,
393
394    // -- Provider flags (Claude) --
395    /// Path to the Claude Code binary. Default: auto-detect from PATH.
396    #[arg(long, value_name = "PATH")]
397    pub claude_binary: Option<PathBuf>,
398
399    /// Claude model to use (e.g. claude-sonnet-4-6).
400    #[arg(long, value_name = "MODEL")]
401    pub claude_model: Option<String>,
402
403    /// Timeout per item in seconds when using Claude Code. Default: 300.
404    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
405    pub claude_timeout: u64,
406
407    // -- Provider flags (Codex) --
408    /// Path to the Codex CLI binary. Default: auto-detect from PATH.
409    #[arg(long, value_name = "PATH")]
410    pub codex_binary: Option<PathBuf>,
411
412    /// Codex model to use (e.g. o4-mini).
413    #[arg(long, value_name = "MODEL")]
414    pub codex_model: Option<String>,
415
416    /// Timeout per item in seconds when using Codex. Default: 300.
417    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
418    pub codex_timeout: u64,
419
420    // -- Provider flags (OpenCode) --
421    /// Path to the OpenCode binary. Default: auto-detect from PATH.
422    #[arg(long, value_name = "PATH", env = "SQLITE_GRAPHRAG_OPENCODE_BINARY")]
423    pub opencode_binary: Option<PathBuf>,
424
425    /// OpenCode model to use.
426    #[arg(long, value_name = "MODEL", env = "SQLITE_GRAPHRAG_OPENCODE_MODEL")]
427    pub opencode_model: Option<String>,
428
429    /// Timeout per item in seconds when using OpenCode. Default: 300.
430    #[arg(
431        long,
432        value_name = "SECONDS",
433        env = "SQLITE_GRAPHRAG_OPENCODE_TIMEOUT",
434        default_value_t = 300
435    )]
436    pub opencode_timeout: u64,
437
438    // -- Cost controls --
439    /// Abort when cumulative cost exceeds this USD budget (API key only; ignored for OAuth).
440    #[arg(long, value_name = "USD")]
441    pub max_cost_usd: Option<f64>,
442
443    // -- Queue controls --
444    /// Resume a previously interrupted run (skip already-done items).
445    #[arg(long)]
446    pub resume: bool,
447
448    /// Retry only items that failed in a previous run.
449    #[arg(long)]
450    pub retry_failed: bool,
451
452    // -- body-enrich specific flags (GAP-18) --
453    /// Minimum output character count for body-enrich. Default: 500.
454    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
455    pub min_output_chars: usize,
456
457    /// Maximum output character count for body-enrich. Default: 2000.
458    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
459    pub max_output_chars: usize,
460
461    /// Check that enriched body preserves all facts from the original (LLM judge). Default: true.
462    #[arg(long, default_value_t = true)]
463    pub preserve_check: bool,
464
465    /// Path to a custom prompt template file for body-enrich.
466    #[arg(long, value_name = "PATH")]
467    pub prompt_template: Option<PathBuf>,
468
469    /// Number of parallel LLM workers (default 1 = serial).
470    /// Each worker claims items atomically from the queue DB via UPDATE...RETURNING.
471    /// Range: 1–32. For 2321 entities, --llm-parallelism 4 reduces wall time ~4×.
472    #[arg(long, default_value_t = 1, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=32))]
473    pub llm_parallelism: u32,
474
475    // -- Output / infra --
476    /// Emit NDJSON output. Always true; flag accepted for compatibility.
477    #[arg(long)]
478    pub json: bool,
479
480    /// Database path override.
481    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
482    pub db: Option<String>,
483
484    /// G30: poll for the job singleton every second for up to N seconds
485    /// when another invocation holds the lock. Default: 0 (fail fast).
486    #[arg(long, value_name = "SECONDS")]
487    pub wait_job_singleton: Option<u64>,
488
489    /// G30: force acquisition of the singleton lock by removing a stale
490    /// lock file from a previously crashed invocation. Use only when you
491    /// are certain no other `enrich`/`ingest` is running.
492    #[arg(long, default_value_t = false)]
493    pub force_job_singleton: bool,
494
495    /// G37: select a specific subset of memory names to enrich instead of
496    /// the full candidate set. Comma-separated, e.g. `--names a,b,c`.
497    /// Empty when omitted (processes all candidates).
498    #[arg(long, value_name = "NAMES", value_delimiter = ',')]
499    pub names: Vec<String>,
500
501    /// G37: read the subset of memory names from a file (one per line).
502    /// Lines starting with `#` and empty lines are ignored. Combined with
503    /// `--names` (union) when both are set.
504    #[arg(long, value_name = "PATH")]
505    pub names_file: Option<PathBuf>,
506
507    /// G35: probe the LLM provider with a 1-turn ping before processing
508    /// the batch. Aborts with a clear error if the rate-limit window is
509    /// closed (avoids burning N turns only to fail on item 1).
510    #[arg(long, default_value_t = false)]
511    pub preflight_check: bool,
512
513    /// G35: if a preflight probe or in-flight call hits the Claude rate
514    /// limit, fall back to `--fallback-mode` (typically `codex`) instead
515    /// of failing the batch. Ignored when `--mode` is already `codex`.
516    #[arg(long, value_enum)]
517    pub fallback_mode: Option<EnrichMode>,
518
519    /// G35: number of seconds before the OAuth rate-limit reset at which
520    /// the preflight probe should refuse to start. Default 300 (5 min).
521    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
522    pub rate_limit_buffer: u64,
523
524    /// G28-D: refuse to start when the 1-minute load average exceeds
525    /// `2 × ncpus` (or `SQLITE_GRAPHRAG_MAX_LOAD_PER_NCPU` if set).
526    /// Set to false to skip the check on contended CI runners.
527    #[arg(long, default_value_t = true)]
528    pub max_load_check: bool,
529
530    /// G28-D: when the system is saturated, abort the job after this
531    /// many consecutive HardFailure outcomes. Default 5.
532    #[arg(long, value_name = "N", default_value_t = 5)]
533    pub circuit_breaker_threshold: u32,
534
535    /// G29 Passo 4: minimum trigram-Jaccard similarity between the
536    /// original body and the LLM-rewritten body for the rewrite to be
537    /// accepted. Scores below the threshold are rejected and emitted as
538    /// `EnrichItemResult::PreservationFailed`. Default 0.7 (per the G29
539    /// gap specification). Ignored when `--operation` is not
540    /// `body-enrich`.
541    #[arg(long, value_name = "FLOAT", default_value_t = 0.7)]
542    pub preserve_threshold: f64,
543
544    /// G33 Passo 3: when set, validate `--codex-model` against the
545    /// ChatGPT Pro OAuth accepted-model list and abort with a
546    /// suggestion when the value is unknown. Default true (fail fast
547    /// to avoid burning OAuth turns). Set to false to opt out.
548    #[arg(long, default_value_t = true)]
549    pub codex_model_validate: bool,
550
551    /// G33 Passo 3: when set together with an invalid `--codex-model`,
552    /// automatically substitute the supplied default (e.g. `gpt-5.5`)
553    /// instead of aborting. The substitution is recorded in the NDJSON
554    /// stream as `provider_substituted: true` for traceability.
555    #[arg(long, value_name = "MODEL")]
556    pub codex_model_fallback: Option<String>,
557}
558
559// ---------------------------------------------------------------------------
560// Internal types — raw LLM output structs
561// ---------------------------------------------------------------------------
562
563// ---------------------------------------------------------------------------
564// NDJSON event types emitted to stdout
565// ---------------------------------------------------------------------------
566
/// NDJSON event announcing a phase of an enrich run.
///
/// Every `Option` field is omitted from the JSON when `None`, so each phase
/// only reports the fields that apply to it.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Phase label. NOTE(review): the full set of values is defined by the
    /// emitting code; only "scan" is referenced in this section.
    phase: &'a str,
    /// Provider binary path, when the phase reports it.
    #[serde(skip_serializing_if = "Option::is_none")]
    binary_path: Option<&'a str>,
    /// Provider version string, when the phase reports it.
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Number of candidate items found.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_total: Option<usize>,
    /// Number of items still to process. NOTE(review): presumably excludes
    /// items already done in the queue DB on `--resume` — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_pending: Option<usize>,
    /// Active parallel LLM worker count (1 = serial). Present only on the "scan" phase event.
    #[serde(skip_serializing_if = "Option::is_none")]
    llm_parallelism: Option<u32>,
}
582
/// Per-item NDJSON event emitted to stdout while an enrich run progresses.
///
/// Every `Option` field is omitted from the JSON when `None`, so each
/// operation reports only the counters relevant to it. `index` and `total`
/// are always present.
#[derive(Debug, Serialize)]
struct ItemEvent<'a> {
    /// Item identifier (memory name or entity name).
    item: &'a str,
    /// Outcome label for this item. NOTE(review): value set is defined by
    /// the emitting code, not visible here.
    status: &'a str,
    /// Memory row id, for memory-scoped operations.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Entity row id, for entity-scoped operations.
    #[serde(skip_serializing_if = "Option::is_none")]
    entity_id: Option<i64>,
    /// Entity count for this item (presumably entities extracted/bound).
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Relationship count for this item (presumably relationships created).
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Body length before a rewrite (presumably body-enrich only).
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_before: Option<usize>,
    /// Body length after a rewrite (presumably body-enrich only).
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_after: Option<usize>,
    /// LLM cost attributed to this item, in USD.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    /// Time spent on this item, in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Error message when the item failed.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Position of this item in the run. NOTE(review): 0- or 1-based is
    /// decided by the emitter — confirm.
    index: usize,
    /// Total number of items in the run.
    total: usize,
}
609
/// Final NDJSON summary record of an enrich run.
#[derive(Debug, Serialize)]
struct EnrichSummary {
    /// Marker field. NOTE(review): presumably always `true`, so consumers can
    /// tell the summary line apart from phase/item events — confirm at the
    /// construction site.
    summary: bool,
    /// Name of the operation that ran.
    operation: String,
    /// Number of candidate items.
    items_total: usize,
    /// Items that finished successfully.
    completed: usize,
    /// Items that failed.
    failed: usize,
    /// Items that were skipped.
    skipped: usize,
    /// Cumulative LLM cost of the run, in USD.
    cost_usd: f64,
    /// Wall-clock duration of the run, in milliseconds.
    elapsed_ms: u64,
    /// v1.0.84 (ADR-0042): identifies the LLM backend that actually
    /// performed the re-embedding during enrich: `"claude" | "codex" | "none"`.
    /// Absent on the wire when `None` (keeps the happy-path envelope clean,
    /// and covers operations that did not involve re-embedding).
    /// NOTE(review): `EnrichMode` also has `Opencode`; confirm whether an
    /// `"opencode"` value can appear here.
    #[serde(skip_serializing_if = "Option::is_none")]
    backend_invoked: Option<&'static str>,
}
627
628use crate::output::emit_json_line as emit_json;
629
630// ---------------------------------------------------------------------------
631// Queue DB
632// ---------------------------------------------------------------------------
633
634/// Opens or creates the enrichment queue database.
635///
636/// The queue schema mirrors `ingest_claude` for resume/retry parity.
637/// Uses a different filename (`.enrich-queue.sqlite`) to avoid collision.
638///
639/// # DRY note
640///
641/// This is a near-verbatim copy of `open_queue_db` in `ingest_claude.rs`.
642/// Both should be unified in a shared `llm_runner.rs` module by the
643/// Integration stream.
644fn open_queue_db(path: &str) -> Result<Connection, AppError> {
645    let conn = Connection::open(path)?;
646    conn.pragma_update(None, "journal_mode", "wal")?;
647    conn.execute_batch(
648        "CREATE TABLE IF NOT EXISTS queue (
649            id          INTEGER PRIMARY KEY AUTOINCREMENT,
650            item_key    TEXT NOT NULL UNIQUE,
651            item_type   TEXT NOT NULL DEFAULT 'memory',
652            status      TEXT NOT NULL DEFAULT 'pending',
653            memory_id   INTEGER,
654            entity_id   INTEGER,
655            entities    INTEGER DEFAULT 0,
656            rels        INTEGER DEFAULT 0,
657            error       TEXT,
658            cost_usd    REAL DEFAULT 0.0,
659            attempt     INTEGER DEFAULT 0,
660            elapsed_ms  INTEGER,
661            created_at  TEXT DEFAULT (datetime('now')),
662            done_at     TEXT
663        );
664        CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
665    )?;
666    Ok(conn)
667}
668
669// ---------------------------------------------------------------------------
670// LLM invocation — Claude Code
671// ---------------------------------------------------------------------------
672
673/// Calls `claude -p` via the shared `claude_runner` module (G02).
674///
675/// Returns `(output_value, cost_usd, is_oauth)`.
676fn call_claude(
677    binary: &Path,
678    prompt: &str,
679    json_schema: &str,
680    input_text: &str,
681    model: Option<&str>,
682    timeout_secs: u64,
683) -> Result<(serde_json::Value, f64, bool), AppError> {
684    let result = crate::commands::claude_runner::run_claude(
685        binary,
686        prompt,
687        json_schema,
688        input_text,
689        model,
690        timeout_secs,
691        7,
692    )?;
693    Ok((result.value, result.cost_usd, result.is_oauth))
694}
695
696// ---------------------------------------------------------------------------
697// Preflight probe (G35) — single-turn ping to verify the LLM provider
698// ---------------------------------------------------------------------------
699
700/// Result of a single preflight ping (G35).
701enum PreflightOutcome {
702    /// The provider accepted the ping without rate-limit or other errors.
703    Healthy,
704    /// The provider rejected the ping due to OAuth rate limit. The
705    /// `suggestion` field is a human hint that callers can embed in the
706    /// user-facing error.
707    RateLimited {
708        reason: String,
709        suggestion: &'static str,
710    },
711    /// Any other provider error (binary missing, auth failure, etc.).
712    Error(AppError),
713}
714
715/// Probes the configured LLM provider with a 1-turn ping.
716///
717/// - Claude: `claude -p "ping" --max-turns 1 --strict-mcp-config --mcp-config '{}'`
718/// - Codex:  `codex exec -c mcp_servers='{}' "ping" --json`
719///
720/// The probe intentionally avoids spawning any MCP server children (G28-A)
721/// to keep its own process footprint at the minimum.
722fn run_preflight_probe(args: &EnrichArgs) -> PreflightOutcome {
723    let timeout = std::time::Duration::from_secs(args.rate_limit_buffer.max(60));
724
725    match args.mode {
726        EnrichMode::ClaudeCode => {
727            let bin = match find_claude_binary(args.claude_binary.as_deref()) {
728                Ok(b) => b,
729                Err(e) => return PreflightOutcome::Error(e),
730            };
731            // v1.0.88 (BUG-3 fix, ADR-0046): write the empty MCP config to a
732            // tempfile (Claude Code 2.1.177 rejects the inline `{}`
733            // form) and run the preflight gate before spawn, mirroring
734            // the canonical pattern in `claude_runner::build_claude_command`.
735            let mcp_config_path = match crate::spawn::preflight::write_empty_mcp_config_tempfile() {
736                Ok(p) => p,
737                Err(e) => {
738                    return PreflightOutcome::Error(AppError::Io(e));
739                }
740            };
741            let mut cmd = std::process::Command::new(&bin);
742            crate::spawn::env_whitelist::apply_env_whitelist(
743                &mut cmd,
744                crate::spawn::env_whitelist::is_strict_env_clear(),
745            );
746            if let Err(e) = crate::spawn::apply_cwd_isolation(&mut cmd) {
747                return PreflightOutcome::Error(e);
748            }
749            cmd.arg("-p")
750                .arg("ping")
751                .arg("--max-turns")
752                .arg("1")
753                .arg("--strict-mcp-config")
754                .arg("--mcp-config")
755                .arg(mcp_config_path.as_os_str())
756                .arg("--dangerously-skip-permissions")
757                .arg("--settings")
758                .arg("{\"hooks\":{}}")
759                .arg("--output-format")
760                .arg("json")
761                .stdin(std::process::Stdio::null())
762                .stdout(std::process::Stdio::piped())
763                .stderr(std::process::Stdio::piped());
764
765            let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
766                Ok(c) => c,
767                Err(e) => {
768                    return PreflightOutcome::Error(AppError::Io(e));
769                }
770            };
771            let output = match wait_with_timeout(child, timeout) {
772                Ok(out) => out,
773                Err(e) => return PreflightOutcome::Error(e),
774            };
775            if !output.status.success() {
776                let stderr = String::from_utf8_lossy(&output.stderr);
777                if stderr.contains("hit your session limit")
778                    || stderr.contains("rate_limit")
779                    || stderr.contains("429")
780                {
781                    return PreflightOutcome::RateLimited {
782                        reason: stderr.trim().to_string(),
783                        suggestion:
784                            "wait for the OAuth window to reset or use --fallback-mode codex",
785                    };
786                }
787                return PreflightOutcome::Error(AppError::Validation(format!(
788                    "preflight probe failed: {stderr}",
789                    stderr = stderr.trim()
790                )));
791            }
792            PreflightOutcome::Healthy
793        }
794        EnrichMode::Codex => {
795            let bin = match find_codex_binary(args.codex_binary.as_deref()) {
796                Ok(b) => b,
797                Err(e) => return PreflightOutcome::Error(e),
798            };
799            super::codex_spawn::validate_codex_model(args.codex_model.as_deref())
800                .map_err(PreflightOutcome::Error)
801                .ok();
802            let schema = "{}";
803            let schema_path = match super::codex_spawn::trusted_schema_path() {
804                Ok(p) => p,
805                Err(e) => return PreflightOutcome::Error(e),
806            };
807            let spawn_args = super::codex_spawn::CodexSpawnArgs {
808                binary: &bin,
809                prompt: "ping",
810                json_schema: schema,
811                input_text: "",
812                model: args.codex_model.as_deref(),
813                timeout_secs: args.rate_limit_buffer.max(60),
814                schema_path: schema_path.clone(),
815            };
816            let mut cmd = match super::codex_spawn::build_codex_command(&spawn_args) {
817                Ok(c) => c,
818                Err(e) => return PreflightOutcome::Error(e),
819            };
820            let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
821                Ok(c) => c,
822                Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
823            };
824            let output = match wait_with_timeout(child, timeout) {
825                Ok(out) => out,
826                Err(e) => return PreflightOutcome::Error(e),
827            };
828            let _ = std::fs::remove_file(&schema_path);
829            if !output.status.success() {
830                let stderr = String::from_utf8_lossy(&output.stderr);
831                if stderr.contains("rate_limit")
832                    || stderr.contains("429")
833                    || stderr.contains("Too Many Requests")
834                {
835                    return PreflightOutcome::RateLimited {
836                        reason: stderr.trim().to_string(),
837                        suggestion: "wait for the rate-limit window to reset",
838                    };
839                }
840                return PreflightOutcome::Error(AppError::Validation(format!(
841                    "preflight probe failed: {stderr}",
842                    stderr = stderr.trim()
843                )));
844            }
845            PreflightOutcome::Healthy
846        }
847        EnrichMode::Opencode => {
848            let bin = match super::opencode_runner::find_opencode_binary_with_override(
849                args.opencode_binary.as_deref(),
850            ) {
851                Ok(b) => b,
852                Err(e) => return PreflightOutcome::Error(e),
853            };
854            let model =
855                super::opencode_runner::resolve_opencode_model(args.opencode_model.as_deref());
856            let mut cmd =
857                match super::opencode_runner::build_opencode_command_sync(&bin, &model, "ping", "")
858                {
859                    Ok(c) => c,
860                    Err(e) => return PreflightOutcome::Error(e),
861                };
862            let child = match super::opencode_runner::spawn_opencode(&mut cmd) {
863                Ok(c) => c,
864                Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
865            };
866            let output = match wait_with_timeout(child, timeout) {
867                Ok(out) => out,
868                Err(e) => return PreflightOutcome::Error(e),
869            };
870            if !output.status.success() {
871                let stderr = String::from_utf8_lossy(&output.stderr);
872                if stderr.contains("rate_limit")
873                    || stderr.contains("429")
874                    || stderr.contains("Too Many Requests")
875                {
876                    return PreflightOutcome::RateLimited {
877                        reason: stderr.trim().to_string(),
878                        suggestion: "wait for the rate-limit window to reset",
879                    };
880                }
881                return PreflightOutcome::Error(AppError::Validation(format!(
882                    "preflight probe failed: {stderr}",
883                    stderr = stderr.trim()
884                )));
885            }
886            PreflightOutcome::Healthy
887        }
888    }
889}
890
891/// Cross-platform wait with timeout (no extra crate dependency).
892fn wait_with_timeout(
893    mut child: std::process::Child,
894    timeout: std::time::Duration,
895) -> Result<std::process::Output, AppError> {
896    use wait_timeout::ChildExt;
897    let start = std::time::Instant::now();
898    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
899    if status.is_none() {
900        let _ = child.kill();
901        let _ = child.wait();
902        return Err(AppError::Validation(format!(
903            "preflight probe timed out after {}s",
904            start.elapsed().as_secs()
905        )));
906    }
907    let mut stdout = Vec::new();
908    if let Some(mut out) = child.stdout.take() {
909        std::io::Read::read_to_end(&mut out, &mut stdout).map_err(AppError::Io)?;
910    }
911    let mut stderr = Vec::new();
912    if let Some(mut err) = child.stderr.take() {
913        std::io::Read::read_to_end(&mut err, &mut stderr).map_err(AppError::Io)?;
914    }
915    let exit = status.unwrap();
916    Ok(std::process::Output {
917        status: exit,
918        stdout,
919        stderr,
920    })
921}
922
923// ---------------------------------------------------------------------------
924// SCAN helpers — SQL queries that find items needing enrichment
925// ---------------------------------------------------------------------------
926
927/// Returns memories without any `memory_entities` binding.
928///
929/// These are the targets for `memory-bindings` enrichment. When `name_filter`
930/// is non-empty, restricts the scan to the given names (G37); unknown names
931/// are silently skipped (the caller can detect them by comparing
932/// requested vs. returned).
933fn scan_unbound_memories(
934    conn: &Connection,
935    namespace: &str,
936    limit: Option<usize>,
937    name_filter: &[String],
938) -> Result<Vec<(i64, String, String)>, AppError> {
939    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
940
941    if name_filter.is_empty() {
942        let sql = format!(
943            "SELECT m.id, m.name, m.body
944             FROM memories m
945             WHERE m.namespace = ?1
946               AND m.deleted_at IS NULL
947               AND NOT EXISTS (
948                   SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
949               )
950             ORDER BY m.id
951             {limit_clause}"
952        );
953        let mut stmt = conn.prepare(&sql)?;
954        let rows = stmt
955            .query_map(rusqlite::params![namespace], |r| {
956                Ok((
957                    r.get::<_, i64>(0)?,
958                    r.get::<_, String>(1)?,
959                    r.get::<_, String>(2)?,
960                ))
961            })?
962            .collect::<Result<Vec<_>, _>>()?;
963        Ok(rows)
964    } else {
965        // Build a parameterised IN clause: ?2, ?3, ..., ?{1+n}
966        let placeholders: Vec<String> = (2..=name_filter.len() + 1)
967            .map(|i| format!("?{i}"))
968            .collect();
969        let in_clause = placeholders.join(", ");
970        let sql = format!(
971            "SELECT m.id, m.name, m.body
972             FROM memories m
973             WHERE m.namespace = ?1
974               AND m.deleted_at IS NULL
975               AND m.name IN ({in_clause})
976               AND NOT EXISTS (
977                   SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
978               )
979             ORDER BY m.id
980             {limit_clause}"
981        );
982        let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
983        params_vec.push(&namespace);
984        for n in name_filter {
985            params_vec.push(n);
986        }
987        let mut stmt = conn.prepare(&sql)?;
988        let rows = stmt
989            .query_map(
990                rusqlite::params_from_iter(params_vec.iter().copied()),
991                |r| {
992                    Ok((
993                        r.get::<_, i64>(0)?,
994                        r.get::<_, String>(1)?,
995                        r.get::<_, String>(2)?,
996                    ))
997                },
998            )?
999            .collect::<Result<Vec<_>, _>>()?;
1000        Ok(rows)
1001    }
1002}
1003
1004/// Reads a list of memory names from a UTF-8 text file (G37).
1005///
1006/// Empty lines and lines beginning with `#` are skipped. Returns a
1007/// de-duplicated, order-preserving list of trimmed names.
1008fn read_names_file(path: &Path) -> Result<Vec<String>, AppError> {
1009    let content = std::fs::read_to_string(path).map_err(|e| {
1010        AppError::Validation(format!("failed to read names file {}: {e}", path.display()))
1011    })?;
1012    let mut seen = std::collections::HashSet::new();
1013    let mut out = Vec::new();
1014    for line in content.lines() {
1015        let trimmed = line.trim();
1016        if trimmed.is_empty() || trimmed.starts_with('#') {
1017            continue;
1018        }
1019        if seen.insert(trimmed.to_string()) {
1020            out.push(trimmed.to_string());
1021        }
1022    }
1023    Ok(out)
1024}
1025
1026/// Resolves the union of `--names` and `--names-file` (G37).
1027fn resolve_name_filter(args: &EnrichArgs) -> Result<Vec<String>, AppError> {
1028    let mut combined: Vec<String> = args.names.clone();
1029    if let Some(p) = &args.names_file {
1030        let from_file = read_names_file(p)?;
1031        for n in from_file {
1032            if !combined.contains(&n) {
1033                combined.push(n);
1034            }
1035        }
1036    }
1037    Ok(combined)
1038}
1039
1040/// Returns entities with NULL or empty description.
1041///
1042/// These are the targets for `entity-descriptions` enrichment.
1043fn scan_entities_without_description(
1044    conn: &Connection,
1045    namespace: &str,
1046    limit: Option<usize>,
1047    name_filter: &[String],
1048) -> Result<Vec<(i64, String, String)>, AppError> {
1049    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1050
1051    if name_filter.is_empty() {
1052        let sql = format!(
1053            "SELECT id, name, type
1054             FROM entities
1055             WHERE namespace = ?1
1056               AND (description IS NULL OR description = '')
1057             ORDER BY id
1058             {limit_clause}"
1059        );
1060        let mut stmt = conn.prepare(&sql)?;
1061        let rows = stmt
1062            .query_map(rusqlite::params![namespace], |r| {
1063                Ok((
1064                    r.get::<_, i64>(0)?,
1065                    r.get::<_, String>(1)?,
1066                    r.get::<_, String>(2)?,
1067                ))
1068            })?
1069            .collect::<Result<Vec<_>, _>>()?;
1070        Ok(rows)
1071    } else {
1072        let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1073            .map(|i| format!("?{i}"))
1074            .collect();
1075        let in_clause = placeholders.join(", ");
1076        let sql = format!(
1077            "SELECT id, name, type
1078             FROM entities
1079             WHERE namespace = ?1
1080               AND name IN ({in_clause})
1081               AND (description IS NULL OR description = '')
1082             ORDER BY id
1083             {limit_clause}"
1084        );
1085        let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1086        params_vec.push(&namespace);
1087        for n in name_filter {
1088            params_vec.push(n);
1089        }
1090        let mut stmt = conn.prepare(&sql)?;
1091        let rows = stmt
1092            .query_map(
1093                rusqlite::params_from_iter(params_vec.iter().copied()),
1094                |r| {
1095                    Ok((
1096                        r.get::<_, i64>(0)?,
1097                        r.get::<_, String>(1)?,
1098                        r.get::<_, String>(2)?,
1099                    ))
1100                },
1101            )?
1102            .collect::<Result<Vec<_>, _>>()?;
1103        Ok(rows)
1104    }
1105}
1106
1107/// Returns memories whose body length is below the configured minimum.
1108///
1109/// These are the targets for `body-enrich` (GAP-18).
1110fn scan_short_body_memories(
1111    conn: &Connection,
1112    namespace: &str,
1113    min_chars: usize,
1114    limit: Option<usize>,
1115    name_filter: &[String],
1116) -> Result<Vec<(i64, String, String)>, AppError> {
1117    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1118
1119    if name_filter.is_empty() {
1120        let sql = format!(
1121            "SELECT m.id, m.name, m.body
1122             FROM memories m
1123             WHERE m.namespace = ?1
1124               AND m.deleted_at IS NULL
1125               AND LENGTH(COALESCE(m.body,'')) < ?2
1126             ORDER BY m.id
1127             {limit_clause}"
1128        );
1129        let mut stmt = conn.prepare(&sql)?;
1130        let rows = stmt
1131            .query_map(rusqlite::params![namespace, min_chars as i64], |r| {
1132                Ok((
1133                    r.get::<_, i64>(0)?,
1134                    r.get::<_, String>(1)?,
1135                    r.get::<_, String>(2)?,
1136                ))
1137            })?
1138            .collect::<Result<Vec<_>, _>>()?;
1139        Ok(rows)
1140    } else {
1141        let placeholders: Vec<String> = (3..=name_filter.len() + 2)
1142            .map(|i| format!("?{i}"))
1143            .collect();
1144        let in_clause = placeholders.join(", ");
1145        let sql = format!(
1146            "SELECT m.id, m.name, m.body
1147             FROM memories m
1148             WHERE m.namespace = ?1
1149               AND m.deleted_at IS NULL
1150               AND m.name IN ({in_clause})
1151               AND LENGTH(COALESCE(m.body,'')) < ?2
1152             ORDER BY m.id
1153             {limit_clause}"
1154        );
1155        let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(2 + name_filter.len());
1156        let min_chars_i64 = min_chars as i64;
1157        params_vec.push(&namespace);
1158        params_vec.push(&min_chars_i64);
1159        for n in name_filter {
1160            params_vec.push(n);
1161        }
1162        let mut stmt = conn.prepare(&sql)?;
1163        let rows = stmt
1164            .query_map(
1165                rusqlite::params_from_iter(params_vec.iter().copied()),
1166                |r| {
1167                    Ok((
1168                        r.get::<_, i64>(0)?,
1169                        r.get::<_, String>(1)?,
1170                        r.get::<_, String>(2)?,
1171                    ))
1172                },
1173            )?
1174            .collect::<Result<Vec<_>, _>>()?;
1175        Ok(rows)
1176    }
1177}
1178
1179/// Returns live memories that still have no row in `memory_embeddings`.
1180///
1181/// These are the targets for `re-embed`.
1182fn scan_memories_without_embeddings(
1183    conn: &Connection,
1184    namespace: &str,
1185    limit: Option<usize>,
1186    name_filter: &[String],
1187) -> Result<Vec<(i64, String, String)>, AppError> {
1188    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1189
1190    if name_filter.is_empty() {
1191        let sql = format!(
1192            "SELECT m.id, m.name, COALESCE(m.body,'')
1193             FROM memories m
1194             LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1195             WHERE m.namespace = ?1
1196               AND m.deleted_at IS NULL
1197               AND me.memory_id IS NULL
1198             ORDER BY m.id
1199             {limit_clause}"
1200        );
1201        let mut stmt = conn.prepare(&sql)?;
1202        let rows = stmt
1203            .query_map(rusqlite::params![namespace], |r| {
1204                Ok((
1205                    r.get::<_, i64>(0)?,
1206                    r.get::<_, String>(1)?,
1207                    r.get::<_, String>(2)?,
1208                ))
1209            })?
1210            .collect::<Result<Vec<_>, _>>()?;
1211        Ok(rows)
1212    } else {
1213        let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1214            .map(|i| format!("?{i}"))
1215            .collect();
1216        let in_clause = placeholders.join(", ");
1217        let sql = format!(
1218            "SELECT m.id, m.name, COALESCE(m.body,'')
1219             FROM memories m
1220             LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1221             WHERE m.namespace = ?1
1222               AND m.deleted_at IS NULL
1223               AND m.name IN ({in_clause})
1224               AND me.memory_id IS NULL
1225             ORDER BY m.id
1226             {limit_clause}"
1227        );
1228        let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1229        params_vec.push(&namespace);
1230        for n in name_filter {
1231            params_vec.push(n);
1232        }
1233        let mut stmt = conn.prepare(&sql)?;
1234        let rows = stmt
1235            .query_map(
1236                rusqlite::params_from_iter(params_vec.iter().copied()),
1237                |r| {
1238                    Ok((
1239                        r.get::<_, i64>(0)?,
1240                        r.get::<_, String>(1)?,
1241                        r.get::<_, String>(2)?,
1242                    ))
1243                },
1244            )?
1245            .collect::<Result<Vec<_>, _>>()?;
1246        Ok(rows)
1247    }
1248}
1249
1250/// G27: Returns relationships with weight >= 0.7 that may need recalibration.
1251#[allow(clippy::type_complexity)]
1252fn scan_weight_candidates(
1253    conn: &Connection,
1254    namespace: &str,
1255    limit: Option<usize>,
1256) -> Result<Vec<(i64, String, String, String, f64)>, AppError> {
1257    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1258    let sql = format!(
1259        "SELECT r.id, e1.name, e2.name, r.relation, r.weight \
1260         FROM relationships r \
1261         JOIN entities e1 ON e1.id = r.source_id \
1262         JOIN entities e2 ON e2.id = r.target_id \
1263         WHERE r.weight >= 0.7 AND e1.namespace = ?1 \
1264         ORDER BY r.weight DESC {limit_clause}"
1265    );
1266    let mut stmt = conn.prepare(&sql)?;
1267    let rows = stmt
1268        .query_map(rusqlite::params![namespace], |r| {
1269            Ok((
1270                r.get::<_, i64>(0)?,
1271                r.get::<_, String>(1)?,
1272                r.get::<_, String>(2)?,
1273                r.get::<_, String>(3)?,
1274                r.get::<_, f64>(4)?,
1275            ))
1276        })?
1277        .collect::<Result<Vec<_>, _>>()?;
1278    Ok(rows)
1279}
1280
1281/// G27: Returns relationships with generic relation types (applies_to).
1282fn scan_generic_relations(
1283    conn: &Connection,
1284    namespace: &str,
1285    limit: Option<usize>,
1286) -> Result<Vec<(i64, String, String, String)>, AppError> {
1287    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1288    let sql = format!(
1289        "SELECT r.id, e1.name, e2.name, r.relation \
1290         FROM relationships r \
1291         JOIN entities e1 ON e1.id = r.source_id \
1292         JOIN entities e2 ON e2.id = r.target_id \
1293         WHERE r.relation = 'applies_to' AND e1.namespace = ?1 \
1294         ORDER BY r.id {limit_clause}"
1295    );
1296    let mut stmt = conn.prepare(&sql)?;
1297    let rows = stmt
1298        .query_map(rusqlite::params![namespace], |r| {
1299            Ok((
1300                r.get::<_, i64>(0)?,
1301                r.get::<_, String>(1)?,
1302                r.get::<_, String>(2)?,
1303                r.get::<_, String>(3)?,
1304            ))
1305        })?
1306        .collect::<Result<Vec<_>, _>>()?;
1307    Ok(rows)
1308}
1309
1310// ---------------------------------------------------------------------------
1311// PERSIST helpers for fully-implemented operations
1312// ---------------------------------------------------------------------------
1313
1314/// Persists entity bindings extracted by the LLM for a memory.
1315///
1316/// Creates entities via `upsert_entity`, links them to the memory via
1317/// `link_memory_entity`, and upserts relationships found between entities.
1318fn persist_memory_bindings(
1319    conn: &Connection,
1320    namespace: &str,
1321    memory_id: i64,
1322    entities_json: &serde_json::Value,
1323    rels_json: &serde_json::Value,
1324) -> Result<(usize, usize), AppError> {
1325    #[derive(Deserialize)]
1326    struct EntityItem {
1327        name: String,
1328        entity_type: String,
1329    }
1330    #[derive(Deserialize)]
1331    struct RelItem {
1332        source: String,
1333        target: String,
1334        relation: String,
1335        strength: f64,
1336    }
1337
1338    let extracted_entities: Vec<EntityItem> = serde_json::from_value(entities_json.clone())
1339        .map_err(|e| AppError::Validation(format!("failed to parse entities array: {e}")))?;
1340
1341    let extracted_rels: Vec<RelItem> = serde_json::from_value(rels_json.clone())
1342        .map_err(|e| AppError::Validation(format!("failed to parse relationships array: {e}")))?;
1343
1344    let mut ent_count = 0usize;
1345    let mut rel_count = 0usize;
1346
1347    for item in &extracted_entities {
1348        let entity_type = match item.entity_type.parse::<EntityType>() {
1349            Ok(et) => et,
1350            Err(_) => {
1351                tracing::warn!(
1352                    target: "enrich",
1353                    entity = %item.name,
1354                    entity_type = %item.entity_type,
1355                    "entity type not recognized, skipping"
1356                );
1357                continue;
1358            }
1359        };
1360        match entities::upsert_entity(
1361            conn,
1362            namespace,
1363            &NewEntity {
1364                name: item.name.clone(),
1365                entity_type,
1366                description: None,
1367            },
1368        ) {
1369            Ok(eid) => {
1370                let _ = entities::link_memory_entity(conn, memory_id, eid);
1371                ent_count += 1;
1372            }
1373            Err(e) => {
1374                tracing::warn!(
1375                    target: "enrich",
1376                    entity = %item.name,
1377                    error = %e,
1378                    "entity upsert skipped"
1379                );
1380            }
1381        }
1382    }
1383
1384    for rel in &extracted_rels {
1385        let normalized = crate::parsers::normalize_relation(&rel.relation);
1386        crate::parsers::warn_if_non_canonical(&normalized);
1387
1388        // Normalize entity names before lookup: upsert_entity normalizes on write,
1389        // so the lookup must use the same normalized form to find the row.
1390        let src_name = crate::parsers::normalize_entity_name(&rel.source);
1391        let tgt_name = crate::parsers::normalize_entity_name(&rel.target);
1392        let src_id = entities::find_entity_id(conn, namespace, &src_name);
1393        let tgt_id = entities::find_entity_id(conn, namespace, &tgt_name);
1394        if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1395            let new_rel = NewRelationship {
1396                source: rel.source.clone(),
1397                target: rel.target.clone(),
1398                relation: normalized,
1399                strength: rel.strength,
1400                description: None,
1401            };
1402            if entities::upsert_relationship(conn, namespace, sid, tid, &new_rel).is_ok() {
1403                rel_count += 1;
1404            }
1405        }
1406    }
1407
1408    Ok((ent_count, rel_count))
1409}
1410
1411/// Updates an entity's description directly in the `entities` table.
1412fn persist_entity_description(
1413    conn: &Connection,
1414    entity_id: i64,
1415    description: &str,
1416) -> Result<(), AppError> {
1417    conn.execute(
1418        "UPDATE entities SET description = ?1, updated_at = unixepoch() WHERE id = ?2",
1419        rusqlite::params![description, entity_id],
1420    )?;
1421    Ok(())
1422}
1423
1424/// v1.0.84 (ADR-0042): on successful re-embed, records the active backend
1425/// into the shared accumulator (`ENRICH_LAST_BACKEND`) so the final
1426/// `EnrichSummary` can expose `backend_invoked` without changing every
1427/// caller's signature. Best-effort observability — concurrent enrich runs
1428/// may race, but `Mutex` keeps the mutation safe.
1429#[allow(clippy::too_many_arguments)]
1430fn reembed_memory_vector(
1431    conn: &Connection,
1432    namespace: &str,
1433    memory_id: i64,
1434    memory_name: &str,
1435    memory_type: &str,
1436    body: &str,
1437    paths: &crate::paths::AppPaths,
1438    llm_backend: crate::cli::LlmBackendChoice,
1439) -> Result<(), AppError> {
1440    let snippet: String = body.chars().take(200).collect();
1441    // v1.0.82 (GAP-003): forward --llm-backend to embed_with_fallback.
1442    // v1.0.84 (ADR-0042): tuple (Vec<f32>, LlmBackendKind) — extrai o
1443    // backend que efetivamente rodou e popula o accumulator para o
1444    // EnrichSummary agregado.
1445    let (embedding, backend_kind) =
1446        crate::embedder::embed_passage_with_choice(&paths.models, body, Some(llm_backend))?;
1447    record_enrich_backend(backend_kind.as_str());
1448    memories::upsert_vec(
1449        conn,
1450        memory_id,
1451        namespace,
1452        memory_type,
1453        &embedding,
1454        memory_name,
1455        &snippet,
1456    )?;
1457    Ok(())
1458}
1459
/// Records `backend` as the LLM backend that most recently completed a
/// re-embed in this process (v1.0.84, ADR-0042), replacing any earlier value.
///
/// A poisoned lock is recovered rather than skipped. The guarded value is a
/// plain `Option<&'static str>`, so a panic elsewhere cannot leave it
/// half-written. Silently dropping the update would lose the reported backend.
fn record_enrich_backend(backend: &'static str) {
    *ENRICH_LAST_BACKEND
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(backend);
}

/// Takes and clears the most recently recorded backend, if any.
///
/// Returns `None` when nothing has been recorded since the previous call.
/// A poisoned lock is recovered for the same reason as in
/// `record_enrich_backend`.
fn take_enrich_backend() -> Option<&'static str> {
    ENRICH_LAST_BACKEND
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .take()
}

/// Process-local accumulator written by `record_enrich_backend` and drained
/// by `take_enrich_backend`. As a `static` it is shared by every thread of
/// this process, but never across processes.
static ENRICH_LAST_BACKEND: std::sync::Mutex<Option<&'static str>> = std::sync::Mutex::new(None);
1476
1477/// Persists an enriched memory body (body-enrich, GAP-18).
1478///
1479/// Uses `memories::update` to set the new body and `sync_fts_after_update`
1480/// to keep FTS5 in sync. Also re-embeds the memory for recall accuracy.
1481fn persist_enriched_body(
1482    conn: &Connection,
1483    namespace: &str,
1484    memory_id: i64,
1485    memory_name: &str,
1486    new_body: &str,
1487    paths: &crate::paths::AppPaths,
1488    llm_backend: crate::cli::LlmBackendChoice,
1489) -> Result<(), AppError> {
1490    // Read current values for FTS sync
1491    let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
1492        "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
1493        rusqlite::params![memory_id],
1494        |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1495    )?;
1496
1497    let memory_type: String = conn.query_row(
1498        "SELECT type FROM memories WHERE id=?1",
1499        rusqlite::params![memory_id],
1500        |r| r.get(0),
1501    )?;
1502
1503    let description: String = conn.query_row(
1504        "SELECT COALESCE(description,'') FROM memories WHERE id=?1",
1505        rusqlite::params![memory_id],
1506        |r| r.get(0),
1507    )?;
1508
1509    let body_hash = blake3::hash(new_body.as_bytes()).to_hex().to_string();
1510
1511    let new_memory = memories::NewMemory {
1512        namespace: namespace.to_string(),
1513        name: memory_name.to_string(),
1514        memory_type: memory_type.clone(),
1515        description: description.clone(),
1516        body: new_body.to_string(),
1517        body_hash,
1518        session_id: None,
1519        source: "agent".to_string(),
1520        metadata: serde_json::json!({
1521            "operation": "body-enrich",
1522            "orig_chars": old_body.chars().count(),
1523            "new_chars": new_body.chars().count(),
1524        }),
1525    };
1526
1527    // G29 audit: insert a new immutable version BEFORE the update so the
1528    // enriched body is reachable through `history --name <X>` and
1529    // `restore --version N` can roll back to the pre-enrich state.
1530    let next_version = crate::storage::versions::next_version(conn, memory_id)?;
1531    let version_metadata = serde_json::json!({
1532        "operation": "body-enrich",
1533        "orig_chars": old_body.chars().count(),
1534        "new_chars": new_body.chars().count(),
1535    })
1536    .to_string();
1537    crate::storage::versions::insert_version(
1538        conn,
1539        memory_id,
1540        next_version,
1541        memory_name,
1542        &memory_type,
1543        &description,
1544        new_body,
1545        &version_metadata,
1546        Some("enrich"),
1547        "edit",
1548    )?;
1549
1550    memories::update(conn, memory_id, &new_memory, None)?;
1551    memories::sync_fts_after_update(
1552        conn,
1553        memory_id,
1554        &old_name,
1555        &old_desc,
1556        &old_body,
1557        &new_memory.name,
1558        &new_memory.description,
1559        &new_memory.body,
1560    )?;
1561
1562    // Re-embed for recall accuracy
1563    if let Err(e) = reembed_memory_vector(
1564        conn,
1565        namespace,
1566        memory_id,
1567        memory_name,
1568        &memory_type,
1569        new_body,
1570        paths,
1571        llm_backend,
1572    ) {
1573        tracing::warn!(target: "enrich", memory = %memory_name, error = %e, "vec upsert failed after body-enrich");
1574    }
1575
1576    Ok(())
1577}
1578
1579// ---------------------------------------------------------------------------
1580// Main entry point
1581// ---------------------------------------------------------------------------
1582
1583// ---------------------------------------------------------------------------
1584// G20: mode-conditional flag validation
1585// ---------------------------------------------------------------------------
1586
/// Reports whether `value` equals the `default` that clap would have
/// filled in for a flag declared with `default_value_t`.
///
/// This is how callers tell an explicit command-line override apart from
/// a flag that was left untouched. The comparison is purely by value, so an
/// operator who explicitly passes the default is treated as if the flag
/// had been omitted.
fn is_at_default<T: PartialEq>(value: T, default: T) -> bool {
    PartialEq::eq(&value, &default)
}
1593
1594/// G20: validate that flags for one LLM provider were not passed when
1595/// the operator selected a different provider. Flags silently discarded
1596/// by the wrong mode are surfaced as AppError::Validation BEFORE any
1597/// DB work, so the operator gets an actionable error instead of a
1598/// surprise at runtime.
1599///
1600/// Detection rules:
1601/// - For Option<PathBuf> / Option<String>: is_some() means explicit
1602/// - For scalar fields with default_value_t: value != default means explicit
1603/// - For boolean fields: true means explicit (default is false)
1604///
1605/// Mode-specific matrices:
1606/// - mode=claude-code rejects: codex_binary, codex_model, codex_timeout != 300
1607/// - mode=codex rejects: claude_binary, claude_model, claude_timeout != 300, max_cost_usd
1608fn validate_mode_conditional_flags_enrich(args: &EnrichArgs) -> Result<(), AppError> {
1609    const DEFAULT_TIMEOUT: u64 = 300;
1610
1611    let mut conflicts: Vec<String> = Vec::new();
1612
1613    match args.mode {
1614        EnrichMode::ClaudeCode => {
1615            if args.codex_binary.is_some() {
1616                conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1617            }
1618            if args.codex_model.is_some() {
1619                conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1620            }
1621            if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1622                conflicts.push(format!(
1623                    "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1624                    args.codex_timeout
1625                ));
1626            }
1627        }
1628        EnrichMode::Codex => {
1629            if args.claude_binary.is_some() {
1630                conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1631            }
1632            if args.claude_model.is_some() {
1633                conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1634            }
1635            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1636                conflicts.push(format!(
1637                    "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1638                    args.claude_timeout
1639                ));
1640            }
1641            if args.max_cost_usd.is_some() {
1642                conflicts.push(
1643                    "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription, not the call)"
1644                        .to_string(),
1645                );
1646            }
1647        }
1648        EnrichMode::Opencode => {
1649            if args.claude_binary.is_some() {
1650                conflicts.push("--claude-binary is ignored when --mode=opencode".to_string());
1651            }
1652            if args.claude_model.is_some() {
1653                conflicts.push("--claude-model is ignored when --mode=opencode".to_string());
1654            }
1655            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1656                conflicts.push(format!(
1657                    "--claude-timeout={} is ignored when --mode=opencode (remove the flag to use the default 300s)",
1658                    args.claude_timeout
1659                ));
1660            }
1661            if args.max_cost_usd.is_some() {
1662                conflicts.push(
1663                    "--max-cost-usd is ignored when --mode=opencode (OAuth-first; cost is metered by your subscription, not the call)"
1664                        .to_string(),
1665                );
1666            }
1667        }
1668    }
1669
1670    if !conflicts.is_empty() {
1671        return Err(AppError::Validation(format!(
1672            "G20: mode-conditional flag conflicts detected for --mode={}:\n  - {}",
1673            args.mode,
1674            conflicts.join("\n  - ")
1675        )));
1676    }
1677
1678    Ok(())
1679}
1680
1681// ---------------------------------------------------------------------------
1682
1683/// Main entry point for the `enrich` command.
1684pub fn run(args: &EnrichArgs, llm_backend: crate::cli::LlmBackendChoice) -> Result<(), AppError> {
1685    // G20: mode-conditional flag validation BEFORE any DB access.
1686    // Surfaces flags that the wrong mode would silently discard.
1687    validate_mode_conditional_flags_enrich(args)?;
1688    let started = Instant::now();
1689
1690    let paths = AppPaths::resolve(args.db.as_deref())?;
1691    ensure_db_ready(&paths)?;
1692    let conn = open_rw(&paths.db)?;
1693    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1694
1695    // G28-B (v1.0.68) + G30 (v1.0.69): enforce singleton per
1696    // (job_type, namespace, db_hash) so two parallel `enrich` invocations
1697    // on the same DB cannot co-exist, but concurrent enrich on different
1698    // databases works as expected. The force flag (--force) breaks a
1699    // stale lock from a previously crashed invocation.
1700    let wait_secs = args.wait_job_singleton;
1701    let force_flag = args.force_job_singleton;
1702    let _singleton = crate::lock::acquire_job_singleton(
1703        crate::lock::JobType::Enrich,
1704        &namespace,
1705        &paths.db,
1706        wait_secs,
1707        force_flag,
1708    )?;
1709
1710    // Validate provider binary upfront only for LLM-backed operations.
1711    let provider_binary = if matches!(args.operation, EnrichOperation::ReEmbed) {
1712        None
1713    } else {
1714        Some(match args.mode {
1715            EnrichMode::ClaudeCode => {
1716                let bin = find_claude_binary(args.claude_binary.as_deref())?;
1717                let version = super::claude_runner::validate_claude_version(&bin)?;
1718                tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
1719                emit_json(&PhaseEvent {
1720                    phase: "validate",
1721                    binary_path: bin.to_str(),
1722                    version: Some(&version),
1723                    items_total: None,
1724                    items_pending: None,
1725                    llm_parallelism: None,
1726                });
1727                bin
1728            }
1729            EnrichMode::Codex => {
1730                let bin = find_codex_binary(args.codex_binary.as_deref())?;
1731                emit_json(&PhaseEvent {
1732                    phase: "validate",
1733                    binary_path: bin.to_str(),
1734                    version: None,
1735                    items_total: None,
1736                    items_pending: None,
1737                    llm_parallelism: None,
1738                });
1739                bin
1740            }
1741            EnrichMode::Opencode => {
1742                let bin = super::opencode_runner::find_opencode_binary_with_override(
1743                    args.opencode_binary.as_deref(),
1744                )?;
1745                emit_json(&PhaseEvent {
1746                    phase: "validate",
1747                    binary_path: bin.to_str(),
1748                    version: None,
1749                    items_total: None,
1750                    items_pending: None,
1751                    llm_parallelism: None,
1752                });
1753                bin
1754            }
1755        })
1756    };
1757
1758    // G28-D: refuse to start when the system is saturated. This check
1759    // is BEFORE preflight so we never spend an OAuth turn on a host
1760    // that is already at the limit.
1761    if args.max_load_check && !args.dry_run && crate::system_load::is_system_saturated() {
1762        let load = crate::system_load::load_average_one();
1763        let n = crate::system_load::ncpus();
1764        return Err(AppError::Validation(format!(
1765            "system load average {load:.2} exceeds 2x ncpus ({n}); \
1766             pass --no-max-load-check to override (not recommended)"
1767        )));
1768    }
1769
1770    // G35: preflight probe — issue a single ping turn to verify the
1771    // provider is healthy before scanning N candidates. If the probe
1772    // fails with a rate-limit error, optionally fall back to a
1773    // different mode (typically codex) instead of failing the entire
1774    // batch. The probe itself consumes 1 OAuth turn, so it stays
1775    // opt-in (default off) to keep --dry-run and CI flows zero-cost.
1776    if args.preflight_check && !args.dry_run && !matches!(args.operation, EnrichOperation::ReEmbed)
1777    {
1778        let preflight_result = run_preflight_probe(args);
1779        match preflight_result {
1780            PreflightOutcome::Healthy => {
1781                tracing::info!(target: "enrich", mode = ?args.mode, "preflight probe healthy");
1782            }
1783            PreflightOutcome::RateLimited { reason, suggestion } => {
1784                if let Some(fallback) = args.fallback_mode.clone() {
1785                    if fallback != args.mode {
1786                        // G35 (v1.0.69): the mid-batch mode switch is
1787                        // intentionally NOT applied because it would
1788                        // desynchronise the per-item rate-limit wait
1789                        // state (rate-limited items in the worker are
1790                        // timed against the original provider). Instead
1791                        // we abort cleanly so the operator can re-invoke
1792                        // with `--mode {fallback:?}`. This guarantees no
1793                        // OAuth window is wasted and no partial state
1794                        // is left in the queue.
1795                        return Err(AppError::Validation(format!(
1796                            "preflight detected rate limit on {mode:?}: {reason}; \
1797                             re-invoke with `--mode {fallback:?}` to use the fallback provider",
1798                            mode = args.mode
1799                        )));
1800                    }
1801                    return Err(AppError::Validation(format!(
1802                        "preflight detected rate limit on {mode:?}: {reason}; \
1803                         --fallback-mode matches --mode, no recovery possible",
1804                        mode = args.mode
1805                    )));
1806                }
1807                return Err(AppError::Validation(format!(
1808                    "preflight detected rate limit on {mode:?}: {reason}; \
1809                     {suggestion}; pass --fallback-mode codex to recover",
1810                    mode = args.mode
1811                )));
1812            }
1813            PreflightOutcome::Error(e) => {
1814                return Err(e);
1815            }
1816        }
1817    }
1818
1819    // SCAN phase
1820    let scan_result = scan_operation(&conn, &namespace, args)?;
1821    let total = scan_result.len();
1822
1823    emit_json(&PhaseEvent {
1824        phase: "scan",
1825        binary_path: None,
1826        version: None,
1827        items_total: Some(total),
1828        items_pending: Some(total),
1829        llm_parallelism: Some(args.llm_parallelism),
1830    });
1831
1832    // Dry-run: emit preview events and summary without calling LLM
1833    if args.dry_run {
1834        for (idx, key) in scan_result.iter().enumerate() {
1835            emit_json(&ItemEvent {
1836                item: key,
1837                status: "preview",
1838                memory_id: None,
1839                entity_id: None,
1840                entities: None,
1841                rels: None,
1842                chars_before: None,
1843                chars_after: None,
1844                cost_usd: None,
1845                elapsed_ms: None,
1846                error: None,
1847                index: idx,
1848                total,
1849            });
1850        }
1851        emit_json(&EnrichSummary {
1852            summary: true,
1853            operation: format!("{:?}", args.operation),
1854            items_total: total,
1855            completed: 0,
1856            failed: 0,
1857            skipped: 0,
1858            cost_usd: 0.0,
1859            elapsed_ms: started.elapsed().as_millis() as u64,
1860            backend_invoked: take_enrich_backend(),
1861        });
1862        return Ok(());
1863    }
1864
1865    // All operations in this enum have an execution path.
1866
1867    // Queue setup for resume/retry
1868    let queue_conn = open_queue_db(DEFAULT_QUEUE_DB)?;
1869
1870    if args.resume {
1871        let reset = queue_conn
1872            .execute(
1873                "UPDATE queue SET status='pending' WHERE status='processing'",
1874                [],
1875            )
1876            .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
1877        if reset > 0 {
1878            tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
1879        }
1880    }
1881
1882    if args.retry_failed {
1883        let count = queue_conn
1884            .execute(
1885                "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
1886                [],
1887            )
1888            .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
1889        tracing::info!(target: "enrich", count, "retrying failed items");
1890    }
1891
1892    if !args.resume && !args.retry_failed {
1893        queue_conn
1894            .execute("DELETE FROM queue", [])
1895            .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
1896    }
1897
1898    // Populate queue
1899    for (idx, key) in scan_result.iter().enumerate() {
1900        let item_type = match args.operation {
1901            EnrichOperation::EntityDescriptions => "entity",
1902            _ => "memory",
1903        };
1904        if let Err(e) = queue_conn.execute(
1905            "INSERT OR IGNORE INTO queue (item_key, item_type, status) VALUES (?1, ?2, 'pending')",
1906            rusqlite::params![key, item_type],
1907        ) {
1908            tracing::warn!(target: "enrich", error = %e, "queue insert failed");
1909        }
1910        let _ = idx; // suppress unused warning
1911    }
1912
1913    // G19: parallel LLM processing via std::thread::scope when parallelism > 1.
1914    // Clamp enforces the range even if the caller bypasses clap validation.
1915    let parallelism = args.llm_parallelism.clamp(1, 32) as usize;
1916    if parallelism > 1 {
1917        tracing::info!(
1918            target: "enrich",
1919            llm_parallelism = parallelism,
1920            "parallel LLM processing with bounded thread pool"
1921        );
1922    }
1923    // G28-D (v1.0.68) + G34 (v1.0.69): warn above the recommended parallelism
1924    // ceiling. The threshold and message depend on the LLM mode because
1925    // Claude Code spawns MCP children (G28-A) while Codex does not.
1926    if parallelism > 4 {
1927        match args.mode {
1928            EnrichMode::ClaudeCode => {
1929                tracing::warn!(
1930                    target: "enrich",
1931                    llm_parallelism = parallelism,
1932                    recommended_max = 4,
1933                    mode = "claude-code",
1934                    "llm_parallelism above 4 multiplies Claude Code subprocess fan-out; \
1935                     consider combining with SQLITE_GRAPHRAG_CLAUDE_EMPTY_CONFIG_DIR \
1936                     to cut MCP children (G28-A)"
1937                );
1938            }
1939            EnrichMode::Codex if parallelism > 16 => {
1940                tracing::warn!(
1941                    target: "enrich",
1942                    llm_parallelism = parallelism,
1943                    recommended_max = 16,
1944                    mode = "codex",
1945                    "llm_parallelism above 16 risks OAuth rate-limit on Codex; \
1946                     consider --llm-parallelism 8 for safer concurrency"
1947                );
1948            }
1949            EnrichMode::Codex => {
1950                // No warning: codex does not spawn MCP children and was
1951                // validated at parallelism 8 in production (1161 items,
1952                // 0 failures) per the 2026-06-04 session audit.
1953            }
1954            EnrichMode::Opencode if parallelism > 16 => {
1955                tracing::warn!(
1956                    target: "enrich",
1957                    llm_parallelism = parallelism,
1958                    recommended_max = 16,
1959                    mode = "opencode",
1960                    "llm_parallelism above 16 risks OAuth rate-limit on OpenCode; \
1961                     consider --llm-parallelism 8 for safer concurrency"
1962                );
1963            }
1964            EnrichMode::Opencode => {
1965                // No warning: opencode does not spawn MCP children.
1966            }
1967        }
1968    }
1969
1970    let mut completed = 0usize;
1971    let mut failed = 0usize;
1972    let mut skipped = 0usize;
1973    let mut cost_total = 0.0f64;
1974    let mut oauth_detected = false;
1975    let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1976    let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1977    let enrich_started = std::time::Instant::now();
1978
1979    let provider_timeout = match args.mode {
1980        EnrichMode::ClaudeCode => args.claude_timeout,
1981        EnrichMode::Codex => args.codex_timeout,
1982        EnrichMode::Opencode => args.opencode_timeout,
1983    };
1984
1985    let provider_model: Option<&str> = match args.mode {
1986        EnrichMode::ClaudeCode => args.claude_model.as_deref(),
1987        EnrichMode::Codex => args.codex_model.as_deref(),
1988        EnrichMode::Opencode => args.opencode_model.as_deref(),
1989    };
1990
1991    // G19: when parallelism > 1, spawn bounded worker threads.
1992    // Each worker opens its own DB connections (WAL supports concurrent readers + serialized writers).
1993    // The queue DB claim is atomic via UPDATE...RETURNING — no external lock needed.
1994    if parallelism > 1 {
1995        let stdout_mu = parking_lot::Mutex::new(());
1996        let budget = args.max_cost_usd;
1997        let operation = args.operation.clone();
1998        let mode = args.mode.clone();
1999        let min_oc = args.min_output_chars;
2000        let max_oc = args.max_output_chars;
2001        let prompt_tpl = args.prompt_template.as_deref().map(|p| p.to_path_buf());
2002
2003        struct WorkerResult {
2004            completed: usize,
2005            failed: usize,
2006            skipped: usize,
2007            cost: f64,
2008            oauth: bool,
2009        }
2010
2011        let results: Vec<WorkerResult> = std::thread::scope(|s| {
2012            let handles: Vec<_> = (0..parallelism)
2013                .map(|worker_id| {
2014                    let stdout_mu = &stdout_mu;
2015                    let paths = &paths;
2016                    let namespace = &namespace;
2017                    let provider_binary = provider_binary.as_deref();
2018                    let operation = &operation;
2019                    let mode = &mode;
2020                    let prompt_tpl = prompt_tpl.as_deref();
2021                    s.spawn(move || {
2022                        let w_conn = match open_rw(&paths.db) {
2023                            Ok(c) => c,
2024                            Err(e) => {
2025                                tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open DB");
2026                                return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
2027                            }
2028                        };
2029                        let w_queue = match open_queue_db(DEFAULT_QUEUE_DB) {
2030                            Ok(c) => c,
2031                            Err(e) => {
2032                                tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open queue DB");
2033                                return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
2034                            }
2035                        };
2036                        let mut w_completed = 0usize;
2037                        let mut w_failed = 0usize;
2038                        let mut w_skipped = 0usize;
2039                        let mut w_cost = 0.0f64;
2040                        let mut w_oauth = false;
2041                        let mut w_backoff = DEFAULT_RATE_LIMIT_WAIT;
2042                        let w_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
2043                        // G28-D: per-worker circuit breaker that aborts the
2044                        // loop after `circuit_breaker_threshold` consecutive
2045                        // HardFailure outcomes (transient/rate-limited errors
2046                        // do NOT count, so a recovering provider is not
2047                        // penalised).
2048                        let mut w_breaker = crate::retry::CircuitBreaker::new(
2049                            args.circuit_breaker_threshold.max(1),
2050                            std::time::Duration::from_secs(60),
2051                        );
2052
2053                        loop {
2054                            if crate::shutdown_requested() {
2055                                tracing::info!(target: "enrich", "shutdown requested, worker stopping");
2056                                break;
2057                            }
2058                            if let Some(b) = budget {
2059                                if !w_oauth && w_cost >= b {
2060                                    break;
2061                                }
2062                            }
2063                            let pending: Option<(i64, String, String)> = w_queue
2064                                .query_row(
2065                                    "UPDATE queue SET status='processing', attempt=attempt+1 \
2066                                     WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
2067                                     RETURNING id, item_key, item_type",
2068                                    [],
2069                                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2070                                )
2071                                .ok();
2072                            let (queue_id, item_key, _item_type) = match pending {
2073                                Some(p) => p,
2074                                None => break,
2075                            };
2076                            let item_started = Instant::now();
2077                            let current_index = w_completed + w_failed + w_skipped;
2078
2079                            let call_result = match operation {
2080                                EnrichOperation::MemoryBindings => call_memory_bindings(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2081                                EnrichOperation::EntityDescriptions => call_entity_description(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2082                                EnrichOperation::BodyEnrich => call_body_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode, min_oc, max_oc, prompt_tpl, args.preserve_threshold, paths, llm_backend),
2083                                EnrichOperation::ReEmbed => call_reembed(&w_conn, namespace, &item_key, paths, llm_backend),
2084                                EnrichOperation::WeightCalibrate => call_weight_calibrate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2085                                EnrichOperation::RelationReclassify => call_relation_reclassify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2086                                EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => call_entity_connect(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2087                                EnrichOperation::EntityTypeValidate => call_entity_type_validate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2088                                EnrichOperation::DescriptionEnrich => call_description_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2089                                EnrichOperation::DomainClassify => call_domain_classify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2090                                EnrichOperation::GraphAudit => call_graph_audit(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2091                                EnrichOperation::DeepResearchSynth => call_deep_research_synth(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2092                                EnrichOperation::BodyExtract => call_body_extract(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2093                            };
2094
2095                            match call_result {
2096                                Ok(EnrichItemResult::Done { cost, is_oauth, memory_id, entity_id, entities, rels, chars_before, chars_after }) => {
2097                                    if is_oauth { w_oauth = true; }
2098                                    w_backoff = DEFAULT_RATE_LIMIT_WAIT;
2099                                    let _ = w_queue.execute(
2100                                        "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2101                                        rusqlite::params![memory_id, entity_id, entities as i64, rels as i64, cost, item_started.elapsed().as_millis() as i64, queue_id],
2102                                    );
2103                                    w_completed += 1;
2104                                    if !is_oauth { w_cost += cost; }
2105                                    // G28-D: count success; resets breaker.
2106                                    let _ = w_breaker
2107                                        .record(crate::retry::AttemptOutcome::Success);
2108                                    let _guard = stdout_mu.lock();
2109                                    emit_json(&ItemEvent { item: &item_key, status: "done", memory_id, entity_id, entities: Some(entities), rels: Some(rels), chars_before, chars_after, cost_usd: if is_oauth { None } else { Some(cost) }, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
2110                                }
2111                                Ok(EnrichItemResult::Skipped { reason }) => {
2112                                    w_skipped += 1;
2113                                    let _ = w_queue.execute("UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![reason, queue_id]);
2114                                    let _guard = stdout_mu.lock();
2115                                    emit_json(&ItemEvent { item: &item_key, status: "skipped", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
2116                                }
2117                                Ok(EnrichItemResult::PreservationFailed { score, threshold, chars_before, chars_after }) => {
2118                                    // G29 Passo 4: worker mirror of the
2119                                    // serial path. Counted as a soft
2120                                    // skip so the queue surface shows
2121                                    // a quality issue rather than a
2122                                    // transport failure.
2123                                    w_skipped += 1;
2124                                    let reason = format!(
2125                                        "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2126                                    );
2127                                    let _ = w_queue.execute(
2128                                        "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2129                                        rusqlite::params![reason, queue_id],
2130                                    );
2131                                    let _guard = stdout_mu.lock();
2132                                    emit_json(&ItemEvent {
2133                                        item: &item_key,
2134                                        status: "preservation_failed",
2135                                        memory_id: None,
2136                                        entity_id: None,
2137                                        entities: None,
2138                                        rels: None,
2139                                        chars_before: Some(chars_before),
2140                                        chars_after: Some(chars_after),
2141                                        cost_usd: None,
2142                                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2143                                        error: Some(reason),
2144                                        index: current_index,
2145                                        total,
2146                                    });
2147                                }
2148                                Err(e) => {
2149                                    let err_str = format!("{e}");
2150                                    if matches!(e, AppError::RateLimited { .. }) {
2151                                        if crate::retry::is_kill_switch_active() {
2152                                            tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2153                                        } else if std::time::Instant::now() >= w_deadline {
2154                                            tracing::error!(target: "enrich", "rate-limit retry deadline (1h) exhausted in worker");
2155                                        } else {
2156                                            let half = w_backoff / 2;
2157                                            let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2158                                            let actual_wait = half + jitter;
2159                                            tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited in worker, backing off");
2160                                            let _ = w_queue.execute("UPDATE queue SET status='pending' WHERE id=?1", rusqlite::params![queue_id]);
2161                                            std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2162                                            w_backoff = (w_backoff * 2).min(900);
2163                                            continue;
2164                                        }
2165                                    }
2166                                    w_failed += 1;
2167                                    let _ = w_queue.execute("UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![err_str, queue_id]);
2168                                    let _guard = stdout_mu.lock();
2169                                    emit_json(&ItemEvent { item: &item_key, status: "failed", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: Some(err_str), index: current_index, total });
2170                                    // G28-D: count hard failure against breaker.
2171                                    let breaker_opened = w_breaker
2172                                        .record(crate::retry::AttemptOutcome::HardFailure);
2173                                    if breaker_opened {
2174                                        tracing::error!(target: "enrich",
2175                                            consecutive_failures = w_breaker.consecutive_failures(),
2176                                            "circuit breaker opened — aborting worker"
2177                                        );
2178                                        break;
2179                                    }
2180                                }
2181                            }
2182                        }
2183                        WorkerResult { completed: w_completed, failed: w_failed, skipped: w_skipped, cost: w_cost, oauth: w_oauth }
2184                    })
2185                })
2186                .collect();
2187            handles
2188                .into_iter()
2189                .map(|h| {
2190                    h.join().unwrap_or(WorkerResult {
2191                        completed: 0,
2192                        failed: 0,
2193                        skipped: 0,
2194                        cost: 0.0,
2195                        oauth: false,
2196                    })
2197                })
2198                .collect()
2199        });
2200
2201        for r in &results {
2202            completed += r.completed;
2203            failed += r.failed;
2204            skipped += r.skipped;
2205            cost_total += r.cost;
2206            if r.oauth && !oauth_detected {
2207                oauth_detected = true;
2208            }
2209        }
2210    } else {
2211        // Serial path (parallelism == 1) — original loop
2212        loop {
2213            if crate::shutdown_requested() {
2214                tracing::info!(target: "enrich", "shutdown requested, stopping enrichment");
2215                break;
2216            }
2217
2218            // Budget check
2219            if let Some(budget) = args.max_cost_usd {
2220                if !oauth_detected && cost_total >= budget {
2221                    tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
2222                    break;
2223                }
2224            }
2225
2226            // Dequeue next pending item
2227            let pending: Option<(i64, String, String)> = queue_conn
2228                .query_row(
2229                    "UPDATE queue SET status='processing', attempt=attempt+1 \
2230                 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
2231                 RETURNING id, item_key, item_type",
2232                    [],
2233                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2234                )
2235                .ok();
2236
2237            let (queue_id, item_key, item_type) = match pending {
2238                Some(p) => p,
2239                None => break,
2240            };
2241
2242            let item_started = Instant::now();
2243            let current_index = completed + failed + skipped;
2244
2245            let call_result = match args.operation {
2246                EnrichOperation::MemoryBindings => call_memory_bindings(
2247                    &conn,
2248                    &namespace,
2249                    &item_key,
2250                    provider_binary
2251                        .as_deref()
2252                        .expect("provider binary required"),
2253                    provider_model,
2254                    provider_timeout,
2255                    &args.mode,
2256                ),
2257                EnrichOperation::EntityDescriptions => call_entity_description(
2258                    &conn,
2259                    &namespace,
2260                    &item_key,
2261                    provider_binary
2262                        .as_deref()
2263                        .expect("provider binary required"),
2264                    provider_model,
2265                    provider_timeout,
2266                    &args.mode,
2267                ),
2268                EnrichOperation::BodyEnrich => call_body_enrich(
2269                    &conn,
2270                    &namespace,
2271                    &item_key,
2272                    provider_binary
2273                        .as_deref()
2274                        .expect("provider binary required"),
2275                    provider_model,
2276                    provider_timeout,
2277                    &args.mode,
2278                    args.min_output_chars,
2279                    args.max_output_chars,
2280                    args.prompt_template.as_deref(),
2281                    args.preserve_threshold,
2282                    &paths,
2283                    llm_backend,
2284                ),
2285                EnrichOperation::ReEmbed => {
2286                    call_reembed(&conn, &namespace, &item_key, &paths, llm_backend)
2287                }
2288                EnrichOperation::WeightCalibrate => call_weight_calibrate(
2289                    &conn,
2290                    &namespace,
2291                    &item_key,
2292                    provider_binary
2293                        .as_deref()
2294                        .expect("provider binary required"),
2295                    provider_model,
2296                    provider_timeout,
2297                    &args.mode,
2298                ),
2299                EnrichOperation::RelationReclassify => call_relation_reclassify(
2300                    &conn,
2301                    &namespace,
2302                    &item_key,
2303                    provider_binary
2304                        .as_deref()
2305                        .expect("provider binary required"),
2306                    provider_model,
2307                    provider_timeout,
2308                    &args.mode,
2309                ),
2310                EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2311                    call_entity_connect(
2312                        &conn,
2313                        &namespace,
2314                        &item_key,
2315                        provider_binary
2316                            .as_deref()
2317                            .expect("provider binary required"),
2318                        provider_model,
2319                        provider_timeout,
2320                        &args.mode,
2321                    )
2322                }
2323                EnrichOperation::EntityTypeValidate => call_entity_type_validate(
2324                    &conn,
2325                    &namespace,
2326                    &item_key,
2327                    provider_binary
2328                        .as_deref()
2329                        .expect("provider binary required"),
2330                    provider_model,
2331                    provider_timeout,
2332                    &args.mode,
2333                ),
2334                EnrichOperation::DescriptionEnrich => call_description_enrich(
2335                    &conn,
2336                    &namespace,
2337                    &item_key,
2338                    provider_binary
2339                        .as_deref()
2340                        .expect("provider binary required"),
2341                    provider_model,
2342                    provider_timeout,
2343                    &args.mode,
2344                ),
2345                EnrichOperation::DomainClassify => call_domain_classify(
2346                    &conn,
2347                    &namespace,
2348                    &item_key,
2349                    provider_binary
2350                        .as_deref()
2351                        .expect("provider binary required"),
2352                    provider_model,
2353                    provider_timeout,
2354                    &args.mode,
2355                ),
2356                EnrichOperation::GraphAudit => call_graph_audit(
2357                    &conn,
2358                    &namespace,
2359                    &item_key,
2360                    provider_binary
2361                        .as_deref()
2362                        .expect("provider binary required"),
2363                    provider_model,
2364                    provider_timeout,
2365                    &args.mode,
2366                ),
2367                EnrichOperation::DeepResearchSynth => call_deep_research_synth(
2368                    &conn,
2369                    &namespace,
2370                    &item_key,
2371                    provider_binary
2372                        .as_deref()
2373                        .expect("provider binary required"),
2374                    provider_model,
2375                    provider_timeout,
2376                    &args.mode,
2377                ),
2378                EnrichOperation::BodyExtract => call_body_extract(
2379                    &conn,
2380                    &namespace,
2381                    &item_key,
2382                    provider_binary
2383                        .as_deref()
2384                        .expect("provider binary required"),
2385                    provider_model,
2386                    provider_timeout,
2387                    &args.mode,
2388                ),
2389            };
2390
2391            match call_result {
2392                Ok(EnrichItemResult::Done {
2393                    memory_id,
2394                    entity_id,
2395                    entities,
2396                    rels,
2397                    chars_before,
2398                    chars_after,
2399                    cost,
2400                    is_oauth,
2401                }) => {
2402                    if is_oauth && !oauth_detected {
2403                        oauth_detected = true;
2404                        tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
2405                    }
2406                    backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
2407
2408                    // Persist depends on the operation
2409                    let persist_err: Option<String> = match args.operation {
2410                        EnrichOperation::MemoryBindings => {
2411                            // Bindings already persisted inside call_memory_bindings
2412                            None
2413                        }
2414                        EnrichOperation::EntityDescriptions => {
2415                            // Description already persisted inside call_entity_description
2416                            None
2417                        }
2418                        EnrichOperation::BodyEnrich => {
2419                            // Body already persisted inside call_body_enrich
2420                            None
2421                        }
2422                        _ => {
2423                            // All G27 operations persist inside their call_* function
2424                            None
2425                        }
2426                    };
2427
2428                    if let Err(e) = queue_conn.execute(
2429                    "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2430                    rusqlite::params![
2431                        memory_id,
2432                        entity_id,
2433                        entities as i64,
2434                        rels as i64,
2435                        cost,
2436                        item_started.elapsed().as_millis() as i64,
2437                        queue_id
2438                    ],
2439                ) {
2440                        tracing::warn!(target: "enrich", error = %e, "queue done update failed");
2441                    }
2442
2443                    if persist_err.is_none() {
2444                        completed += 1;
2445                        if !is_oauth {
2446                            cost_total += cost;
2447                        }
2448                        emit_json(&ItemEvent {
2449                            item: &item_key,
2450                            status: "done",
2451                            memory_id,
2452                            entity_id,
2453                            entities: Some(entities),
2454                            rels: Some(rels),
2455                            chars_before,
2456                            chars_after,
2457                            cost_usd: if is_oauth { None } else { Some(cost) },
2458                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2459                            error: None,
2460                            index: current_index,
2461                            total,
2462                        });
2463                    } else {
2464                        failed += 1;
2465                        emit_json(&ItemEvent {
2466                            item: &item_key,
2467                            status: "failed",
2468                            memory_id: None,
2469                            entity_id: None,
2470                            entities: None,
2471                            rels: None,
2472                            chars_before: None,
2473                            chars_after: None,
2474                            cost_usd: None,
2475                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2476                            error: persist_err,
2477                            index: current_index,
2478                            total,
2479                        });
2480                    }
2481                }
2482                Ok(EnrichItemResult::Skipped { reason }) => {
2483                    skipped += 1;
2484                    if let Err(e) = queue_conn.execute(
2485                    "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2486                    rusqlite::params![reason, queue_id],
2487                ) {
2488                        tracing::warn!(target: "enrich", error = %e, "queue skipped update failed");
2489                    }
2490                    emit_json(&ItemEvent {
2491                        item: &item_key,
2492                        status: "skipped",
2493                        memory_id: None,
2494                        entity_id: None,
2495                        entities: None,
2496                        rels: None,
2497                        chars_before: None,
2498                        chars_after: None,
2499                        cost_usd: None,
2500                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2501                        error: None,
2502                        index: current_index,
2503                        total,
2504                    });
2505                }
2506                Ok(EnrichItemResult::PreservationFailed {
2507                    score,
2508                    threshold,
2509                    chars_before,
2510                    chars_after,
2511                }) => {
2512                    // G29 Passo 4: the LLM rewrite diverged too far from
2513                    // the original body. Count as a soft failure (not
2514                    // `failed`) so the queue surfaces it as a quality
2515                    // issue, not a transport error. The reason is
2516                    // structured so the operator can audit why a body
2517                    // was rejected.
2518                    skipped += 1;
2519                    let reason = format!(
2520                        "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2521                    );
2522                    if let Err(qe) = queue_conn.execute(
2523                        "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2524                        rusqlite::params![reason, queue_id],
2525                    ) {
2526                        tracing::warn!(target: "enrich", error = %qe, "queue preservation_failed update failed");
2527                    }
2528                    emit_json(&ItemEvent {
2529                        item: &item_key,
2530                        status: "preservation_failed",
2531                        memory_id: None,
2532                        entity_id: None,
2533                        entities: None,
2534                        rels: None,
2535                        chars_before: Some(chars_before),
2536                        chars_after: Some(chars_after),
2537                        cost_usd: None,
2538                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2539                        error: Some(reason),
2540                        index: current_index,
2541                        total,
2542                    });
2543                }
2544                Err(e) => {
2545                    let err_str = format!("{e}");
2546                    if matches!(e, AppError::RateLimited { .. }) {
2547                        if crate::retry::is_kill_switch_active() {
2548                            tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2549                        } else if std::time::Instant::now() >= rate_limit_deadline {
2550                            tracing::error!(target: "enrich", total_elapsed_secs = enrich_started.elapsed().as_secs(), "rate-limit retry deadline (1h) exhausted");
2551                        } else {
2552                            let half = backoff_secs / 2;
2553                            let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2554                            let actual_wait = half + jitter;
2555                            tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
2556                            if let Err(qe) = queue_conn.execute(
2557                                "UPDATE queue SET status='pending' WHERE id=?1",
2558                                rusqlite::params![queue_id],
2559                            ) {
2560                                tracing::warn!(target: "enrich", error = %qe, "queue pending update failed");
2561                            }
2562                            std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2563                            backoff_secs = (backoff_secs * 2).min(900);
2564                            continue;
2565                        }
2566                    }
2567
2568                    failed += 1;
2569                    if let Err(qe) = queue_conn.execute(
2570                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
2571                    rusqlite::params![err_str, queue_id],
2572                ) {
2573                        tracing::warn!(target: "enrich", error = %qe, "queue failed update failed");
2574                    }
2575                    emit_json(&ItemEvent {
2576                        item: &item_key,
2577                        status: "failed",
2578                        memory_id: None,
2579                        entity_id: None,
2580                        entities: None,
2581                        rels: None,
2582                        chars_before: None,
2583                        chars_after: None,
2584                        cost_usd: None,
2585                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2586                        error: Some(err_str),
2587                        index: current_index,
2588                        total,
2589                    });
2590                }
2591            }
2592
2593            let _ = item_type; // used via queue schema only
2594        }
2595    } // end else (serial path)
2596
2597    let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2598    let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2599
2600    emit_json(&EnrichSummary {
2601        summary: true,
2602        operation: format!("{:?}", args.operation),
2603        items_total: total,
2604        completed,
2605        failed,
2606        skipped,
2607        cost_usd: cost_total,
2608        elapsed_ms: started.elapsed().as_millis() as u64,
2609        backend_invoked: take_enrich_backend(),
2610    });
2611
2612    if failed == 0 {
2613        let _ = std::fs::remove_file(DEFAULT_QUEUE_DB);
2614    }
2615
2616    Ok(())
2617}
2618
2619// ---------------------------------------------------------------------------
2620// Internal result type for a single item call
2621// ---------------------------------------------------------------------------
2622
/// Outcome of enriching a single queue item.
///
/// Returned by the per-operation `call_*` helpers and consumed by both the
/// serial and the parallel dispatch loops, which translate each variant into a
/// queue status update plus one NDJSON `ItemEvent`.
///
/// `Debug`, `Clone` and `PartialEq` are derived so results can be logged with
/// `{:?}` and compared directly in tests.
#[derive(Debug, Clone, PartialEq)]
enum EnrichItemResult {
    /// The item was processed; the dispatch loops mark the queue row `done`.
    Done {
        /// Memory row the operation worked on, if any.
        memory_id: Option<i64>,
        /// Entity row the operation worked on, if any.
        entity_id: Option<i64>,
        /// Number of entities reported by the operation.
        entities: usize,
        /// Number of relationships reported by the operation.
        rels: usize,
        /// Body length before the operation; `None` when no body is rewritten.
        chars_before: Option<usize>,
        /// Body length after the operation; `None` when no body is rewritten.
        chars_after: Option<usize>,
        /// Cost reported for the LLM call, stored in the queue's `cost_usd` column.
        cost: f64,
        /// True for OAuth-subscription providers: the dispatch loops then leave
        /// `cost` out of the running total and emit `cost_usd: None`.
        is_oauth: bool,
    },
    /// The item was deliberately not processed; `reason` is written to the
    /// queue row's `error` column with status `skipped`.
    Skipped { reason: String },
    /// G29 Passo 4 (v1.0.69): the LLM rewrite diverged from the original
    /// body beyond the configured `--preserve-threshold` and was rejected
    /// before persistence. The trigram-Jaccard score and threshold are
    /// emitted in the NDJSON stream for operator audit; the dispatch loops
    /// count it as skipped and emit status `preservation_failed`.
    PreservationFailed {
        score: f64,
        threshold: f64,
        chars_before: usize,
        chars_after: usize,
    },
}
2648
2649// ---------------------------------------------------------------------------
2650// Per-operation call helpers (SCAN + JUDGE + PERSIST in one unit)
2651// ---------------------------------------------------------------------------
2652
2653fn call_memory_bindings(
2654    conn: &Connection,
2655    namespace: &str,
2656    memory_name: &str,
2657    binary: &Path,
2658    model: Option<&str>,
2659    timeout: u64,
2660    mode: &EnrichMode,
2661) -> Result<EnrichItemResult, AppError> {
2662    // Look up the memory
2663    let (memory_id, body): (i64, String) = conn.query_row(
2664        "SELECT id, COALESCE(body,'') FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2665        rusqlite::params![namespace, memory_name],
2666        |r| Ok((r.get(0)?, r.get(1)?)),
2667    ).map_err(|e| match e {
2668        rusqlite::Error::QueryReturnedNoRows => AppError::NotFound(format!("memory '{memory_name}' not found")),
2669        other => AppError::Database(other),
2670    })?;
2671
2672    if body.trim().is_empty() {
2673        return Ok(EnrichItemResult::Skipped {
2674            reason: "body is empty".to_string(),
2675        });
2676    }
2677
2678    let (value, cost, is_oauth) = match mode {
2679        EnrichMode::ClaudeCode => call_claude(
2680            binary,
2681            BINDINGS_PROMPT,
2682            BINDINGS_SCHEMA,
2683            &body,
2684            model,
2685            timeout,
2686        )?,
2687        EnrichMode::Codex => call_codex(
2688            binary,
2689            BINDINGS_PROMPT,
2690            BINDINGS_SCHEMA,
2691            &body,
2692            model,
2693            timeout,
2694        )?,
2695        EnrichMode::Opencode => call_opencode(
2696            binary,
2697            BINDINGS_PROMPT,
2698            BINDINGS_SCHEMA,
2699            &body,
2700            model,
2701            timeout,
2702        )?,
2703    };
2704
2705    let empty_arr = serde_json::Value::Array(vec![]);
2706    let entities_val = value.get("entities").unwrap_or(&empty_arr);
2707    let rels_val = value.get("relationships").unwrap_or(&empty_arr);
2708
2709    let (ent_count, rel_count) =
2710        persist_memory_bindings(conn, namespace, memory_id, entities_val, rels_val)?;
2711
2712    Ok(EnrichItemResult::Done {
2713        memory_id: Some(memory_id),
2714        entity_id: None,
2715        entities: ent_count,
2716        rels: rel_count,
2717        chars_before: None,
2718        chars_after: None,
2719        cost,
2720        is_oauth,
2721    })
2722}
2723
2724fn call_entity_description(
2725    conn: &Connection,
2726    namespace: &str,
2727    entity_name: &str,
2728    binary: &Path,
2729    model: Option<&str>,
2730    timeout: u64,
2731    mode: &EnrichMode,
2732) -> Result<EnrichItemResult, AppError> {
2733    let (entity_id, entity_type): (i64, String) = conn
2734        .query_row(
2735            "SELECT id, type FROM entities WHERE namespace=?1 AND name=?2",
2736            rusqlite::params![namespace, entity_name],
2737            |r| Ok((r.get(0)?, r.get(1)?)),
2738        )
2739        .map_err(|e| match e {
2740            rusqlite::Error::QueryReturnedNoRows => {
2741                AppError::NotFound(format!("entity '{entity_name}' not found"))
2742            }
2743            other => AppError::Database(other),
2744        })?;
2745
2746    let prompt = format!(
2747        "{ENTITY_DESCRIPTION_PROMPT_PREFIX}{entity_name}\nEntity type: {entity_type}\n\nGenerate a description:"
2748    );
2749
2750    let (value, cost, is_oauth) = match mode {
2751        EnrichMode::ClaudeCode => call_claude(
2752            binary,
2753            &prompt,
2754            ENTITY_DESCRIPTION_SCHEMA,
2755            "",
2756            model,
2757            timeout,
2758        )?,
2759        EnrichMode::Codex => call_codex(
2760            binary,
2761            &prompt,
2762            ENTITY_DESCRIPTION_SCHEMA,
2763            "",
2764            model,
2765            timeout,
2766        )?,
2767        EnrichMode::Opencode => call_opencode(
2768            binary,
2769            &prompt,
2770            ENTITY_DESCRIPTION_SCHEMA,
2771            "",
2772            model,
2773            timeout,
2774        )?,
2775    };
2776
2777    let description = value
2778        .get("description")
2779        .and_then(|v| v.as_str())
2780        .ok_or_else(|| AppError::Validation("LLM result missing 'description' field".into()))?;
2781
2782    persist_entity_description(conn, entity_id, description)?;
2783
2784    Ok(EnrichItemResult::Done {
2785        memory_id: None,
2786        entity_id: Some(entity_id),
2787        entities: 0,
2788        rels: 0,
2789        chars_before: None,
2790        chars_after: None,
2791        cost,
2792        is_oauth,
2793    })
2794}
2795
/// GAP-18: expand a short memory body with the LLM, guarded by preservation
/// and idempotency checks.
///
/// Flow (the order of the gates at the end matters):
/// 1. Load the live (non-deleted) memory `memory_name` in `namespace`. A NULL
///    body or description becomes `""`, and a NULL type becomes `"note"`.
/// 2. G26: collect up to 10 linked entity names as graph context.
/// 3. Build the prompt from the prefix and the context section, then append
///    the min/max length targets. The prefix is the contents of
///    `prompt_template` when given (rejected when larger than
///    `MAX_MEMORY_BODY_LEN` bytes), else `BODY_ENRICH_PROMPT_PREFIX`. The
///    context section is only emitted when there is a description or at
///    least one linked entity.
/// 4. Call the provider selected by `mode`, passing the current body as input.
/// 5. Return `PreservationFailed` when the preservation verdict for
///    `preserve_threshold` is not accepted.
/// 6. Return `Skipped` when the rewrite is byte-identical (blake3) or is not
///    longer than the original.
/// 7. Otherwise persist via `persist_enriched_body` and return `Done`.
///
/// # Errors
/// - `AppError::NotFound` when no live memory matches.
/// - `AppError::Database` for other SQLite failures.
/// - `AppError::Io` when the prompt template cannot be stat'ed or read.
/// - `AppError::LimitExceeded` when the template is too large.
/// - `AppError::Validation` when the LLM result has no `enriched_body` string.
/// - Anything propagated from the provider call or `persist_enriched_body`.
// The enrichment pipeline threads every CLI knob through as a plain argument.
#[allow(clippy::too_many_arguments)]
fn call_body_enrich(
    conn: &Connection,
    namespace: &str,
    memory_name: &str,
    binary: &Path,
    model: Option<&str>,
    timeout: u64,
    mode: &EnrichMode,
    min_output_chars: usize,
    max_output_chars: usize,
    prompt_template: Option<&Path>,
    preserve_threshold: f64,
    paths: &crate::paths::AppPaths,
    llm_backend: crate::cli::LlmBackendChoice,
) -> Result<EnrichItemResult, AppError> {
    let (memory_id, body, description, memory_type): (i64, String, String, String) = conn
        .query_row(
            "SELECT id, COALESCE(body,''), COALESCE(description,''), COALESCE(type,'note') \
         FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
            rusqlite::params![namespace, memory_name],
            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
        )
        .map_err(|e| match e {
            rusqlite::Error::QueryReturnedNoRows => {
                AppError::NotFound(format!("memory '{memory_name}' not found"))
            }
            other => AppError::Database(other),
        })?;

    // Character (not byte) count, compared against `chars_after` below.
    let chars_before = body.chars().count();

    // G26: gather graph context for contextualized enrichment
    let linked_entities: Vec<String> = {
        let mut stmt = conn.prepare_cached(
            "SELECT e.name FROM memory_entities me \
             JOIN entities e ON e.id = me.entity_id \
             WHERE me.memory_id = ?1 LIMIT 10",
        )?;
        // NOTE(review): rows that fail to decode are silently dropped here;
        // the context is treated as best-effort.
        let result: Vec<String> = stmt
            .query_map(rusqlite::params![memory_id], |r| r.get::<_, String>(0))?
            .filter_map(|r| r.ok())
            .collect();
        drop(stmt);
        result
    };

    // Load custom prompt template if provided
    let prompt_prefix = if let Some(tmpl_path) = prompt_template {
        // Stat first so an oversized file is rejected before it is read.
        let file_size = std::fs::metadata(tmpl_path)
            .map_err(|e| {
                AppError::Io(std::io::Error::new(
                    e.kind(),
                    format!("failed to stat prompt template: {e}"),
                ))
            })?
            .len();
        if file_size > MAX_MEMORY_BODY_LEN as u64 {
            return Err(AppError::LimitExceeded(
                crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
            ));
        }
        std::fs::read_to_string(tmpl_path).map_err(|e| {
            AppError::Io(std::io::Error::new(
                e.kind(),
                format!("failed to read prompt template: {e}"),
            ))
        })?
    } else {
        BODY_ENRICH_PROMPT_PREFIX.to_string()
    };

    // G26: build contextualized prompt with graph data
    // The whole section is omitted when there is neither a description nor any
    // linked entity; name, type and domain are only sent alongside those.
    let context_section = if !linked_entities.is_empty() || !description.is_empty() {
        let mut ctx = String::new();
        ctx.push_str(&format!(
            "\nContext:\n- Memory name: {memory_name}\n- Type: {memory_type}\n"
        ));
        if !description.is_empty() {
            ctx.push_str(&format!("- Description: {description}\n"));
        }
        ctx.push_str(&format!("- Domain: {namespace}\n"));
        if !linked_entities.is_empty() {
            ctx.push_str(&format!(
                "- Linked entities: {}\n",
                linked_entities.join(", ")
            ));
        }
        ctx
    } else {
        String::new()
    };

    let prompt = format!(
        "{prompt_prefix}{context_section}\nTarget minimum length: {min_output_chars} characters. Maximum: {max_output_chars} characters."
    );

    // The body schema uses a free-form enriched_body field
    // The current body is passed as the provider's input text.
    let (value, cost, is_oauth) = match mode {
        EnrichMode::ClaudeCode => {
            call_claude(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
        }
        EnrichMode::Codex => {
            call_codex(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
        }
        EnrichMode::Opencode => {
            call_opencode(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
        }
    };

    let enriched_body = value
        .get("enriched_body")
        .and_then(|v| v.as_str())
        .ok_or_else(|| AppError::Validation("LLM result missing 'enriched_body' field".into()))?;

    // NOTE(review): the rewrite is not checked against `max_output_chars` or
    // MAX_MEMORY_BODY_LEN here; presumably `persist_enriched_body` enforces
    // body limits — confirm.
    let chars_after = enriched_body.chars().count();

    // G29 Step 4 (v1.0.69): preservation check. Before persisting, run
    // a trigram-Jaccard similarity between the original body and the
    // LLM-rewritten body. When the score falls below
    // `args.preserve_threshold` (default 0.7 per the G29 gap), reject the
    // rewrite as a likely hallucination. The result is recorded in the
    // NDJSON stream so operators can audit what the LLM tried to do.
    let threshold = preserve_threshold;
    let verdict =
        crate::preservation::PreservationVerdict::evaluate(&body, enriched_body, threshold);
    if !verdict.is_accepted() {
        return Ok(EnrichItemResult::PreservationFailed {
            score: match verdict {
                crate::preservation::PreservationVerdict::Preserved { score, .. } => score,
                crate::preservation::PreservationVerdict::Rejected { score, .. } => score,
                // An unchanged body carries no score; report it as a perfect match.
                crate::preservation::PreservationVerdict::Unchanged { .. } => 1.0,
            },
            threshold,
            chars_before,
            chars_after,
        });
    }

    // G29 Step 5 (v1.0.69): idempotency via blake3 hash. Before persisting,
    // compare the hash of the original body against the hash of the enriched
    // body. Identical hashes mean the LLM produced a byte-for-byte identical
    // body (rare but possible) — treat as `Skipped` so re-running the batch
    // is safe and the queue does not get re-persisted entries.
    let old_hash = blake3::hash(body.as_bytes()).to_hex().to_string();
    let new_hash = blake3::hash(enriched_body.as_bytes()).to_hex().to_string();
    if old_hash == new_hash {
        return Ok(EnrichItemResult::Skipped {
            reason: format!(
                "enriched body hash matches original (blake3:{old_hash}); idempotency skip"
            ),
        });
    }

    // Only persist if the enriched body is genuinely longer
    if chars_after <= chars_before {
        return Ok(EnrichItemResult::Skipped {
            reason: format!(
                "enriched body ({chars_after} chars) not longer than original ({chars_before} chars)"
            ),
        });
    }

    persist_enriched_body(
        conn,
        namespace,
        memory_id,
        memory_name,
        enriched_body,
        paths,
        llm_backend,
    )?;

    Ok(EnrichItemResult::Done {
        memory_id: Some(memory_id),
        entity_id: None,
        entities: 0,
        rels: 0,
        chars_before: Some(chars_before),
        chars_after: Some(chars_after),
        cost,
        is_oauth,
    })
}
2980
2981fn call_reembed(
2982    conn: &Connection,
2983    namespace: &str,
2984    memory_name: &str,
2985    paths: &crate::paths::AppPaths,
2986    llm_backend: crate::cli::LlmBackendChoice,
2987) -> Result<EnrichItemResult, AppError> {
2988    let (memory_id, body, memory_type): (i64, String, String) = conn
2989        .query_row(
2990            "SELECT id, COALESCE(body,''), COALESCE(type,'note')
2991             FROM memories
2992             WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2993            rusqlite::params![namespace, memory_name],
2994            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2995        )
2996        .map_err(|e| match e {
2997            rusqlite::Error::QueryReturnedNoRows => {
2998                AppError::NotFound(format!("memory '{memory_name}' not found"))
2999            }
3000            other => AppError::Database(other),
3001        })?;
3002
3003    if body.trim().is_empty() {
3004        return Ok(EnrichItemResult::Skipped {
3005            reason: "body is empty".to_string(),
3006        });
3007    }
3008
3009    reembed_memory_vector(
3010        conn,
3011        namespace,
3012        memory_id,
3013        memory_name,
3014        &memory_type,
3015        &body,
3016        paths,
3017        llm_backend,
3018    )?;
3019
3020    Ok(EnrichItemResult::Done {
3021        memory_id: Some(memory_id),
3022        entity_id: None,
3023        entities: 0,
3024        rels: 0,
3025        chars_before: Some(body.chars().count()),
3026        chars_after: Some(body.chars().count()),
3027        cost: 0.0,
3028        is_oauth: true,
3029    })
3030}
3031
3032// ---------------------------------------------------------------------------
3033// Scan dispatcher — maps operation to scan query result (item keys)
3034// ---------------------------------------------------------------------------
3035
3036fn scan_operation(
3037    conn: &Connection,
3038    namespace: &str,
3039    args: &EnrichArgs,
3040) -> Result<Vec<String>, AppError> {
3041    // G37: resolve --names + --names-file once and apply to every scan path.
3042    let name_filter = resolve_name_filter(args)?;
3043    match args.operation {
3044        EnrichOperation::MemoryBindings => {
3045            let rows = scan_unbound_memories(conn, namespace, args.limit, &name_filter)?;
3046            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3047        }
3048        EnrichOperation::EntityDescriptions => {
3049            let rows =
3050                scan_entities_without_description(conn, namespace, args.limit, &name_filter)?;
3051            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3052        }
3053        EnrichOperation::BodyEnrich => {
3054            let rows = scan_short_body_memories(
3055                conn,
3056                namespace,
3057                args.min_output_chars,
3058                args.limit,
3059                &name_filter,
3060            )?;
3061            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3062        }
3063        EnrichOperation::ReEmbed => {
3064            let rows = scan_memories_without_embeddings(conn, namespace, args.limit, &name_filter)?;
3065            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3066        }
3067        EnrichOperation::WeightCalibrate => {
3068            let rows = scan_weight_candidates(conn, namespace, args.limit)?;
3069            Ok(rows
3070                .into_iter()
3071                .map(|(id, _, _, _, _)| id.to_string())
3072                .collect())
3073        }
3074        EnrichOperation::RelationReclassify => {
3075            let rows = scan_generic_relations(conn, namespace, args.limit)?;
3076            Ok(rows
3077                .into_iter()
3078                .map(|(id, _, _, _)| id.to_string())
3079                .collect())
3080        }
3081        EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
3082            let pairs = scan_isolated_entity_pairs(conn, namespace, args.limit)?;
3083            Ok(pairs.into_iter().map(|(_, name, _, _)| name).collect())
3084        }
3085        EnrichOperation::EntityTypeValidate => {
3086            let rows = scan_entities_for_type_validation(conn, namespace, args.limit)?;
3087            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3088        }
3089        EnrichOperation::DescriptionEnrich => {
3090            let rows = scan_generic_descriptions(conn, namespace, args.limit)?;
3091            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3092        }
3093        EnrichOperation::DomainClassify
3094        | EnrichOperation::GraphAudit
3095        | EnrichOperation::DeepResearchSynth
3096        | EnrichOperation::BodyExtract => {
3097            let limit_clause = args.limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3098            let sql = format!(
3099                "SELECT name FROM memories WHERE namespace=?1 AND deleted_at IS NULL ORDER BY id {limit_clause}"
3100            );
3101            let mut stmt = conn.prepare(&sql)?;
3102            let names = stmt
3103                .query_map(rusqlite::params![namespace], |r| r.get::<_, String>(0))?
3104                .collect::<Result<Vec<_>, _>>()?;
3105            Ok(names)
3106        }
3107    }
3108}
3109
3110// ---------------------------------------------------------------------------
3111// Codex stub provider
3112// ---------------------------------------------------------------------------
3113
3114/// Locates the Codex CLI binary.
3115fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
3116    if let Some(p) = explicit {
3117        if p.exists() {
3118            return Ok(p.to_path_buf());
3119        }
3120        return Err(AppError::Validation(format!(
3121            "Codex binary not found at explicit path: {}",
3122            p.display()
3123        )));
3124    }
3125
3126    if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
3127        let p = PathBuf::from(&env_path);
3128        if p.exists() {
3129            return Ok(p);
3130        }
3131    }
3132
3133    let name = if cfg!(windows) { "codex.exe" } else { "codex" };
3134    if let Some(path_var) = std::env::var_os("PATH") {
3135        for dir in std::env::split_paths(&path_var) {
3136            let candidate = dir.join(name);
3137            if candidate.exists() {
3138                return Ok(crate::extract::llm_embedding::resolve_real_binary(
3139                    &candidate,
3140                ));
3141            }
3142        }
3143    }
3144
3145    Err(AppError::Validation(
3146        "Codex CLI binary not found in PATH. Install it or specify --codex-binary".to_string(),
3147    ))
3148}
3149
3150/// G27: Calibrate weight of a single relationship via LLM.
3151fn call_weight_calibrate(
3152    conn: &Connection,
3153    _namespace: &str,
3154    item_key: &str,
3155    binary: &Path,
3156    model: Option<&str>,
3157    timeout: u64,
3158    mode: &EnrichMode,
3159) -> Result<EnrichItemResult, AppError> {
3160    let rel_id: i64 = item_key
3161        .parse()
3162        .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
3163    let (source_name, target_name, relation, current_weight): (String, String, String, f64) = conn
3164        .query_row(
3165            "SELECT e1.name, e2.name, r.relation, r.weight \
3166             FROM relationships r \
3167             JOIN entities e1 ON e1.id = r.source_id \
3168             JOIN entities e2 ON e2.id = r.target_id \
3169             WHERE r.id = ?1",
3170            rusqlite::params![rel_id],
3171            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
3172        )
3173        .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
3174
3175    let input_text = format!(
3176        "Source: {source_name}\nTarget: {target_name}\nRelation: {relation}\nCurrent weight: {current_weight}"
3177    );
3178    let (value, cost, is_oauth) = match mode {
3179        EnrichMode::ClaudeCode => call_claude(
3180            binary,
3181            WEIGHT_CALIBRATE_PROMPT,
3182            WEIGHT_CALIBRATE_SCHEMA,
3183            &input_text,
3184            model,
3185            timeout,
3186        )?,
3187        EnrichMode::Codex => call_codex(
3188            binary,
3189            WEIGHT_CALIBRATE_PROMPT,
3190            WEIGHT_CALIBRATE_SCHEMA,
3191            &input_text,
3192            model,
3193            timeout,
3194        )?,
3195        EnrichMode::Opencode => call_opencode(
3196            binary,
3197            WEIGHT_CALIBRATE_PROMPT,
3198            WEIGHT_CALIBRATE_SCHEMA,
3199            &input_text,
3200            model,
3201            timeout,
3202        )?,
3203    };
3204
3205    let calibrated = value
3206        .get("calibrated_weight")
3207        .and_then(|v| v.as_f64())
3208        .ok_or_else(|| AppError::Validation("LLM result missing 'calibrated_weight'".into()))?;
3209
3210    conn.execute(
3211        "UPDATE relationships SET weight = ?1 WHERE id = ?2",
3212        rusqlite::params![calibrated, rel_id],
3213    )?;
3214
3215    Ok(EnrichItemResult::Done {
3216        memory_id: None,
3217        entity_id: None,
3218        entities: 0,
3219        rels: 1,
3220        chars_before: None,
3221        chars_after: None,
3222        cost,
3223        is_oauth,
3224    })
3225}
3226
3227/// G27: Reclassify a generic relationship type via LLM.
3228fn call_relation_reclassify(
3229    conn: &Connection,
3230    _namespace: &str,
3231    item_key: &str,
3232    binary: &Path,
3233    model: Option<&str>,
3234    timeout: u64,
3235    mode: &EnrichMode,
3236) -> Result<EnrichItemResult, AppError> {
3237    let rel_id: i64 = item_key
3238        .parse()
3239        .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
3240    let (source_name, target_name, current_relation): (String, String, String) = conn
3241        .query_row(
3242            "SELECT e1.name, e2.name, r.relation \
3243             FROM relationships r \
3244             JOIN entities e1 ON e1.id = r.source_id \
3245             JOIN entities e2 ON e2.id = r.target_id \
3246             WHERE r.id = ?1",
3247            rusqlite::params![rel_id],
3248            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3249        )
3250        .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
3251
3252    let input_text = format!(
3253        "Source entity: {source_name}\nTarget entity: {target_name}\nCurrent relation: {current_relation}"
3254    );
3255    let (value, cost, is_oauth) = match mode {
3256        EnrichMode::ClaudeCode => call_claude(
3257            binary,
3258            RELATION_RECLASSIFY_PROMPT,
3259            RELATION_RECLASSIFY_SCHEMA,
3260            &input_text,
3261            model,
3262            timeout,
3263        )?,
3264        EnrichMode::Codex => call_codex(
3265            binary,
3266            RELATION_RECLASSIFY_PROMPT,
3267            RELATION_RECLASSIFY_SCHEMA,
3268            &input_text,
3269            model,
3270            timeout,
3271        )?,
3272        EnrichMode::Opencode => call_opencode(
3273            binary,
3274            RELATION_RECLASSIFY_PROMPT,
3275            RELATION_RECLASSIFY_SCHEMA,
3276            &input_text,
3277            model,
3278            timeout,
3279        )?,
3280    };
3281
3282    let new_relation = value
3283        .get("relation")
3284        .and_then(|v| v.as_str())
3285        .ok_or_else(|| AppError::Validation("LLM result missing 'relation'".into()))?;
3286    let new_strength = value
3287        .get("strength")
3288        .and_then(|v| v.as_f64())
3289        .unwrap_or(0.5);
3290
3291    conn.execute(
3292        "UPDATE relationships SET relation = ?1, weight = ?2 WHERE id = ?3",
3293        rusqlite::params![new_relation, new_strength, rel_id],
3294    )?;
3295
3296    Ok(EnrichItemResult::Done {
3297        memory_id: None,
3298        entity_id: None,
3299        entities: 0,
3300        rels: 1,
3301        chars_before: None,
3302        chars_after: None,
3303        cost,
3304        is_oauth,
3305    })
3306}
3307
3308/// G27 P2: Connect isolated entities via LLM-suggested relationship.
3309fn call_entity_connect(
3310    conn: &Connection,
3311    namespace: &str,
3312    item_key: &str,
3313    binary: &Path,
3314    model: Option<&str>,
3315    timeout: u64,
3316    mode: &EnrichMode,
3317) -> Result<EnrichItemResult, AppError> {
3318    let pairs = scan_isolated_entity_pairs(conn, namespace, Some(1))?;
3319    let (e1_id, e1_name, e2_id, e2_name) =
3320        match pairs.into_iter().find(|(_, n, _, _)| n == item_key) {
3321            Some(p) => p,
3322            None => {
3323                return Ok(EnrichItemResult::Skipped {
3324                    reason: "pair no longer isolated".into(),
3325                })
3326            }
3327        };
3328    let input_text = format!("Entity A: {e1_name}\nEntity B: {e2_name}");
3329    let (value, cost, is_oauth) = match mode {
3330        EnrichMode::ClaudeCode => call_claude(
3331            binary,
3332            ENTITY_CONNECT_PROMPT,
3333            ENTITY_CONNECT_SCHEMA,
3334            &input_text,
3335            model,
3336            timeout,
3337        )?,
3338        EnrichMode::Codex => call_codex(
3339            binary,
3340            ENTITY_CONNECT_PROMPT,
3341            ENTITY_CONNECT_SCHEMA,
3342            &input_text,
3343            model,
3344            timeout,
3345        )?,
3346        EnrichMode::Opencode => call_opencode(
3347            binary,
3348            ENTITY_CONNECT_PROMPT,
3349            ENTITY_CONNECT_SCHEMA,
3350            &input_text,
3351            model,
3352            timeout,
3353        )?,
3354    };
3355    let relation = value
3356        .get("relation")
3357        .and_then(|v| v.as_str())
3358        .unwrap_or("none");
3359    if relation == "none" {
3360        return Ok(EnrichItemResult::Skipped {
3361            reason: "LLM determined no relationship".into(),
3362        });
3363    }
3364    let strength = value
3365        .get("strength")
3366        .and_then(|v| v.as_f64())
3367        .unwrap_or(0.5);
3368    conn.execute(
3369        "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
3370        rusqlite::params![namespace, e1_id, e2_id, relation, strength],
3371    )?;
3372    Ok(EnrichItemResult::Done {
3373        memory_id: None,
3374        entity_id: None,
3375        entities: 0,
3376        rels: 1,
3377        chars_before: None,
3378        chars_after: None,
3379        cost,
3380        is_oauth,
3381    })
3382}
3383
3384/// G27 P2: Validate entity type assignment via LLM.
3385fn call_entity_type_validate(
3386    conn: &Connection,
3387    _namespace: &str,
3388    item_key: &str,
3389    binary: &Path,
3390    model: Option<&str>,
3391    timeout: u64,
3392    mode: &EnrichMode,
3393) -> Result<EnrichItemResult, AppError> {
3394    let (ent_id, ent_name, ent_type): (i64, String, String) = conn
3395        .query_row(
3396            "SELECT id, name, type FROM entities WHERE name = ?1",
3397            rusqlite::params![item_key],
3398            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3399        )
3400        .map_err(|_| AppError::NotFound(format!("entity '{item_key}' not found")))?;
3401    let input_text = format!("Entity: {ent_name}\nCurrent type: {ent_type}");
3402    let (value, cost, is_oauth) = match mode {
3403        EnrichMode::ClaudeCode => call_claude(
3404            binary,
3405            ENTITY_TYPE_VALIDATE_PROMPT,
3406            ENTITY_TYPE_VALIDATE_SCHEMA,
3407            &input_text,
3408            model,
3409            timeout,
3410        )?,
3411        EnrichMode::Codex => call_codex(
3412            binary,
3413            ENTITY_TYPE_VALIDATE_PROMPT,
3414            ENTITY_TYPE_VALIDATE_SCHEMA,
3415            &input_text,
3416            model,
3417            timeout,
3418        )?,
3419        EnrichMode::Opencode => call_opencode(
3420            binary,
3421            ENTITY_TYPE_VALIDATE_PROMPT,
3422            ENTITY_TYPE_VALIDATE_SCHEMA,
3423            &input_text,
3424            model,
3425            timeout,
3426        )?,
3427    };
3428    let validated_type = value
3429        .get("validated_type")
3430        .and_then(|v| v.as_str())
3431        .unwrap_or(&ent_type);
3432    let was_correct = value
3433        .get("was_correct")
3434        .and_then(|v| v.as_bool())
3435        .unwrap_or(true);
3436    if !was_correct {
3437        conn.execute(
3438            "UPDATE entities SET type = ?1 WHERE id = ?2",
3439            rusqlite::params![validated_type, ent_id],
3440        )?;
3441    }
3442    Ok(EnrichItemResult::Done {
3443        memory_id: None,
3444        entity_id: Some(ent_id),
3445        entities: 1,
3446        rels: 0,
3447        chars_before: None,
3448        chars_after: None,
3449        cost,
3450        is_oauth,
3451    })
3452}
3453
3454/// G27 P2: Enrich generic memory description via LLM.
3455fn call_description_enrich(
3456    conn: &Connection,
3457    _namespace: &str,
3458    item_key: &str,
3459    binary: &Path,
3460    model: Option<&str>,
3461    timeout: u64,
3462    mode: &EnrichMode,
3463) -> Result<EnrichItemResult, AppError> {
3464    let (mem_id, body, old_desc): (i64, String, String) = conn
3465        .query_row(
3466            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3467            rusqlite::params![item_key],
3468            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3469        )
3470        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3471    let snippet: String = body.chars().take(500).collect();
3472    let input_text = format!(
3473        "Memory name: {item_key}\nCurrent description: {old_desc}\nBody preview: {snippet}"
3474    );
3475    let (value, cost, is_oauth) = match mode {
3476        EnrichMode::ClaudeCode => call_claude(
3477            binary,
3478            DESCRIPTION_ENRICH_PROMPT,
3479            DESCRIPTION_ENRICH_SCHEMA,
3480            &input_text,
3481            model,
3482            timeout,
3483        )?,
3484        EnrichMode::Codex => call_codex(
3485            binary,
3486            DESCRIPTION_ENRICH_PROMPT,
3487            DESCRIPTION_ENRICH_SCHEMA,
3488            &input_text,
3489            model,
3490            timeout,
3491        )?,
3492        EnrichMode::Opencode => call_opencode(
3493            binary,
3494            DESCRIPTION_ENRICH_PROMPT,
3495            DESCRIPTION_ENRICH_SCHEMA,
3496            &input_text,
3497            model,
3498            timeout,
3499        )?,
3500    };
3501    let new_desc = value
3502        .get("description")
3503        .and_then(|v| v.as_str())
3504        .unwrap_or(&old_desc);
3505    let old_name: String = conn.query_row(
3506        "SELECT name FROM memories WHERE id = ?1",
3507        rusqlite::params![mem_id],
3508        |r| r.get(0),
3509    )?;
3510    conn.execute(
3511        "UPDATE memories SET description = ?1 WHERE id = ?2",
3512        rusqlite::params![new_desc, mem_id],
3513    )?;
3514    memories::sync_fts_after_update(
3515        conn, mem_id, &old_name, &old_desc, &body, &old_name, new_desc, &body,
3516    )?;
3517    Ok(EnrichItemResult::Done {
3518        memory_id: Some(mem_id),
3519        entity_id: None,
3520        entities: 0,
3521        rels: 0,
3522        chars_before: Some(old_desc.len()),
3523        chars_after: Some(new_desc.len()),
3524        cost,
3525        is_oauth,
3526    })
3527}
3528
3529/// G27 P2: Classify memory into domain category via LLM.
3530fn call_domain_classify(
3531    conn: &Connection,
3532    _namespace: &str,
3533    item_key: &str,
3534    binary: &Path,
3535    model: Option<&str>,
3536    timeout: u64,
3537    mode: &EnrichMode,
3538) -> Result<EnrichItemResult, AppError> {
3539    let (mem_id, body, desc): (i64, String, String) = conn
3540        .query_row(
3541            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3542            rusqlite::params![item_key],
3543            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3544        )
3545        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3546    let snippet: String = body.chars().take(500).collect();
3547    let input_text = format!("Memory: {item_key}\nDescription: {desc}\nBody preview: {snippet}");
3548    let (value, cost, is_oauth) = match mode {
3549        EnrichMode::ClaudeCode => call_claude(
3550            binary,
3551            DOMAIN_CLASSIFY_PROMPT,
3552            DOMAIN_CLASSIFY_SCHEMA,
3553            &input_text,
3554            model,
3555            timeout,
3556        )?,
3557        EnrichMode::Codex => call_codex(
3558            binary,
3559            DOMAIN_CLASSIFY_PROMPT,
3560            DOMAIN_CLASSIFY_SCHEMA,
3561            &input_text,
3562            model,
3563            timeout,
3564        )?,
3565        EnrichMode::Opencode => call_opencode(
3566            binary,
3567            DOMAIN_CLASSIFY_PROMPT,
3568            DOMAIN_CLASSIFY_SCHEMA,
3569            &input_text,
3570            model,
3571            timeout,
3572        )?,
3573    };
3574    let domain = value
3575        .get("domain")
3576        .and_then(|v| v.as_str())
3577        .unwrap_or("uncategorized");
3578    let metadata = format!(r#"{{"domain":"{}"}}"#, domain.replace('"', "\\\""));
3579    conn.execute(
3580        "UPDATE memories SET metadata = ?1 WHERE id = ?2",
3581        rusqlite::params![metadata, mem_id],
3582    )?;
3583    Ok(EnrichItemResult::Done {
3584        memory_id: Some(mem_id),
3585        entity_id: None,
3586        entities: 0,
3587        rels: 0,
3588        chars_before: None,
3589        chars_after: None,
3590        cost,
3591        is_oauth,
3592    })
3593}
3594
3595/// G27 P2: Audit memory graph quality via LLM.
3596fn call_graph_audit(
3597    conn: &Connection,
3598    _namespace: &str,
3599    item_key: &str,
3600    binary: &Path,
3601    model: Option<&str>,
3602    timeout: u64,
3603    mode: &EnrichMode,
3604) -> Result<EnrichItemResult, AppError> {
3605    let (mem_id, body, desc): (i64, String, String) = conn
3606        .query_row(
3607            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3608            rusqlite::params![item_key],
3609            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3610        )
3611        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3612    let snippet: String = body.chars().take(500).collect();
3613    let ent_count: i64 = conn
3614        .query_row(
3615            "SELECT COUNT(*) FROM memory_entities WHERE memory_id = ?1",
3616            rusqlite::params![mem_id],
3617            |r| r.get(0),
3618        )
3619        .unwrap_or(0);
3620    let input_text = format!("Memory: {item_key}\nDescription: {desc}\nEntity bindings: {ent_count}\nBody preview: {snippet}");
3621    let (value, cost, is_oauth) = match mode {
3622        EnrichMode::ClaudeCode => call_claude(
3623            binary,
3624            GRAPH_AUDIT_PROMPT,
3625            GRAPH_AUDIT_SCHEMA,
3626            &input_text,
3627            model,
3628            timeout,
3629        )?,
3630        EnrichMode::Codex => call_codex(
3631            binary,
3632            GRAPH_AUDIT_PROMPT,
3633            GRAPH_AUDIT_SCHEMA,
3634            &input_text,
3635            model,
3636            timeout,
3637        )?,
3638        EnrichMode::Opencode => call_opencode(
3639            binary,
3640            GRAPH_AUDIT_PROMPT,
3641            GRAPH_AUDIT_SCHEMA,
3642            &input_text,
3643            model,
3644            timeout,
3645        )?,
3646    };
3647    let issues = value
3648        .get("issues")
3649        .and_then(|v| v.as_array())
3650        .map(|a| a.len())
3651        .unwrap_or(0);
3652    Ok(EnrichItemResult::Done {
3653        memory_id: Some(mem_id),
3654        entity_id: None,
3655        entities: 0,
3656        rels: issues,
3657        chars_before: None,
3658        chars_after: None,
3659        cost,
3660        is_oauth,
3661    })
3662}
3663
3664/// G27 P2: Synthesize research findings into graph entities/relationships via LLM.
3665fn call_deep_research_synth(
3666    conn: &Connection,
3667    namespace: &str,
3668    item_key: &str,
3669    binary: &Path,
3670    model: Option<&str>,
3671    timeout: u64,
3672    mode: &EnrichMode,
3673) -> Result<EnrichItemResult, AppError> {
3674    let (mem_id, body): (i64, String) = conn
3675        .query_row(
3676            "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3677            rusqlite::params![item_key],
3678            |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
3679        )
3680        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3681    let snippet: String = body.chars().take(2000).collect();
3682    let input_text = format!("Memory: {item_key}\nBody:\n{snippet}");
3683    let (value, cost, is_oauth) = match mode {
3684        EnrichMode::ClaudeCode => call_claude(
3685            binary,
3686            DEEP_RESEARCH_SYNTH_PROMPT,
3687            DEEP_RESEARCH_SYNTH_SCHEMA,
3688            &input_text,
3689            model,
3690            timeout,
3691        )?,
3692        EnrichMode::Codex => call_codex(
3693            binary,
3694            DEEP_RESEARCH_SYNTH_PROMPT,
3695            DEEP_RESEARCH_SYNTH_SCHEMA,
3696            &input_text,
3697            model,
3698            timeout,
3699        )?,
3700        EnrichMode::Opencode => call_opencode(
3701            binary,
3702            DEEP_RESEARCH_SYNTH_PROMPT,
3703            DEEP_RESEARCH_SYNTH_SCHEMA,
3704            &input_text,
3705            model,
3706            timeout,
3707        )?,
3708    };
3709    let mut ent_count = 0usize;
3710    let mut rel_count = 0usize;
3711    if let Some(ents) = value.get("entities").and_then(|v| v.as_array()) {
3712        for e in ents {
3713            let name = e.get("name").and_then(|v| v.as_str()).unwrap_or_default();
3714            let etype_str = e
3715                .get("entity_type")
3716                .and_then(|v| v.as_str())
3717                .unwrap_or("concept");
3718            let etype: EntityType = etype_str.parse().unwrap_or(EntityType::Concept);
3719            if name.len() >= 2 {
3720                let ne = NewEntity {
3721                    name: name.to_string(),
3722                    entity_type: etype,
3723                    description: None,
3724                };
3725                let _ = entities::upsert_entity(conn, namespace, &ne);
3726                ent_count += 1;
3727            }
3728        }
3729    }
3730    if let Some(rels) = value.get("relationships").and_then(|v| v.as_array()) {
3731        for r in rels {
3732            let src = r.get("source").and_then(|v| v.as_str()).unwrap_or_default();
3733            let tgt = r.get("target").and_then(|v| v.as_str()).unwrap_or_default();
3734            if src.is_empty() || tgt.is_empty() {
3735                continue;
3736            }
3737            let rel = r
3738                .get("relation")
3739                .and_then(|v| v.as_str())
3740                .unwrap_or("related");
3741            let str_ = r.get("strength").and_then(|v| v.as_f64()).unwrap_or(0.5);
3742            if let (Some(sid), Some(tid)) = (
3743                entities::find_entity_id(conn, namespace, src)?,
3744                entities::find_entity_id(conn, namespace, tgt)?,
3745            ) {
3746                let _ = entities::create_or_fetch_relationship(
3747                    conn, namespace, sid, tid, rel, str_, None,
3748                );
3749                rel_count += 1;
3750            }
3751        }
3752    }
3753    Ok(EnrichItemResult::Done {
3754        memory_id: Some(mem_id),
3755        entity_id: None,
3756        entities: ent_count,
3757        rels: rel_count,
3758        chars_before: None,
3759        chars_after: None,
3760        cost,
3761        is_oauth,
3762    })
3763}
3764
3765/// G27 P2: Extract structured body from unstructured text via LLM.
3766fn call_body_extract(
3767    conn: &Connection,
3768    _namespace: &str,
3769    item_key: &str,
3770    binary: &Path,
3771    model: Option<&str>,
3772    timeout: u64,
3773    mode: &EnrichMode,
3774) -> Result<EnrichItemResult, AppError> {
3775    let (mem_id, body, old_desc): (i64, String, String) = conn
3776        .query_row(
3777            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3778            rusqlite::params![item_key],
3779            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3780        )
3781        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3782    let old_name: String = conn.query_row(
3783        "SELECT name FROM memories WHERE id = ?1",
3784        rusqlite::params![mem_id],
3785        |r| r.get(0),
3786    )?;
3787    let input_text = format!("Memory: {item_key}\nBody:\n{body}");
3788    let (value, cost, is_oauth) = match mode {
3789        EnrichMode::ClaudeCode => call_claude(
3790            binary,
3791            BODY_EXTRACT_PROMPT,
3792            BODY_EXTRACT_SCHEMA,
3793            &input_text,
3794            model,
3795            timeout,
3796        )?,
3797        EnrichMode::Codex => call_codex(
3798            binary,
3799            BODY_EXTRACT_PROMPT,
3800            BODY_EXTRACT_SCHEMA,
3801            &input_text,
3802            model,
3803            timeout,
3804        )?,
3805        EnrichMode::Opencode => call_opencode(
3806            binary,
3807            BODY_EXTRACT_PROMPT,
3808            BODY_EXTRACT_SCHEMA,
3809            &input_text,
3810            model,
3811            timeout,
3812        )?,
3813    };
3814    let restructured = value
3815        .get("restructured_body")
3816        .and_then(|v| v.as_str())
3817        .unwrap_or(&body);
3818    let chars_before = body.len();
3819    let chars_after = restructured.len();
3820    let new_hash = blake3::hash(restructured.as_bytes()).to_hex().to_string();
3821    conn.execute(
3822        "UPDATE memories SET body = ?1, body_hash = ?2, updated_at = unixepoch() WHERE id = ?3",
3823        rusqlite::params![restructured, new_hash, mem_id],
3824    )?;
3825    memories::sync_fts_after_update(
3826        conn,
3827        mem_id,
3828        &old_name,
3829        &old_desc,
3830        &body,
3831        &old_name,
3832        &old_desc,
3833        restructured,
3834    )?;
3835    Ok(EnrichItemResult::Done {
3836        memory_id: Some(mem_id),
3837        entity_id: None,
3838        entities: 0,
3839        rels: 0,
3840        chars_before: Some(chars_before),
3841        chars_after: Some(chars_after),
3842        cost,
3843        is_oauth,
3844    })
3845}
3846
3847/// Scan for pairs of entities that share no direct relationship.
3848#[allow(clippy::type_complexity)]
3849fn scan_isolated_entity_pairs(
3850    conn: &Connection,
3851    namespace: &str,
3852    limit: Option<usize>,
3853) -> Result<Vec<(i64, String, i64, String)>, AppError> {
3854    let limit_val = limit.unwrap_or(50) as i64;
3855    let mut stmt = conn.prepare_cached(
3856        "SELECT e1.id, e1.name, e2.id, e2.name FROM entities e1, entities e2 \
3857         WHERE e1.namespace = ?1 AND e2.namespace = ?1 AND e1.id < e2.id \
3858         AND NOT EXISTS (SELECT 1 FROM relationships r WHERE \
3859           (r.source_id = e1.id AND r.target_id = e2.id) OR \
3860           (r.source_id = e2.id AND r.target_id = e1.id)) \
3861         LIMIT ?2",
3862    )?;
3863    let rows = stmt
3864        .query_map(rusqlite::params![namespace, limit_val], |r| {
3865            Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?))
3866        })?
3867        .collect::<Result<Vec<_>, _>>()?;
3868    Ok(rows)
3869}
3870
3871/// Scan for entities with non-validated types (all entities for type audit).
3872fn scan_entities_for_type_validation(
3873    conn: &Connection,
3874    namespace: &str,
3875    limit: Option<usize>,
3876) -> Result<Vec<(i64, String, String)>, AppError> {
3877    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3878    let sql = format!(
3879        "SELECT id, name, type FROM entities WHERE namespace = ?1 ORDER BY id {limit_clause}"
3880    );
3881    let mut stmt = conn.prepare(&sql)?;
3882    let rows = stmt
3883        .query_map(rusqlite::params![namespace], |r| {
3884            Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3885        })?
3886        .collect::<Result<Vec<_>, _>>()?;
3887    Ok(rows)
3888}
3889
3890/// Scan for memories with generic descriptions (ingested, imported, etc).
3891fn scan_generic_descriptions(
3892    conn: &Connection,
3893    namespace: &str,
3894    limit: Option<usize>,
3895) -> Result<Vec<(i64, String, String)>, AppError> {
3896    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3897    let sql = format!(
3898        "SELECT id, name, description FROM memories WHERE namespace = ?1 AND deleted_at IS NULL \
3899         AND (description LIKE '%ingested%' OR description LIKE '%imported%' OR description LIKE '%added%' OR length(description) < 30) \
3900         ORDER BY id {limit_clause}"
3901    );
3902    let mut stmt = conn.prepare(&sql)?;
3903    let rows = stmt
3904        .query_map(rusqlite::params![namespace], |r| {
3905            Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3906        })?
3907        .collect::<Result<Vec<_>, _>>()?;
3908    Ok(rows)
3909}
3910
3911/// Calls the Codex CLI for a single enrichment item.
3912///
3913/// Follows the same contract as `call_claude`: returns `(value, cost_usd, is_oauth=false)`.
3914fn call_codex(
3915    binary: &Path,
3916    prompt: &str,
3917    json_schema: &str,
3918    input_text: &str,
3919    model: Option<&str>,
3920    timeout_secs: u64,
3921) -> Result<(serde_json::Value, f64, bool), AppError> {
3922    use wait_timeout::ChildExt;
3923
3924    // G31+G32+G33 (v1.0.69): validate the model BEFORE spawn, write the
3925    // schema to a trusted cache path (not /tmp), and reuse the
3926    // consolidated JSONL parser. See `codex_spawn.rs` for the canonical
3927    // hardening rationale.
3928    super::codex_spawn::validate_codex_model(model)?;
3929    let schema_file = super::codex_spawn::trusted_schema_path()?;
3930
3931    let args = super::codex_spawn::CodexSpawnArgs {
3932        binary,
3933        prompt,
3934        json_schema,
3935        input_text,
3936        model,
3937        timeout_secs,
3938        schema_path: schema_file.clone(),
3939    };
3940    let mut cmd = super::codex_spawn::build_codex_command(&args)?;
3941
3942    let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
3943        AppError::Io(std::io::Error::new(
3944            e.kind(),
3945            format!("failed to spawn codex: {e}"),
3946        ))
3947    })?;
3948
3949    let full_prompt = format!("{prompt}\n\n{input_text}");
3950    let stdin_bytes = full_prompt.into_bytes();
3951    let mut child_stdin = child
3952        .stdin
3953        .take()
3954        .ok_or_else(|| AppError::Validation("failed to open codex stdin".into()))?;
3955    let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
3956        child_stdin.write_all(&stdin_bytes)?;
3957        drop(child_stdin);
3958        Ok(())
3959    });
3960
3961    let start = std::time::Instant::now();
3962    let timeout = std::time::Duration::from_secs(timeout_secs);
3963    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
3964    let _ = std::fs::remove_file(&schema_file);
3965
3966    match status {
3967        Some(exit_status) => {
3968            stdin_thread
3969                .join()
3970                .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
3971                .map_err(AppError::Io)?;
3972
3973            tracing::debug!(
3974                target: "process",
3975                exit_code = ?exit_status.code(),
3976                elapsed_ms = start.elapsed().as_millis() as u64,
3977                "external process completed"
3978            );
3979
3980            let mut stdout_buf = Vec::new();
3981            if let Some(mut out) = child.stdout.take() {
3982                std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
3983            }
3984            if !exit_status.success() {
3985                let mut stderr_buf = Vec::new();
3986                if let Some(mut err) = child.stderr.take() {
3987                    std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
3988                }
3989                let stderr_str = String::from_utf8_lossy(&stderr_buf);
3990                tracing::warn!(
3991                    target: "enrich",
3992                    exit_code = ?exit_status.code(),
3993                    stderr = %stderr_str.trim(),
3994                    "codex process failed"
3995                );
3996                return Err(AppError::Validation(format!(
3997                    "codex exited with code {:?}: {}",
3998                    exit_status.code(),
3999                    stderr_str.trim()
4000                )));
4001            }
4002            let stdout_str = String::from_utf8(stdout_buf)
4003                .map_err(|_| AppError::Validation("codex stdout is not valid UTF-8".into()))?;
4004            // G32: use the JSONL parser, NOT serde_json::from_str on the
4005            // entire stdout (codex emits one event per line).
4006            let result = super::codex_spawn::parse_codex_jsonl(&stdout_str)?;
4007            // Return the raw agent_message text parsed as JSON. Different
4008            // operations (memory-bindings, body-enrich) use different
4009            // output schemas, so we let the caller pick which fields to
4010            // extract. The previous implementation hardcoded
4011            // `{entities, urls}` which broke body-enrich.
4012            let value: serde_json::Value =
4013                serde_json::from_str(&result.last_agent_text).map_err(|e| {
4014                    AppError::Validation(format!(
4015                        "codex agent_message is not valid JSON: {e}; raw={}",
4016                        result.last_agent_text
4017                    ))
4018                })?;
4019            Ok((value, 0.0, false))
4020        }
4021        None => {
4022            let _ = child.kill();
4023            let _ = child.wait();
4024            let _ = stdin_thread.join();
4025            Err(AppError::Validation(format!(
4026                "codex timed out after {timeout_secs} seconds"
4027            )))
4028        }
4029    }
4030}
4031
4032fn call_opencode(
4033    binary: &Path,
4034    prompt: &str,
4035    json_schema: &str,
4036    input_text: &str,
4037    model: Option<&str>,
4038    timeout_secs: u64,
4039) -> Result<(serde_json::Value, f64, bool), AppError> {
4040    use wait_timeout::ChildExt;
4041
4042    let resolved_model = super::opencode_runner::resolve_opencode_model(model);
4043
4044    let augmented_prompt = if json_schema.is_empty() {
4045        prompt.to_string()
4046    } else {
4047        format!(
4048            "{prompt}\n\nIMPORTANT: You MUST respond with ONLY valid JSON (no markdown, no explanation, no code fences). \
4049             The JSON MUST match this schema:\n{json_schema}"
4050        )
4051    };
4052
4053    let mut cmd = super::opencode_runner::build_opencode_command_sync(
4054        binary,
4055        &resolved_model,
4056        &augmented_prompt,
4057        input_text,
4058    )?;
4059
4060    let mut child = super::opencode_runner::spawn_opencode(&mut cmd).map_err(|e| {
4061        AppError::Io(std::io::Error::new(
4062            e.kind(),
4063            format!("failed to spawn opencode: {e}"),
4064        ))
4065    })?;
4066
4067    let start = std::time::Instant::now();
4068    let timeout = std::time::Duration::from_secs(timeout_secs);
4069    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
4070
4071    match status {
4072        Some(exit_status) => {
4073            tracing::debug!(
4074                target: "process",
4075                exit_code = ?exit_status.code(),
4076                elapsed_ms = start.elapsed().as_millis() as u64,
4077                "opencode process completed"
4078            );
4079
4080            let mut stdout_buf = Vec::new();
4081            if let Some(mut out) = child.stdout.take() {
4082                std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
4083            }
4084            if !exit_status.success() {
4085                let mut stderr_buf = Vec::new();
4086                if let Some(mut err) = child.stderr.take() {
4087                    std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
4088                }
4089                let stderr_str = String::from_utf8_lossy(&stderr_buf);
4090                tracing::warn!(
4091                    target: "enrich",
4092                    exit_code = ?exit_status.code(),
4093                    stderr = %stderr_str.trim(),
4094                    "opencode process failed"
4095                );
4096                return Err(AppError::Validation(format!(
4097                    "opencode exited with code {:?}: {}",
4098                    exit_status.code(),
4099                    stderr_str.trim()
4100                )));
4101            }
4102            let stdout_str = String::from_utf8(stdout_buf)
4103                .map_err(|_| AppError::Validation("opencode stdout is not valid UTF-8".into()))?;
4104            let (text, cost, _tokens) = super::opencode_runner::parse_opencode_output(&stdout_str)?;
4105            let value: serde_json::Value =
4106                super::opencode_runner::parse_json_from_opencode_text(&text).map_err(|e| {
4107                    AppError::Validation(format!("opencode response is not valid JSON: {e}"))
4108                })?;
4109            Ok((value, cost, false))
4110        }
4111        None => {
4112            let _ = child.kill();
4113            let _ = child.wait();
4114            Err(AppError::Validation(format!(
4115                "opencode timed out after {timeout_secs} seconds"
4116            )))
4117        }
4118    }
4119}
4120
4121// ---------------------------------------------------------------------------
4122// Tests
4123// ---------------------------------------------------------------------------
4124
4125#[cfg(test)]
4126mod tests {
4127    use super::*;
4128    use rusqlite::Connection;
4129    #[cfg(unix)]
4130    use std::os::unix::fs::PermissionsExt;
4131
4132    /// Opens an in-memory SQLite database with a minimal schema for unit tests.
4133    fn open_test_db() -> Connection {
4134        let conn = Connection::open_in_memory().expect("in-memory db");
4135        conn.execute_batch(
4136            "CREATE TABLE memories (
4137                id          INTEGER PRIMARY KEY AUTOINCREMENT,
4138                namespace   TEXT NOT NULL DEFAULT 'global',
4139                name        TEXT NOT NULL,
4140                type        TEXT NOT NULL DEFAULT 'note',
4141                description TEXT NOT NULL DEFAULT '',
4142                body        TEXT NOT NULL DEFAULT '',
4143                body_hash   TEXT NOT NULL DEFAULT '',
4144                session_id  TEXT,
4145                source      TEXT NOT NULL DEFAULT 'agent',
4146                metadata    TEXT NOT NULL DEFAULT '{}',
4147                created_at  INTEGER NOT NULL DEFAULT (unixepoch()),
4148                updated_at  INTEGER NOT NULL DEFAULT (unixepoch()),
4149                deleted_at  INTEGER,
4150                UNIQUE(namespace, name)
4151            );
4152            CREATE TABLE entities (
4153                id          INTEGER PRIMARY KEY AUTOINCREMENT,
4154                namespace   TEXT NOT NULL DEFAULT 'global',
4155                name        TEXT NOT NULL,
4156                type        TEXT NOT NULL DEFAULT 'concept',
4157                description TEXT,
4158                degree      INTEGER NOT NULL DEFAULT 0,
4159                created_at  INTEGER NOT NULL DEFAULT (unixepoch()),
4160                updated_at  INTEGER NOT NULL DEFAULT (unixepoch()),
4161                UNIQUE(namespace, name)
4162            );
4163            CREATE TABLE memory_entities (
4164                memory_id  INTEGER NOT NULL,
4165                entity_id  INTEGER NOT NULL,
4166                PRIMARY KEY (memory_id, entity_id)
4167            );
4168            CREATE TABLE relationships (
4169                id         INTEGER PRIMARY KEY AUTOINCREMENT,
4170                namespace  TEXT NOT NULL DEFAULT 'global',
4171                source_id  INTEGER NOT NULL,
4172                target_id  INTEGER NOT NULL,
4173                relation   TEXT NOT NULL,
4174                weight     REAL NOT NULL DEFAULT 0.5,
4175                description TEXT,
4176                UNIQUE(source_id, target_id, relation)
4177            );
4178            CREATE TABLE memory_embeddings (
4179                memory_id   INTEGER PRIMARY KEY,
4180                namespace   TEXT NOT NULL,
4181                embedding   BLOB NOT NULL,
4182                source      TEXT NOT NULL,
4183                model       TEXT NOT NULL DEFAULT '',
4184                dim         INTEGER NOT NULL DEFAULT 384,
4185                created_at  INTEGER NOT NULL DEFAULT (unixepoch())
4186            );",
4187        )
4188        .expect("schema creation must succeed");
4189        conn
4190    }
4191
4192    #[test]
4193    fn scan_unbound_memories_finds_memories_without_bindings() {
4194        let conn = open_test_db();
4195        conn.execute(
4196            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'test-mem', 'some body content')",
4197            [],
4198        )
4199        .unwrap();
4200
4201        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
4202        assert_eq!(results.len(), 1);
4203        assert_eq!(results[0].1, "test-mem");
4204    }
4205
4206    #[test]
4207    fn scan_unbound_memories_excludes_bound_memories() {
4208        let conn = open_test_db();
4209        conn.execute(
4210            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'bound-mem', 'body')",
4211            [],
4212        )
4213        .unwrap();
4214        let mem_id: i64 = conn
4215            .query_row("SELECT id FROM memories WHERE name='bound-mem'", [], |r| {
4216                r.get(0)
4217            })
4218            .unwrap();
4219        conn.execute(
4220            "INSERT INTO entities (namespace, name) VALUES ('global', 'some-entity')",
4221            [],
4222        )
4223        .unwrap();
4224        let ent_id: i64 = conn
4225            .query_row(
4226                "SELECT id FROM entities WHERE name='some-entity'",
4227                [],
4228                |r| r.get(0),
4229            )
4230            .unwrap();
4231        conn.execute(
4232            "INSERT INTO memory_entities (memory_id, entity_id) VALUES (?1, ?2)",
4233            rusqlite::params![mem_id, ent_id],
4234        )
4235        .unwrap();
4236
4237        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
4238        assert!(results.is_empty(), "bound memory must not appear in scan");
4239    }
4240
4241    #[test]
4242    fn scan_entities_without_description_finds_null_description() {
4243        let conn = open_test_db();
4244        conn.execute(
4245            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'my-tool', 'tool', NULL)",
4246            [],
4247        )
4248        .unwrap();
4249
4250        let results = scan_entities_without_description(&conn, "global", None, &[]).unwrap();
4251        assert_eq!(results.len(), 1);
4252        assert_eq!(results[0].1, "my-tool");
4253    }
4254
4255    #[test]
4256    fn scan_entities_without_description_excludes_entities_with_description() {
4257        let conn = open_test_db();
4258        conn.execute(
4259            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'described-tool', 'tool', 'Has a description already')",
4260            [],
4261        )
4262        .unwrap();
4263
4264        let results = scan_entities_without_description(&conn, "global", None, &[]).unwrap();
4265        assert!(
4266            results.is_empty(),
4267            "entity with description must not appear"
4268        );
4269    }
4270
4271    #[test]
4272    fn scan_short_body_memories_finds_short_bodies() {
4273        let conn = open_test_db();
4274        conn.execute(
4275            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'short-mem', 'hi')",
4276            [],
4277        )
4278        .unwrap();
4279
4280        let results = scan_short_body_memories(&conn, "global", 100, None, &[]).unwrap();
4281        assert_eq!(results.len(), 1);
4282        assert_eq!(results[0].1, "short-mem");
4283    }
4284
4285    #[test]
4286    fn scan_short_body_memories_excludes_long_bodies() {
4287        let conn = open_test_db();
4288        let long_body = "a".repeat(1000);
4289        conn.execute(
4290            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'long-mem', ?1)",
4291            rusqlite::params![long_body],
4292        )
4293        .unwrap();
4294
4295        let results = scan_short_body_memories(&conn, "global", 100, None, &[]).unwrap();
4296        assert!(results.is_empty(), "long memory must not appear in scan");
4297    }
4298
4299    #[test]
4300    fn scan_respects_limit() {
4301        let conn = open_test_db();
4302        for i in 0..5 {
4303            conn.execute(
4304                &format!("INSERT INTO memories (namespace, name, body) VALUES ('global', 'mem-{i}', 'short')"),
4305                [],
4306            )
4307            .unwrap();
4308        }
4309
4310        let results = scan_short_body_memories(&conn, "global", 1000, Some(3), &[]).unwrap();
4311        assert_eq!(results.len(), 3, "limit must be respected");
4312    }
4313
4314    #[test]
4315    fn scan_memories_without_embeddings_finds_only_missing_rows() {
4316        let conn = open_test_db();
4317        conn.execute(
4318            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'missing-vec', 'body one')",
4319            [],
4320        )
4321        .unwrap();
4322        conn.execute(
4323            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'has-vec', 'body two')",
4324            [],
4325        )
4326        .unwrap();
4327        let memory_id: i64 = conn
4328            .query_row(
4329                "SELECT id FROM memories WHERE namespace='global' AND name='has-vec'",
4330                [],
4331                |r| r.get(0),
4332            )
4333            .unwrap();
4334        let embedding = vec![0.0_f32; crate::constants::embedding_dim()];
4335        memories::upsert_vec(
4336            &conn, memory_id, "global", "note", &embedding, "has-vec", "body two",
4337        )
4338        .unwrap();
4339
4340        let results = scan_memories_without_embeddings(&conn, "global", None, &[]).unwrap();
4341        assert_eq!(results.len(), 1);
4342        assert_eq!(results[0].1, "missing-vec");
4343    }
4344
4345    #[test]
4346    fn scan_memories_without_embeddings_respects_name_filter() {
4347        let conn = open_test_db();
4348        conn.execute(
4349            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'match-me', 'body one')",
4350            [],
4351        )
4352        .unwrap();
4353        conn.execute(
4354            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'skip-me', 'body two')",
4355            [],
4356        )
4357        .unwrap();
4358
4359        let results =
4360            scan_memories_without_embeddings(&conn, "global", None, &["match-me".to_string()])
4361                .unwrap();
4362        assert_eq!(results.len(), 1);
4363        assert_eq!(results[0].1, "match-me");
4364    }
4365
4366    #[test]
4367    fn queue_db_schema_creates_correctly() {
4368        let tmp_path = format!("/tmp/test-enrich-queue-{}.sqlite", std::process::id());
4369        let conn = open_queue_db(&tmp_path).expect("queue db must open");
4370        let count: i64 = conn
4371            .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
4372            .unwrap();
4373        assert_eq!(count, 0);
4374        let _ = std::fs::remove_file(&tmp_path);
4375    }
4376
4377    #[test]
4378    fn parse_claude_output_valid_bindings() {
4379        let output = r#"[
4380            {"type":"system","subtype":"init"},
4381            {"type":"result","is_error":false,"total_cost_usd":0.01,
4382             "structured_output":{"entities":[{"name":"rust-lang","entity_type":"tool"}],"relationships":[]}}
4383        ]"#;
4384        let result = crate::commands::claude_runner::parse_claude_output(output)
4385            .expect("must parse successfully");
4386        assert!(result.value.get("entities").is_some());
4387        assert!((result.cost_usd - 0.01).abs() < f64::EPSILON);
4388        assert!(!result.is_oauth);
4389    }
4390
4391    #[test]
4392    fn parse_claude_output_detects_oauth() {
4393        let output = r#"[
4394            {"type":"system","subtype":"init","apiKeySource":"none"},
4395            {"type":"result","is_error":false,"total_cost_usd":0.0,
4396             "structured_output":{"entities":[],"relationships":[]}}
4397        ]"#;
4398        let result = crate::commands::claude_runner::parse_claude_output(output).unwrap();
4399        assert!(result.is_oauth);
4400    }
4401
4402    #[test]
4403    fn parse_claude_output_rate_limit_returns_error() {
4404        let output = r#"[
4405            {"type":"system","subtype":"init"},
4406            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
4407        ]"#;
4408        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
4409        assert!(matches!(err, AppError::RateLimited { .. }));
4410    }
4411
4412    #[test]
4413    fn parse_claude_output_auth_error() {
4414        let output = r#"[
4415            {"type":"system","subtype":"init"},
4416            {"type":"result","is_error":true,"error":"authentication failed"}
4417        ]"#;
4418        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
4419        assert!(format!("{err}").contains("authentication failed"));
4420    }
4421
4422    #[cfg(unix)]
4423    #[test]
4424    fn call_codex_returns_raw_json_for_body_enrich_schema() {
4425        let tmp = tempfile::tempdir().expect("tempdir");
4426        let binary = tmp.path().join("codex-mock");
4427        std::fs::write(
4428            &binary,
4429            r#"#!/usr/bin/env bash
4430set -euo pipefail
4431cat <<'JSONL'
4432{"type":"thread.started","thread_id":"mock-thread-0"}
4433{"type":"item.completed","item":{"type":"agent_message","text":"{\"enriched_body\":\"expanded body\"}"}}
4434{"type":"turn.completed","usage":{"input_tokens":1,"output_tokens":1}}
4435JSONL
4436"#,
4437        )
4438        .expect("mock codex write");
4439        let mut perms = std::fs::metadata(&binary).expect("metadata").permissions();
4440        perms.set_mode(0o755);
4441        std::fs::set_permissions(&binary, perms).expect("chmod");
4442
4443        let (value, cost, is_oauth) =
4444            call_codex(&binary, "prompt", BODY_ENRICH_SCHEMA, "body", None, 5)
4445                .expect("call_codex must accept body-enrich payload");
4446
4447        assert_eq!(value["enriched_body"], "expanded body");
4448        assert_eq!(cost, 0.0);
4449        assert!(!is_oauth);
4450    }
4451
4452    #[test]
4453    fn dry_run_emits_preview_without_calling_llm() {
4454        // This test validates the dry-run NDJSON contract without spawning any process.
4455        // The scan_operation function requires a DB; we build one in-memory but cannot
4456        // call run() directly because it needs AppPaths (disk). Instead we test the
4457        // lower-level helpers that the dry-run path relies on.
4458        let conn = open_test_db();
4459        conn.execute(
4460            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'dry-mem', 'tiny')",
4461            [],
4462        )
4463        .unwrap();
4464
4465        let results = scan_short_body_memories(&conn, "global", 1000, None, &[]).unwrap();
4466        assert_eq!(results.len(), 1);
4467        assert_eq!(results[0].1, "dry-mem");
4468        // If scan finds the item and dry_run is set, no LLM would be called.
4469        // The NDJSON emission is tested via integration tests with a fake binary.
4470    }
4471
4472    #[test]
4473    fn persist_entity_description_updates_db() {
4474        let conn = open_test_db();
4475        conn.execute(
4476            "INSERT INTO entities (namespace, name, type) VALUES ('global', 'tokio-runtime', 'tool')",
4477            [],
4478        )
4479        .unwrap();
4480        let eid: i64 = conn
4481            .query_row(
4482                "SELECT id FROM entities WHERE name='tokio-runtime'",
4483                [],
4484                |r| r.get(0),
4485            )
4486            .unwrap();
4487
4488        persist_entity_description(&conn, eid, "Async runtime for Rust applications").unwrap();
4489
4490        let desc: String = conn
4491            .query_row(
4492                "SELECT description FROM entities WHERE id=?1",
4493                rusqlite::params![eid],
4494                |r| r.get(0),
4495            )
4496            .unwrap();
4497        assert_eq!(desc, "Async runtime for Rust applications");
4498    }
4499
4500    #[test]
4501    fn bindings_schema_is_valid_json() {
4502        let _: serde_json::Value =
4503            serde_json::from_str(BINDINGS_SCHEMA).expect("BINDINGS_SCHEMA must be valid JSON");
4504    }
4505
4506    #[test]
4507    fn entity_description_schema_is_valid_json() {
4508        let _: serde_json::Value = serde_json::from_str(ENTITY_DESCRIPTION_SCHEMA)
4509            .expect("ENTITY_DESCRIPTION_SCHEMA must be valid JSON");
4510    }
4511
4512    #[test]
4513    fn body_enrich_schema_is_valid_json() {
4514        let _: serde_json::Value = serde_json::from_str(BODY_ENRICH_SCHEMA)
4515            .expect("BODY_ENRICH_SCHEMA must be valid JSON");
4516    }
4517}