Skip to main content

sqlite_graphrag/commands/
enrich.rs

// TODO v1.0.89: this file has 4116 lines — modularization is planned.
// See ADR-0046, section "Known Tech Debt (v1.0.89+)".
3
//! Handler for the `enrich` CLI subcommand (GAP-14 + GAP-18).
//!
//! Enriches the knowledge graph by running LLM-powered analysis over memories
//! and entities that are missing key structural data. The core operations are:
//!
//! - `memory-bindings`: memories without `memory_entities` rows get entity extraction
//! - `entity-descriptions`: entities with NULL/empty descriptions get LLM descriptions
//! - `body-enrich`: memories with short bodies get expanded by the LLM (GAP-18)
//! - `re-embed`: memories without a vector row get re-embedded without rewriting body
//!
//! The remaining operations (for example `weight-calibrate` and
//! `relation-reclassify`) are listed on [`EnrichOperation`].
//!
//! Architecture mirrors `ingest_claude.rs`: SCAN → JUDGE (LLM) → PERSIST, with a
//! SQLite queue DB (`.enrich-queue.sqlite`) for resume/retry support.
// Workload: Subprocess I/O-bound (claude/codex API calls with network wait)
//!
//! # DRY opportunity
//!
//! `extract_with_claude`, `parse_claude_output`, `emit_json`, and the `open_queue_db`
//! queue schema in `ingest_claude.rs` are private functions that duplicate patterns used
//! here verbatim. A future refactoring could extract them into a shared
//! `src/commands/llm_runner.rs` module (or `src/llm_runner.rs`) without changing any
//! public APIs. That extraction requires editing `ingest_claude.rs`, which is outside
//! this stream's boundary — flagged here for the Integration stream to evaluate.
26
27use crate::commands::ingest_claude::find_claude_binary;
28use crate::constants::MAX_MEMORY_BODY_LEN;
29use crate::entity_type::EntityType;
30use crate::errors::AppError;
31use crate::paths::AppPaths;
32use crate::storage::connection::{ensure_db_ready, open_rw};
33use crate::storage::entities::{self, NewEntity, NewRelationship};
34use crate::storage::memories;
35
36use rusqlite::Connection;
37use serde::{Deserialize, Serialize};
38use std::io::Write;
39use std::path::{Path, PathBuf};
40use std::time::Instant;
41
42// ---------------------------------------------------------------------------
43// Constants
44// ---------------------------------------------------------------------------
45
/// Default file name of the enrichment queue database.
const DEFAULT_QUEUE_DB: &str = ".enrich-queue.sqlite";
/// Default rate-limit wait (presumably in seconds — confirm at the use site).
const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
/// Default value of `--min-output-chars` for `body-enrich`.
const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;
/// Default value of `--max-output-chars` for `body-enrich`.
const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2000;
50
51// ---------------------------------------------------------------------------
52// JSON schema used for memory-bindings and body-enrich extraction
53// ---------------------------------------------------------------------------
54
/// JSON Schema text for entity/relationship extraction output.
///
/// The object must contain an `entities` array (each item: `name` plus an
/// `entity_type` drawn from a closed enum) and a `relationships` array (each
/// item: `source`, `target`, a `relation` drawn from a closed enum, and a
/// `strength` between 0 and 1). Extra properties are rejected at every level.
const BINDINGS_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["entities","relationships"],
  "additionalProperties": false
}"#;
94
/// JSON Schema text for an entity-description response: a single required
/// string property `description`, no extra properties.
const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" }
  },
  "required": ["description"],
  "additionalProperties": false
}"#;

/// JSON Schema text for a body-enrich response: a single required string
/// property `enriched_body`, no extra properties.
const BODY_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "enriched_body": { "type": "string" }
  },
  "required": ["enriched_body"],
  "additionalProperties": false
}"#;
112
// G27 P1: weight-calibrate
/// Prompt text for `weight-calibrate`: asks the model to judge a relationship
/// weight against a four-step scale (0.9 / 0.7 / 0.5 / 0.3).
const WEIGHT_CALIBRATE_PROMPT: &str = "You are a knowledge graph quality auditor. Evaluate whether this relationship weight is correctly calibrated.\n\n\
Scale:\n\
- 0.9 = vital hard dependency (A cannot function without B)\n\
- 0.7 = important design relationship (A strongly supports/enables B)\n\
- 0.5 = useful contextual link (A and B share relevant context)\n\
- 0.3 = weak reference (A mentions B without strong coupling)\n\n\
Respond with the calibrated weight and brief reasoning.";

/// JSON Schema text for a `weight-calibrate` response: `calibrated_weight`
/// (number in 0.0..=1.0) and `reasoning` (string), both required.
const WEIGHT_CALIBRATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "calibrated_weight": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["calibrated_weight", "reasoning"],
  "additionalProperties": false
}"#;
131
// G27 P1: relation-reclassify
/// Prompt text for `relation-reclassify`: lists the canonical relation names
/// with a one-line meaning each and asks the model to pick exactly one.
const RELATION_RECLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. The relationship between these entities uses a generic type. Determine the REAL semantic relationship.\n\n\
Valid canonical relations (pick exactly one):\n\
- depends-on: A cannot function without B\n\
- uses: A utilizes B but could substitute it\n\
- supports: A reinforces or enables B\n\
- causes: A triggers or produces B\n\
- fixes: A resolves a problem in B\n\
- contradicts: A conflicts with or invalidates B\n\
- applies-to: A is relevant to or scoped within B\n\
- follows: A comes after B in sequence\n\
- replaces: A substitutes B\n\
- tracked-in: A is monitored in B\n\
- related: A and B share context (use sparingly)\n\n\
Respond with the correct relation, strength, and reasoning.";

/// JSON Schema text for a `relation-reclassify` response: `relation` (free
/// string — the schema itself does not restrict it to the canonical names),
/// `strength` (number in 0.0..=1.0) and `reasoning`, all required.
const RELATION_RECLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": { "type": "string" },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
158
// G27 P2: entity-connect — suggest relationships between isolated entities
/// Prompt text for `entity-connect`: asks whether two currently unlinked
/// entities have a meaningful relationship, with `"none"` as the explicit
/// negative answer.
const ENTITY_CONNECT_PROMPT: &str = "You are a knowledge graph quality auditor. Two entities exist in the same graph but have no relationship between them. Determine if a meaningful relationship exists.\n\n\
Valid canonical relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, replaces, tracked-in, related.\n\n\
If NO meaningful relationship exists, set relation to \"none\".\n\
Respond with the relation (or \"none\"), strength, and reasoning.";

/// JSON Schema text for an `entity-connect` response: `relation` (string),
/// `strength` (number in 0.0..=1.0) and `reasoning`, all required.
const ENTITY_CONNECT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": { "type": "string" },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
175
// G27 P2: entity-type-validate — verify entity type assignments
/// Prompt text for `entity-type-validate`: lists the valid entity types and
/// asks the model to keep or correct the current one.
const ENTITY_TYPE_VALIDATE_PROMPT: &str = "You are a knowledge graph quality auditor. Verify whether this entity's type is correct.\n\n\
Valid entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\n\
If the current type is correct, keep it. If wrong, suggest the correct type.\n\
Respond with the validated type and reasoning.";

/// JSON Schema text for an `entity-type-validate` response: `validated_type`
/// (string), `was_correct` (boolean) and `reasoning`, all required.
const ENTITY_TYPE_VALIDATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "validated_type": { "type": "string" },
    "was_correct": { "type": "boolean" },
    "reasoning": { "type": "string" }
  },
  "required": ["validated_type", "was_correct", "reasoning"],
  "additionalProperties": false
}"#;
192
// G27 P2: description-enrich — improve generic memory descriptions
/// Prompt text for `description-enrich`: asks for a 10-20 word semantic
/// description and shows one bad and one good example.
const DESCRIPTION_ENRICH_PROMPT: &str = "You are a knowledge graph quality auditor. This memory has a generic or auto-generated description. Write a concise, semantic description (10-20 words) that captures WHAT this memory is about and WHY it matters.\n\n\
BAD: 'ingested from docs/auth.md'\n\
GOOD: 'JWT token rotation strategy with 15-min expiry and refresh flow'\n\n\
Respond with the improved description and reasoning.";

/// JSON Schema text for a `description-enrich` response: `description` and
/// `reasoning` strings, both required.
const DESCRIPTION_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" },
    "reasoning": { "type": "string" }
  },
  "required": ["description", "reasoning"],
  "additionalProperties": false
}"#;
208
// G27 P2: domain-classify — classify memory into domain category
/// Prompt text for `domain-classify`: asks for a kebab-case domain name of
/// 2-4 words plus reasoning.
const DOMAIN_CLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. Classify this memory into its primary domain category.\n\n\
Respond with the domain name (kebab-case, 2-4 words) and reasoning.";

/// JSON Schema text for a `domain-classify` response: `domain` (string),
/// `confidence` (number in 0.0..=1.0) and `reasoning`, all required.
const DOMAIN_CLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "domain": { "type": "string" },
    "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["domain", "confidence", "reasoning"],
  "additionalProperties": false
}"#;
223
// G27 P2: graph-audit — audit graph for quality issues
/// Prompt text for `graph-audit`: asks the model to list quality issues in a
/// memory and its entity bindings and to give an overall score.
const GRAPH_AUDIT_PROMPT: &str = "You are a knowledge graph quality auditor. Analyze this memory and its entity bindings for quality issues.\n\n\
Check for: missing entities, wrong entity types, redundant relationships, orphaned entities, generic descriptions, low-signal relationships.\n\n\
Respond with a list of issues found (or empty if none) and an overall quality score.";

/// JSON Schema text for a `graph-audit` response: `quality_score` (number in
/// 0.0..=1.0), `issues` (array of `{kind, detail}` objects) and `reasoning`,
/// all required.
const GRAPH_AUDIT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "quality_score": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "issues": { "type": "array", "items": { "type": "object", "properties": { "kind": { "type": "string" }, "detail": { "type": "string" } }, "required": ["kind", "detail"] } },
    "reasoning": { "type": "string" }
  },
  "required": ["quality_score", "issues", "reasoning"],
  "additionalProperties": false
}"#;
239
// G27 P2: deep-research-synth — synthesize research findings into graph
/// Prompt text for `deep-research-synth`: asks the model to turn a memory
/// body into entities, relationships and a synthesis summary.
const DEEP_RESEARCH_SYNTH_PROMPT: &str = "You are a knowledge graph synthesizer. Given this memory body, extract key findings and synthesize them into structured entities and relationships.\n\n\
Entity names: lowercase kebab-case, domain-specific.\n\
Relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, related, replaces, tracked-in.\n\n\
Respond with extracted entities, relationships, and a synthesis summary.";

/// JSON Schema text for a `deep-research-synth` response: `entities`,
/// `relationships` and `summary`, all required. Unlike `BINDINGS_SCHEMA`, the
/// entity type and relation are plain strings here, with no enum constraint.
const DEEP_RESEARCH_SYNTH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": { "type": "array", "items": { "type": "object", "properties": { "name": { "type": "string" }, "entity_type": { "type": "string" } }, "required": ["name", "entity_type"] } },
    "relationships": { "type": "array", "items": { "type": "object", "properties": { "source": { "type": "string" }, "target": { "type": "string" }, "relation": { "type": "string" }, "strength": { "type": "number" } }, "required": ["source", "target", "relation", "strength"] } },
    "summary": { "type": "string" }
  },
  "required": ["entities", "relationships", "summary"],
  "additionalProperties": false
}"#;
256
// G27 P2: body-extract — extract structured content from unstructured text
/// Prompt text for `body-extract`: asks the model to restructure raw text
/// into clean markdown while preserving all factual content.
const BODY_EXTRACT_PROMPT: &str = "You are a structured data extractor. Given this memory body (which may be unstructured text, raw notes, or a transcript), extract and restructure the content into a clean, well-organized markdown body.\n\n\
Preserve all factual content. Remove noise, fix formatting, add section headers where appropriate.\n\
Respond with the restructured body and a brief summary of changes.";

/// JSON Schema text for a `body-extract` response: `restructured_body` and
/// `changes_summary` strings, both required.
const BODY_EXTRACT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "restructured_body": { "type": "string" },
    "changes_summary": { "type": "string" }
  },
  "required": ["restructured_body", "changes_summary"],
  "additionalProperties": false
}"#;
271
272// ---------------------------------------------------------------------------
273// Prompts
274// ---------------------------------------------------------------------------
275
/// Prompt text for entity/relationship extraction from a memory body,
/// including the naming rules, the allowed relationship types and the
/// strength scale.
const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. Given a memory body, extract:\n\
1. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
2. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones";

/// Leading part of the entity-description prompt. It ends with
/// `"Entity name: "`, so the caller is presumably expected to append the
/// entity name (and type) — confirm at the use site.
const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. Given an entity name and type, write a concise one-sentence description (10-20 words) that explains what this entity IS and WHY it matters in the context of software/system design.\n\nEntity name: ";

/// Leading part of the body-enrich prompt. It ends with
/// `"Memory body to enrich:\n\n"`, so the caller is presumably expected to
/// append the memory body — confirm at the use site.
const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. Given a short or sparse memory body, expand it into a richer, more complete and useful description. Preserve all existing facts. Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\nConstraints:\n- Output only the enriched body text (no metadata, no headers)\n- Preserve the original meaning exactly\n- Target length is provided in the system context\n\nMemory body to enrich:\n\n";
290
291// ---------------------------------------------------------------------------
292// CLI args
293// ---------------------------------------------------------------------------
294
295/// Operation to perform in the `enrich` command.
296#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
297#[serde(rename_all = "kebab-case")]
298pub enum EnrichOperation {
299    /// Add missing entity/relationship bindings to memories (fully implemented).
300    MemoryBindings,
301    /// Fill NULL/empty entity descriptions with LLM-generated summaries (fully implemented).
302    EntityDescriptions,
303    /// Expand short memory bodies into richer content (fully implemented, GAP-18).
304    BodyEnrich,
305    /// Rebuild missing memory embeddings without rewriting the memory body.
306    ReEmbed,
307    /// Calibrate relationship weights using LLM analysis (scan only).
308    WeightCalibrate,
309    /// Reclassify relationship types using LLM judgment (scan only).
310    RelationReclassify,
311    /// Connect isolated entities by suggesting new relationships (scan only).
312    EntityConnect,
313    /// Validate entity type assignments using LLM judgment (scan only).
314    EntityTypeValidate,
315    /// Enrich memory descriptions that are generic/auto-generated (scan only).
316    DescriptionEnrich,
317    /// Identify cross-domain bridges between disconnected subgraphs (scan only).
318    CrossDomainBridges,
319    /// Classify memories into domain categories (scan only).
320    DomainClassify,
321    /// Audit the graph for quality issues (scan only).
322    GraphAudit,
323    /// Synthesize deep-research findings into graph memories (scan only).
324    DeepResearchSynth,
325    /// Extract structured body from unstructured text (scan only).
326    BodyExtract,
327}
328
329/// LLM provider for enrichment.
330#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
331pub enum EnrichMode {
332    /// Use locally installed Claude Code CLI (OAuth-first).
333    ClaudeCode,
334    /// Use locally installed OpenAI Codex CLI.
335    Codex,
336    /// Use locally installed OpenCode CLI.
337    #[value(name = "opencode")]
338    Opencode,
339}
340
341impl std::fmt::Display for EnrichMode {
342    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
343        match self {
344            EnrichMode::ClaudeCode => write!(f, "claude-code"),
345            EnrichMode::Codex => write!(f, "codex"),
346            EnrichMode::Opencode => write!(f, "opencode"),
347        }
348    }
349}
350
351/// Arguments for the `enrich` subcommand.
352#[derive(clap::Args)]
353#[command(
354    about = "Enrich graph memories and entities using an LLM provider",
355    after_long_help = "EXAMPLES:\n  \
356    # Add missing entity bindings to all unbound memories\n  \
357    sqlite-graphrag enrich --operation memory-bindings --mode claude-code\n\n  \
358    # Fill entity descriptions (dry-run preview, no tokens spent)\n  \
359    sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n  \
360    # Expand short memory bodies (GAP-18)\n  \
361    sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n  \
362    # Rebuild only missing memory embeddings without rewriting bodies\n  \
363    sqlite-graphrag enrich --operation re-embed --limit 100\n\n  \
364    # Resume an interrupted body-enrich run\n  \
365    sqlite-graphrag enrich --operation body-enrich --resume --json\n\n  \
366    # Retry only failed items from a previous run\n  \
367    sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n\
368    EXIT CODES:\n  \
369    0  success\n  \
370    1  validation error (bad args, binary not found)\n  \
371    14 I/O error"
372)]
373pub struct EnrichArgs {
374    /// Enrichment operation to run.
375    #[arg(long, short = 'o', value_enum, value_name = "OPERATION")]
376    pub operation: EnrichOperation,
377
378    /// LLM provider to use. Default: claude-code (OAuth-first).
379    #[arg(long, value_enum, default_value = "claude-code")]
380    pub mode: EnrichMode,
381
382    /// Maximum number of items to process in this run. Omit for all.
383    #[arg(long, value_name = "N")]
384    pub limit: Option<usize>,
385
386    /// Preview items without calling the LLM (zero tokens consumed).
387    #[arg(long)]
388    pub dry_run: bool,
389
390    /// Namespace to operate on. Default: global.
391    #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
392    pub namespace: Option<String>,
393
394    // -- Provider flags (Claude) --
395    /// Path to the Claude Code binary. Default: auto-detect from PATH.
396    #[arg(long, value_name = "PATH")]
397    pub claude_binary: Option<PathBuf>,
398
399    /// Claude model to use (e.g. claude-sonnet-4-6).
400    #[arg(long, value_name = "MODEL")]
401    pub claude_model: Option<String>,
402
403    /// Timeout per item in seconds when using Claude Code. Default: 300.
404    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
405    pub claude_timeout: u64,
406
407    // -- Provider flags (Codex) --
408    /// Path to the Codex CLI binary. Default: auto-detect from PATH.
409    #[arg(long, value_name = "PATH")]
410    pub codex_binary: Option<PathBuf>,
411
412    /// Codex model to use (e.g. o4-mini).
413    #[arg(long, value_name = "MODEL")]
414    pub codex_model: Option<String>,
415
416    /// Timeout per item in seconds when using Codex. Default: 300.
417    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
418    pub codex_timeout: u64,
419
420    // -- Provider flags (OpenCode) --
421    /// Path to the OpenCode binary. Default: auto-detect from PATH.
422    #[arg(long, value_name = "PATH", env = "SQLITE_GRAPHRAG_OPENCODE_BINARY")]
423    pub opencode_binary: Option<PathBuf>,
424
425    /// OpenCode model to use.
426    #[arg(long, value_name = "MODEL", env = "SQLITE_GRAPHRAG_OPENCODE_MODEL")]
427    pub opencode_model: Option<String>,
428
429    /// Timeout per item in seconds when using OpenCode. Default: 300.
430    #[arg(
431        long,
432        value_name = "SECONDS",
433        env = "SQLITE_GRAPHRAG_OPENCODE_TIMEOUT",
434        default_value_t = 300
435    )]
436    pub opencode_timeout: u64,
437
438    // -- Cost controls --
439    /// Abort when cumulative cost exceeds this USD budget (API key only; ignored for OAuth).
440    #[arg(long, value_name = "USD")]
441    pub max_cost_usd: Option<f64>,
442
443    // -- Queue controls --
444    /// Resume a previously interrupted run (skip already-done items).
445    #[arg(long)]
446    pub resume: bool,
447
448    /// Retry only items that failed in a previous run.
449    #[arg(long)]
450    pub retry_failed: bool,
451
452    // -- body-enrich specific flags (GAP-18) --
453    /// Minimum output character count for body-enrich. Default: 500.
454    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
455    pub min_output_chars: usize,
456
457    /// Maximum output character count for body-enrich. Default: 2000.
458    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
459    pub max_output_chars: usize,
460
461    /// Check that enriched body preserves all facts from the original (LLM judge). Default: true.
462    #[arg(long, default_value_t = true)]
463    pub preserve_check: bool,
464
465    /// Path to a custom prompt template file for body-enrich.
466    #[arg(long, value_name = "PATH")]
467    pub prompt_template: Option<PathBuf>,
468
469    /// Number of parallel LLM workers (default 1 = serial).
470    /// Each worker claims items atomically from the queue DB via UPDATE...RETURNING.
471    /// Range: 1–32. For 2321 entities, --llm-parallelism 4 reduces wall time ~4×.
472    #[arg(long, default_value_t = 1, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=32))]
473    pub llm_parallelism: u32,
474
475    // -- Output / infra --
476    /// Emit NDJSON output. Always true; flag accepted for compatibility.
477    #[arg(long)]
478    pub json: bool,
479
480    /// Database path override.
481    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
482    pub db: Option<String>,
483
484    /// G30: poll for the job singleton every second for up to N seconds
485    /// when another invocation holds the lock. Default: 0 (fail fast).
486    #[arg(long, value_name = "SECONDS")]
487    pub wait_job_singleton: Option<u64>,
488
489    /// G30: force acquisition of the singleton lock by removing a stale
490    /// lock file from a previously crashed invocation. Use only when you
491    /// are certain no other `enrich`/`ingest` is running.
492    #[arg(long, default_value_t = false)]
493    pub force_job_singleton: bool,
494
495    /// G37: select a specific subset of memory names to enrich instead of
496    /// the full candidate set. Comma-separated, e.g. `--names a,b,c`.
497    /// Empty when omitted (processes all candidates).
498    #[arg(long, value_name = "NAMES", value_delimiter = ',')]
499    pub names: Vec<String>,
500
501    /// G37: read the subset of memory names from a file (one per line).
502    /// Lines starting with `#` and empty lines are ignored. Combined with
503    /// `--names` (union) when both are set.
504    #[arg(long, value_name = "PATH")]
505    pub names_file: Option<PathBuf>,
506
507    /// G35: probe the LLM provider with a 1-turn ping before processing
508    /// the batch. Aborts with a clear error if the rate-limit window is
509    /// closed (avoids burning N turns only to fail on item 1).
510    #[arg(long, default_value_t = false)]
511    pub preflight_check: bool,
512
513    /// G35: if a preflight probe or in-flight call hits the Claude rate
514    /// limit, fall back to `--fallback-mode` (typically `codex`) instead
515    /// of failing the batch. Ignored when `--mode` is already `codex`.
516    #[arg(long, value_enum)]
517    pub fallback_mode: Option<EnrichMode>,
518
519    /// G35: number of seconds before the OAuth rate-limit reset at which
520    /// the preflight probe should refuse to start. Default 300 (5 min).
521    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
522    pub rate_limit_buffer: u64,
523
524    /// G28-D: refuse to start when the 1-minute load average exceeds
525    /// `2 × ncpus` (or `SQLITE_GRAPHRAG_MAX_LOAD_PER_NCPU` if set).
526    /// Set to false to skip the check on contended CI runners.
527    #[arg(long, default_value_t = true)]
528    pub max_load_check: bool,
529
530    /// G28-D: when the system is saturated, abort the job after this
531    /// many consecutive HardFailure outcomes. Default 5.
532    #[arg(long, value_name = "N", default_value_t = 5)]
533    pub circuit_breaker_threshold: u32,
534
535    /// G29 Passo 4: minimum trigram-Jaccard similarity between the
536    /// original body and the LLM-rewritten body for the rewrite to be
537    /// accepted. Scores below the threshold are rejected and emitted as
538    /// `EnrichItemResult::PreservationFailed`. Default 0.7 (per the G29
539    /// gap specification). Ignored when `--operation` is not
540    /// `body-enrich`.
541    #[arg(long, value_name = "FLOAT", default_value_t = 0.7)]
542    pub preserve_threshold: f64,
543
544    /// G33 Passo 3: when set, validate `--codex-model` against the
545    /// ChatGPT Pro OAuth accepted-model list and abort with a
546    /// suggestion when the value is unknown. Default true (fail fast
547    /// to avoid burning OAuth turns). Set to false to opt out.
548    #[arg(long, default_value_t = true)]
549    pub codex_model_validate: bool,
550
551    /// G33 Passo 3: when set together with an invalid `--codex-model`,
552    /// automatically substitute the supplied default (e.g. `gpt-5.5`)
553    /// instead of aborting. The substitution is recorded in the NDJSON
554    /// stream as `provider_substituted: true` for traceability.
555    #[arg(long, value_name = "MODEL")]
556    pub codex_model_fallback: Option<String>,
557}
558
559// ---------------------------------------------------------------------------
560// Internal types — raw LLM output structs
561// ---------------------------------------------------------------------------
562
563// ---------------------------------------------------------------------------
564// NDJSON event types emitted to stdout
565// ---------------------------------------------------------------------------
566
567#[derive(Debug, Serialize)]
568struct PhaseEvent<'a> {
569    phase: &'a str,
570    #[serde(skip_serializing_if = "Option::is_none")]
571    binary_path: Option<&'a str>,
572    #[serde(skip_serializing_if = "Option::is_none")]
573    version: Option<&'a str>,
574    #[serde(skip_serializing_if = "Option::is_none")]
575    items_total: Option<usize>,
576    #[serde(skip_serializing_if = "Option::is_none")]
577    items_pending: Option<usize>,
578    /// Active parallel LLM worker count (1 = serial). Present only on the "scan" phase event.
579    #[serde(skip_serializing_if = "Option::is_none")]
580    llm_parallelism: Option<u32>,
581}
582
583#[derive(Debug, Serialize)]
584struct ItemEvent<'a> {
585    /// Item identifier (memory name or entity name).
586    item: &'a str,
587    status: &'a str,
588    #[serde(skip_serializing_if = "Option::is_none")]
589    memory_id: Option<i64>,
590    #[serde(skip_serializing_if = "Option::is_none")]
591    entity_id: Option<i64>,
592    #[serde(skip_serializing_if = "Option::is_none")]
593    entities: Option<usize>,
594    #[serde(skip_serializing_if = "Option::is_none")]
595    rels: Option<usize>,
596    #[serde(skip_serializing_if = "Option::is_none")]
597    chars_before: Option<usize>,
598    #[serde(skip_serializing_if = "Option::is_none")]
599    chars_after: Option<usize>,
600    #[serde(skip_serializing_if = "Option::is_none")]
601    cost_usd: Option<f64>,
602    #[serde(skip_serializing_if = "Option::is_none")]
603    elapsed_ms: Option<u64>,
604    #[serde(skip_serializing_if = "Option::is_none")]
605    error: Option<String>,
606    index: usize,
607    total: usize,
608}
609
610#[derive(Debug, Serialize)]
611struct EnrichSummary {
612    summary: bool,
613    operation: String,
614    items_total: usize,
615    completed: usize,
616    failed: usize,
617    skipped: usize,
618    cost_usd: f64,
619    elapsed_ms: u64,
620    /// v1.0.84 (ADR-0042): discriminador do backend LLM que efetivamente
621    /// executou o re-embedding durante o enrich. `"claude" | "codex" | "none"`.
622    /// Absent on the wire when `None` (kept for happy-path envelope cleanliness,
623    /// ou quando a operação não envolveu re-embed).
624    #[serde(skip_serializing_if = "Option::is_none")]
625    backend_invoked: Option<&'static str>,
626}
627
628use crate::output::emit_json_line as emit_json;
629
630// ---------------------------------------------------------------------------
631// Queue DB
632// ---------------------------------------------------------------------------
633
634/// Opens or creates the enrichment queue database.
635///
636/// The queue schema mirrors `ingest_claude` for resume/retry parity.
637/// Uses a different filename (`.enrich-queue.sqlite`) to avoid collision.
638///
639/// # DRY note
640///
641/// This is a near-verbatim copy of `open_queue_db` in `ingest_claude.rs`.
642/// Both should be unified in a shared `llm_runner.rs` module by the
643/// Integration stream.
644fn open_queue_db(path: &str) -> Result<Connection, AppError> {
645    let conn = Connection::open(path)?;
646    conn.pragma_update(None, "journal_mode", "wal")?;
647    conn.execute_batch(
648        "CREATE TABLE IF NOT EXISTS queue (
649            id          INTEGER PRIMARY KEY AUTOINCREMENT,
650            item_key    TEXT NOT NULL UNIQUE,
651            item_type   TEXT NOT NULL DEFAULT 'memory',
652            status      TEXT NOT NULL DEFAULT 'pending',
653            memory_id   INTEGER,
654            entity_id   INTEGER,
655            entities    INTEGER DEFAULT 0,
656            rels        INTEGER DEFAULT 0,
657            error       TEXT,
658            cost_usd    REAL DEFAULT 0.0,
659            attempt     INTEGER DEFAULT 0,
660            elapsed_ms  INTEGER,
661            created_at  TEXT DEFAULT (datetime('now')),
662            done_at     TEXT
663        );
664        CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
665    )?;
666    Ok(conn)
667}
668
669// ---------------------------------------------------------------------------
670// LLM invocation — Claude Code
671// ---------------------------------------------------------------------------
672
673/// Calls `claude -p` via the shared `claude_runner` module (G02).
674///
675/// Returns `(output_value, cost_usd, is_oauth)`.
676fn call_claude(
677    binary: &Path,
678    prompt: &str,
679    json_schema: &str,
680    input_text: &str,
681    model: Option<&str>,
682    timeout_secs: u64,
683) -> Result<(serde_json::Value, f64, bool), AppError> {
684    let result = crate::commands::claude_runner::run_claude(
685        binary,
686        prompt,
687        json_schema,
688        input_text,
689        model,
690        timeout_secs,
691        7,
692    )?;
693    Ok((result.value, result.cost_usd, result.is_oauth))
694}
695
696// ---------------------------------------------------------------------------
697// Preflight probe (G35) — single-turn ping to verify the LLM provider
698// ---------------------------------------------------------------------------
699
/// Result of a single preflight ping (G35).
///
/// Returned by `run_preflight_probe`; each probe yields exactly one variant.
enum PreflightOutcome {
    /// The provider accepted the ping without rate-limit or other errors
    /// (the probe process exited with a success status).
    Healthy,
    /// The provider rejected the ping due to a rate limit, detected by
    /// matching known markers in the probe's stderr.
    RateLimited {
        /// Trimmed stderr captured from the failed probe process.
        reason: String,
        /// Human hint that callers can embed in the user-facing error.
        suggestion: &'static str,
    },
    /// Any other provider error (binary missing, auth failure, etc.).
    Error(AppError),
}
714
715/// Probes the configured LLM provider with a 1-turn ping.
716///
717/// - Claude: `claude -p "ping" --max-turns 1 --strict-mcp-config --mcp-config '{}'`
718/// - Codex:  `codex exec -c mcp_servers='{}' "ping" --json`
719///
720/// The probe intentionally avoids spawning any MCP server children (G28-A)
721/// to keep its own process footprint at the minimum.
722fn run_preflight_probe(args: &EnrichArgs) -> PreflightOutcome {
723    let timeout = std::time::Duration::from_secs(args.rate_limit_buffer.max(60));
724
725    match args.mode {
726        EnrichMode::ClaudeCode => {
727            let bin = match find_claude_binary(args.claude_binary.as_deref()) {
728                Ok(b) => b,
729                Err(e) => return PreflightOutcome::Error(e),
730            };
731            // v1.0.88 (BUG-3 fix, ADR-0046): write the empty MCP config to a
732            // tempfile (Claude Code 2.1.177 rejects the inline `{}`
733            // form) and run the preflight gate before spawn, mirroring
734            // the canonical pattern in `claude_runner::build_claude_command`.
735            let mcp_config_path = match crate::spawn::preflight::write_empty_mcp_config_tempfile() {
736                Ok(p) => p,
737                Err(e) => {
738                    return PreflightOutcome::Error(AppError::Io(e));
739                }
740            };
741            let mut cmd = std::process::Command::new(&bin);
742            crate::spawn::env_whitelist::apply_env_whitelist(
743                &mut cmd,
744                crate::spawn::env_whitelist::is_strict_env_clear(),
745            );
746            if let Err(e) = crate::spawn::apply_cwd_isolation(&mut cmd) {
747                return PreflightOutcome::Error(e);
748            }
749            cmd.arg("-p")
750                .arg("ping")
751                .arg("--max-turns")
752                .arg("1")
753                .arg("--strict-mcp-config")
754                .arg("--mcp-config")
755                .arg(mcp_config_path.as_os_str())
756                .arg("--dangerously-skip-permissions")
757                .arg("--settings")
758                .arg("{\"hooks\":{}}")
759                .arg("--output-format")
760                .arg("json")
761                .stdin(std::process::Stdio::null())
762                .stdout(std::process::Stdio::piped())
763                .stderr(std::process::Stdio::piped());
764
765            let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
766                Ok(c) => c,
767                Err(e) => {
768                    return PreflightOutcome::Error(AppError::Io(e));
769                }
770            };
771            let output = match wait_with_timeout(child, timeout) {
772                Ok(out) => out,
773                Err(e) => return PreflightOutcome::Error(e),
774            };
775            if !output.status.success() {
776                let stderr = String::from_utf8_lossy(&output.stderr);
777                if stderr.contains("hit your session limit")
778                    || stderr.contains("rate_limit")
779                    || stderr.contains("429")
780                {
781                    return PreflightOutcome::RateLimited {
782                        reason: stderr.trim().to_string(),
783                        suggestion:
784                            "wait for the OAuth window to reset or use --fallback-mode codex",
785                    };
786                }
787                return PreflightOutcome::Error(AppError::Validation(format!(
788                    "preflight probe failed: {stderr}",
789                    stderr = stderr.trim()
790                )));
791            }
792            PreflightOutcome::Healthy
793        }
794        EnrichMode::Codex => {
795            let bin = match find_codex_binary(args.codex_binary.as_deref()) {
796                Ok(b) => b,
797                Err(e) => return PreflightOutcome::Error(e),
798            };
799            super::codex_spawn::validate_codex_model(args.codex_model.as_deref())
800                .map_err(PreflightOutcome::Error)
801                .ok();
802            let schema = "{}";
803            let schema_path = match super::codex_spawn::trusted_schema_path() {
804                Ok(p) => p,
805                Err(e) => return PreflightOutcome::Error(e),
806            };
807            let spawn_args = super::codex_spawn::CodexSpawnArgs {
808                binary: &bin,
809                prompt: "ping",
810                json_schema: schema,
811                input_text: "",
812                model: args.codex_model.as_deref(),
813                timeout_secs: args.rate_limit_buffer.max(60),
814                schema_path: schema_path.clone(),
815            };
816            let mut cmd = match super::codex_spawn::build_codex_command(&spawn_args) {
817                Ok(c) => c,
818                Err(e) => return PreflightOutcome::Error(e),
819            };
820            let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
821                Ok(c) => c,
822                Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
823            };
824            let output = match wait_with_timeout(child, timeout) {
825                Ok(out) => out,
826                Err(e) => return PreflightOutcome::Error(e),
827            };
828            let _ = std::fs::remove_file(&schema_path);
829            if !output.status.success() {
830                let stderr = String::from_utf8_lossy(&output.stderr);
831                if stderr.contains("rate_limit")
832                    || stderr.contains("429")
833                    || stderr.contains("Too Many Requests")
834                {
835                    return PreflightOutcome::RateLimited {
836                        reason: stderr.trim().to_string(),
837                        suggestion: "wait for the rate-limit window to reset",
838                    };
839                }
840                return PreflightOutcome::Error(AppError::Validation(format!(
841                    "preflight probe failed: {stderr}",
842                    stderr = stderr.trim()
843                )));
844            }
845            PreflightOutcome::Healthy
846        }
847        EnrichMode::Opencode => {
848            let bin = match super::opencode_runner::find_opencode_binary_with_override(
849                args.opencode_binary.as_deref(),
850            ) {
851                Ok(b) => b,
852                Err(e) => return PreflightOutcome::Error(e),
853            };
854            let model =
855                super::opencode_runner::resolve_opencode_model(args.opencode_model.as_deref());
856            let mut cmd =
857                match super::opencode_runner::build_opencode_command_sync(&bin, &model, "ping", "")
858                {
859                    Ok(c) => c,
860                    Err(e) => return PreflightOutcome::Error(e),
861                };
862            let child = match super::opencode_runner::spawn_opencode(&mut cmd) {
863                Ok(c) => c,
864                Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
865            };
866            let output = match wait_with_timeout(child, timeout) {
867                Ok(out) => out,
868                Err(e) => return PreflightOutcome::Error(e),
869            };
870            if !output.status.success() {
871                let stderr = String::from_utf8_lossy(&output.stderr);
872                if stderr.contains("rate_limit")
873                    || stderr.contains("429")
874                    || stderr.contains("Too Many Requests")
875                {
876                    return PreflightOutcome::RateLimited {
877                        reason: stderr.trim().to_string(),
878                        suggestion: "wait for the rate-limit window to reset",
879                    };
880                }
881                return PreflightOutcome::Error(AppError::Validation(format!(
882                    "preflight probe failed: {stderr}",
883                    stderr = stderr.trim()
884                )));
885            }
886            PreflightOutcome::Healthy
887        }
888    }
889}
890
891/// Cross-platform wait with timeout (no extra crate dependency).
892fn wait_with_timeout(
893    mut child: std::process::Child,
894    timeout: std::time::Duration,
895) -> Result<std::process::Output, AppError> {
896    use wait_timeout::ChildExt;
897    let start = std::time::Instant::now();
898    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
899    if status.is_none() {
900        let _ = child.kill();
901        let _ = child.wait();
902        return Err(AppError::Validation(format!(
903            "preflight probe timed out after {}s",
904            start.elapsed().as_secs()
905        )));
906    }
907    let mut stdout = Vec::new();
908    if let Some(mut out) = child.stdout.take() {
909        std::io::Read::read_to_end(&mut out, &mut stdout).map_err(AppError::Io)?;
910    }
911    let mut stderr = Vec::new();
912    if let Some(mut err) = child.stderr.take() {
913        std::io::Read::read_to_end(&mut err, &mut stderr).map_err(AppError::Io)?;
914    }
915    let exit = status.unwrap();
916    Ok(std::process::Output {
917        status: exit,
918        stdout,
919        stderr,
920    })
921}
922
923// ---------------------------------------------------------------------------
924// SCAN helpers — SQL queries that find items needing enrichment
925// ---------------------------------------------------------------------------
926
927/// Returns memories without any `memory_entities` binding.
928///
929/// These are the targets for `memory-bindings` enrichment. When `name_filter`
930/// is non-empty, restricts the scan to the given names (G37); unknown names
931/// are silently skipped (the caller can detect them by comparing
932/// requested vs. returned).
933fn scan_unbound_memories(
934    conn: &Connection,
935    namespace: &str,
936    limit: Option<usize>,
937    name_filter: &[String],
938) -> Result<Vec<(i64, String, String)>, AppError> {
939    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
940
941    if name_filter.is_empty() {
942        let sql = format!(
943            "SELECT m.id, m.name, m.body
944             FROM memories m
945             WHERE m.namespace = ?1
946               AND m.deleted_at IS NULL
947               AND NOT EXISTS (
948                   SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
949               )
950             ORDER BY m.id
951             {limit_clause}"
952        );
953        let mut stmt = conn.prepare(&sql)?;
954        let rows = stmt
955            .query_map(rusqlite::params![namespace], |r| {
956                Ok((
957                    r.get::<_, i64>(0)?,
958                    r.get::<_, String>(1)?,
959                    r.get::<_, String>(2)?,
960                ))
961            })?
962            .collect::<Result<Vec<_>, _>>()?;
963        Ok(rows)
964    } else {
965        // Build a parameterised IN clause: ?2, ?3, ..., ?{1+n}
966        let placeholders: Vec<String> = (2..=name_filter.len() + 1)
967            .map(|i| format!("?{i}"))
968            .collect();
969        let in_clause = placeholders.join(", ");
970        let sql = format!(
971            "SELECT m.id, m.name, m.body
972             FROM memories m
973             WHERE m.namespace = ?1
974               AND m.deleted_at IS NULL
975               AND m.name IN ({in_clause})
976               AND NOT EXISTS (
977                   SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
978               )
979             ORDER BY m.id
980             {limit_clause}"
981        );
982        let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
983        params_vec.push(&namespace);
984        for n in name_filter {
985            params_vec.push(n);
986        }
987        let mut stmt = conn.prepare(&sql)?;
988        let rows = stmt
989            .query_map(
990                rusqlite::params_from_iter(params_vec.iter().copied()),
991                |r| {
992                    Ok((
993                        r.get::<_, i64>(0)?,
994                        r.get::<_, String>(1)?,
995                        r.get::<_, String>(2)?,
996                    ))
997                },
998            )?
999            .collect::<Result<Vec<_>, _>>()?;
1000        Ok(rows)
1001    }
1002}
1003
1004/// Reads a list of memory names from a UTF-8 text file (G37).
1005///
1006/// Empty lines and lines beginning with `#` are skipped. Returns a
1007/// de-duplicated, order-preserving list of trimmed names.
1008fn read_names_file(path: &Path) -> Result<Vec<String>, AppError> {
1009    let content = std::fs::read_to_string(path).map_err(|e| {
1010        AppError::Validation(format!("failed to read names file {}: {e}", path.display()))
1011    })?;
1012    let mut seen = std::collections::HashSet::new();
1013    let mut out = Vec::new();
1014    for line in content.lines() {
1015        let trimmed = line.trim();
1016        if trimmed.is_empty() || trimmed.starts_with('#') {
1017            continue;
1018        }
1019        if seen.insert(trimmed.to_string()) {
1020            out.push(trimmed.to_string());
1021        }
1022    }
1023    Ok(out)
1024}
1025
1026/// Resolves the union of `--names` and `--names-file` (G37).
1027fn resolve_name_filter(args: &EnrichArgs) -> Result<Vec<String>, AppError> {
1028    let mut combined: Vec<String> = args.names.clone();
1029    if let Some(p) = &args.names_file {
1030        let from_file = read_names_file(p)?;
1031        for n in from_file {
1032            if !combined.contains(&n) {
1033                combined.push(n);
1034            }
1035        }
1036    }
1037    Ok(combined)
1038}
1039
1040/// Returns entities with NULL or empty description.
1041///
1042/// These are the targets for `entity-descriptions` enrichment.
1043fn scan_entities_without_description(
1044    conn: &Connection,
1045    namespace: &str,
1046    limit: Option<usize>,
1047    name_filter: &[String],
1048) -> Result<Vec<(i64, String, String)>, AppError> {
1049    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1050
1051    if name_filter.is_empty() {
1052        let sql = format!(
1053            "SELECT id, name, type
1054             FROM entities
1055             WHERE namespace = ?1
1056               AND (description IS NULL OR description = '')
1057             ORDER BY id
1058             {limit_clause}"
1059        );
1060        let mut stmt = conn.prepare(&sql)?;
1061        let rows = stmt
1062            .query_map(rusqlite::params![namespace], |r| {
1063                Ok((
1064                    r.get::<_, i64>(0)?,
1065                    r.get::<_, String>(1)?,
1066                    r.get::<_, String>(2)?,
1067                ))
1068            })?
1069            .collect::<Result<Vec<_>, _>>()?;
1070        Ok(rows)
1071    } else {
1072        let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1073            .map(|i| format!("?{i}"))
1074            .collect();
1075        let in_clause = placeholders.join(", ");
1076        let sql = format!(
1077            "SELECT id, name, type
1078             FROM entities
1079             WHERE namespace = ?1
1080               AND name IN ({in_clause})
1081               AND (description IS NULL OR description = '')
1082             ORDER BY id
1083             {limit_clause}"
1084        );
1085        let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1086        params_vec.push(&namespace);
1087        for n in name_filter {
1088            params_vec.push(n);
1089        }
1090        let mut stmt = conn.prepare(&sql)?;
1091        let rows = stmt
1092            .query_map(
1093                rusqlite::params_from_iter(params_vec.iter().copied()),
1094                |r| {
1095                    Ok((
1096                        r.get::<_, i64>(0)?,
1097                        r.get::<_, String>(1)?,
1098                        r.get::<_, String>(2)?,
1099                    ))
1100                },
1101            )?
1102            .collect::<Result<Vec<_>, _>>()?;
1103        Ok(rows)
1104    }
1105}
1106
1107/// Returns memories whose body length is below the configured minimum.
1108///
1109/// These are the targets for `body-enrich` (GAP-18).
1110fn scan_short_body_memories(
1111    conn: &Connection,
1112    namespace: &str,
1113    min_chars: usize,
1114    limit: Option<usize>,
1115    name_filter: &[String],
1116) -> Result<Vec<(i64, String, String)>, AppError> {
1117    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1118
1119    if name_filter.is_empty() {
1120        let sql = format!(
1121            "SELECT m.id, m.name, m.body
1122             FROM memories m
1123             WHERE m.namespace = ?1
1124               AND m.deleted_at IS NULL
1125               AND LENGTH(COALESCE(m.body,'')) < ?2
1126             ORDER BY m.id
1127             {limit_clause}"
1128        );
1129        let mut stmt = conn.prepare(&sql)?;
1130        let rows = stmt
1131            .query_map(rusqlite::params![namespace, min_chars as i64], |r| {
1132                Ok((
1133                    r.get::<_, i64>(0)?,
1134                    r.get::<_, String>(1)?,
1135                    r.get::<_, String>(2)?,
1136                ))
1137            })?
1138            .collect::<Result<Vec<_>, _>>()?;
1139        Ok(rows)
1140    } else {
1141        let placeholders: Vec<String> = (3..=name_filter.len() + 2)
1142            .map(|i| format!("?{i}"))
1143            .collect();
1144        let in_clause = placeholders.join(", ");
1145        let sql = format!(
1146            "SELECT m.id, m.name, m.body
1147             FROM memories m
1148             WHERE m.namespace = ?1
1149               AND m.deleted_at IS NULL
1150               AND m.name IN ({in_clause})
1151               AND LENGTH(COALESCE(m.body,'')) < ?2
1152             ORDER BY m.id
1153             {limit_clause}"
1154        );
1155        let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(2 + name_filter.len());
1156        let min_chars_i64 = min_chars as i64;
1157        params_vec.push(&namespace);
1158        params_vec.push(&min_chars_i64);
1159        for n in name_filter {
1160            params_vec.push(n);
1161        }
1162        let mut stmt = conn.prepare(&sql)?;
1163        let rows = stmt
1164            .query_map(
1165                rusqlite::params_from_iter(params_vec.iter().copied()),
1166                |r| {
1167                    Ok((
1168                        r.get::<_, i64>(0)?,
1169                        r.get::<_, String>(1)?,
1170                        r.get::<_, String>(2)?,
1171                    ))
1172                },
1173            )?
1174            .collect::<Result<Vec<_>, _>>()?;
1175        Ok(rows)
1176    }
1177}
1178
1179/// Returns live memories that still have no row in `memory_embeddings`.
1180///
1181/// These are the targets for `re-embed`.
1182fn scan_memories_without_embeddings(
1183    conn: &Connection,
1184    namespace: &str,
1185    limit: Option<usize>,
1186    name_filter: &[String],
1187) -> Result<Vec<(i64, String, String)>, AppError> {
1188    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1189
1190    if name_filter.is_empty() {
1191        let sql = format!(
1192            "SELECT m.id, m.name, COALESCE(m.body,'')
1193             FROM memories m
1194             LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1195             WHERE m.namespace = ?1
1196               AND m.deleted_at IS NULL
1197               AND me.memory_id IS NULL
1198             ORDER BY m.id
1199             {limit_clause}"
1200        );
1201        let mut stmt = conn.prepare(&sql)?;
1202        let rows = stmt
1203            .query_map(rusqlite::params![namespace], |r| {
1204                Ok((
1205                    r.get::<_, i64>(0)?,
1206                    r.get::<_, String>(1)?,
1207                    r.get::<_, String>(2)?,
1208                ))
1209            })?
1210            .collect::<Result<Vec<_>, _>>()?;
1211        Ok(rows)
1212    } else {
1213        let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1214            .map(|i| format!("?{i}"))
1215            .collect();
1216        let in_clause = placeholders.join(", ");
1217        let sql = format!(
1218            "SELECT m.id, m.name, COALESCE(m.body,'')
1219             FROM memories m
1220             LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1221             WHERE m.namespace = ?1
1222               AND m.deleted_at IS NULL
1223               AND m.name IN ({in_clause})
1224               AND me.memory_id IS NULL
1225             ORDER BY m.id
1226             {limit_clause}"
1227        );
1228        let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1229        params_vec.push(&namespace);
1230        for n in name_filter {
1231            params_vec.push(n);
1232        }
1233        let mut stmt = conn.prepare(&sql)?;
1234        let rows = stmt
1235            .query_map(
1236                rusqlite::params_from_iter(params_vec.iter().copied()),
1237                |r| {
1238                    Ok((
1239                        r.get::<_, i64>(0)?,
1240                        r.get::<_, String>(1)?,
1241                        r.get::<_, String>(2)?,
1242                    ))
1243                },
1244            )?
1245            .collect::<Result<Vec<_>, _>>()?;
1246        Ok(rows)
1247    }
1248}
1249
1250/// G27: Returns relationships with weight >= 0.7 that may need recalibration.
1251#[allow(clippy::type_complexity)]
1252fn scan_weight_candidates(
1253    conn: &Connection,
1254    namespace: &str,
1255    limit: Option<usize>,
1256) -> Result<Vec<(i64, String, String, String, f64)>, AppError> {
1257    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1258    let sql = format!(
1259        "SELECT r.id, e1.name, e2.name, r.relation, r.weight \
1260         FROM relationships r \
1261         JOIN entities e1 ON e1.id = r.source_id \
1262         JOIN entities e2 ON e2.id = r.target_id \
1263         WHERE r.weight >= 0.7 AND e1.namespace = ?1 \
1264         ORDER BY r.weight DESC {limit_clause}"
1265    );
1266    let mut stmt = conn.prepare(&sql)?;
1267    let rows = stmt
1268        .query_map(rusqlite::params![namespace], |r| {
1269            Ok((
1270                r.get::<_, i64>(0)?,
1271                r.get::<_, String>(1)?,
1272                r.get::<_, String>(2)?,
1273                r.get::<_, String>(3)?,
1274                r.get::<_, f64>(4)?,
1275            ))
1276        })?
1277        .collect::<Result<Vec<_>, _>>()?;
1278    Ok(rows)
1279}
1280
1281/// G27: Returns relationships with generic relation types (applies_to).
1282fn scan_generic_relations(
1283    conn: &Connection,
1284    namespace: &str,
1285    limit: Option<usize>,
1286) -> Result<Vec<(i64, String, String, String)>, AppError> {
1287    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1288    let sql = format!(
1289        "SELECT r.id, e1.name, e2.name, r.relation \
1290         FROM relationships r \
1291         JOIN entities e1 ON e1.id = r.source_id \
1292         JOIN entities e2 ON e2.id = r.target_id \
1293         WHERE r.relation = 'applies_to' AND e1.namespace = ?1 \
1294         ORDER BY r.id {limit_clause}"
1295    );
1296    let mut stmt = conn.prepare(&sql)?;
1297    let rows = stmt
1298        .query_map(rusqlite::params![namespace], |r| {
1299            Ok((
1300                r.get::<_, i64>(0)?,
1301                r.get::<_, String>(1)?,
1302                r.get::<_, String>(2)?,
1303                r.get::<_, String>(3)?,
1304            ))
1305        })?
1306        .collect::<Result<Vec<_>, _>>()?;
1307    Ok(rows)
1308}
1309
1310// ---------------------------------------------------------------------------
1311// PERSIST helpers for fully-implemented operations
1312// ---------------------------------------------------------------------------
1313
1314/// Persists entity bindings extracted by the LLM for a memory.
1315///
1316/// Creates entities via `upsert_entity`, links them to the memory via
1317/// `link_memory_entity`, and upserts relationships found between entities.
1318fn persist_memory_bindings(
1319    conn: &Connection,
1320    namespace: &str,
1321    memory_id: i64,
1322    entities_json: &serde_json::Value,
1323    rels_json: &serde_json::Value,
1324) -> Result<(usize, usize), AppError> {
1325    #[derive(Deserialize)]
1326    struct EntityItem {
1327        name: String,
1328        entity_type: String,
1329    }
1330    #[derive(Deserialize)]
1331    struct RelItem {
1332        source: String,
1333        target: String,
1334        relation: String,
1335        strength: f64,
1336    }
1337
1338    let extracted_entities: Vec<EntityItem> = serde_json::from_value(entities_json.clone())
1339        .map_err(|e| AppError::Validation(format!("failed to parse entities array: {e}")))?;
1340
1341    let extracted_rels: Vec<RelItem> = serde_json::from_value(rels_json.clone())
1342        .map_err(|e| AppError::Validation(format!("failed to parse relationships array: {e}")))?;
1343
1344    let mut ent_count = 0usize;
1345    let mut rel_count = 0usize;
1346
1347    for item in &extracted_entities {
1348        let entity_type = match item.entity_type.parse::<EntityType>() {
1349            Ok(et) => et,
1350            Err(_) => {
1351                tracing::warn!(
1352                    target: "enrich",
1353                    entity = %item.name,
1354                    entity_type = %item.entity_type,
1355                    "entity type not recognized, skipping"
1356                );
1357                continue;
1358            }
1359        };
1360        match entities::upsert_entity(
1361            conn,
1362            namespace,
1363            &NewEntity {
1364                name: item.name.clone(),
1365                entity_type,
1366                description: None,
1367            },
1368        ) {
1369            Ok(eid) => {
1370                let _ = entities::link_memory_entity(conn, memory_id, eid);
1371                ent_count += 1;
1372            }
1373            Err(e) => {
1374                tracing::warn!(
1375                    target: "enrich",
1376                    entity = %item.name,
1377                    error = %e,
1378                    "entity upsert skipped"
1379                );
1380            }
1381        }
1382    }
1383
1384    for rel in &extracted_rels {
1385        let normalized = crate::parsers::normalize_relation(&rel.relation);
1386        crate::parsers::warn_if_non_canonical(&normalized);
1387
1388        // Normalize entity names before lookup: upsert_entity normalizes on write,
1389        // so the lookup must use the same normalized form to find the row.
1390        let src_name = crate::parsers::normalize_entity_name(&rel.source);
1391        let tgt_name = crate::parsers::normalize_entity_name(&rel.target);
1392        let src_id = entities::find_entity_id(conn, namespace, &src_name);
1393        let tgt_id = entities::find_entity_id(conn, namespace, &tgt_name);
1394        if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1395            let new_rel = NewRelationship {
1396                source: rel.source.clone(),
1397                target: rel.target.clone(),
1398                relation: normalized,
1399                strength: rel.strength,
1400                description: None,
1401            };
1402            if entities::upsert_relationship(conn, namespace, sid, tid, &new_rel).is_ok() {
1403                rel_count += 1;
1404            }
1405        }
1406    }
1407
1408    Ok((ent_count, rel_count))
1409}
1410
1411/// Updates an entity's description directly in the `entities` table.
1412fn persist_entity_description(
1413    conn: &Connection,
1414    entity_id: i64,
1415    description: &str,
1416) -> Result<(), AppError> {
1417    conn.execute(
1418        "UPDATE entities SET description = ?1, updated_at = unixepoch() WHERE id = ?2",
1419        rusqlite::params![description, entity_id],
1420    )?;
1421    Ok(())
1422}
1423
1424/// v1.0.84 (ADR-0042): on successful re-embed, records the active backend
1425/// into the shared accumulator (`ENRICH_LAST_BACKEND`) so the final
1426/// `EnrichSummary` can expose `backend_invoked` without changing every
1427/// caller's signature. Best-effort observability — concurrent enrich runs
1428/// may race, but `Mutex` keeps the mutation safe.
1429#[allow(clippy::too_many_arguments)]
1430fn reembed_memory_vector(
1431    conn: &Connection,
1432    namespace: &str,
1433    memory_id: i64,
1434    memory_name: &str,
1435    memory_type: &str,
1436    body: &str,
1437    paths: &crate::paths::AppPaths,
1438    llm_backend: crate::cli::LlmBackendChoice,
1439    embedding_backend: crate::cli::EmbeddingBackendChoice,
1440) -> Result<(), AppError> {
1441    let snippet: String = body.chars().take(200).collect();
1442    // v1.0.82 (GAP-003): forward --llm-backend to embed_with_fallback.
1443    // v1.0.84 (ADR-0042): tuple (Vec<f32>, LlmBackendKind) — extrai o
1444    // backend que efetivamente rodou e popula o accumulator para o
1445    // EnrichSummary agregado.
1446    // v1.0.93 (GAP-OR-PROPAGATION): honour --embedding-backend openrouter.
1447    let (embedding, backend_kind) = crate::embedder::embed_passage_with_embedding_choice(
1448        &paths.models,
1449        body,
1450        embedding_backend,
1451        llm_backend,
1452    )?;
1453    record_enrich_backend(backend_kind.as_str());
1454    memories::upsert_vec(
1455        conn,
1456        memory_id,
1457        namespace,
1458        memory_type,
1459        &embedding,
1460        memory_name,
1461        &snippet,
1462    )?;
1463    Ok(())
1464}
1465
1466/// v1.0.84 (ADR-0042): process-local accumulator of the last LLM backend
1467/// that successfully ran a re-embed during the current enrich invocation.
1468/// Read by `run` once at summary emission. Scoped to a single process —
1469/// cross-process enrichment is gated by the per-namespace singleton, so
1470/// there is no concurrency hazard across DBs.
1471fn record_enrich_backend(backend: &'static str) {
1472    if let Ok(mut guard) = ENRICH_LAST_BACKEND.lock() {
1473        *guard = Some(backend);
1474    }
1475}
1476
1477fn take_enrich_backend() -> Option<&'static str> {
1478    ENRICH_LAST_BACKEND.lock().ok().and_then(|mut g| g.take())
1479}
1480
/// Process-local slot holding the backend name most recently stored by
/// `record_enrich_backend`; reset to `None` when `take_enrich_backend`
/// reads it.
static ENRICH_LAST_BACKEND: std::sync::Mutex<Option<&'static str>> = std::sync::Mutex::new(None);
1482
1483/// Persists an enriched memory body (body-enrich, GAP-18).
1484///
1485/// Uses `memories::update` to set the new body and `sync_fts_after_update`
1486/// to keep FTS5 in sync. Also re-embeds the memory for recall accuracy.
1487#[allow(clippy::too_many_arguments)]
1488fn persist_enriched_body(
1489    conn: &Connection,
1490    namespace: &str,
1491    memory_id: i64,
1492    memory_name: &str,
1493    new_body: &str,
1494    paths: &crate::paths::AppPaths,
1495    llm_backend: crate::cli::LlmBackendChoice,
1496    embedding_backend: crate::cli::EmbeddingBackendChoice,
1497) -> Result<(), AppError> {
1498    // Read current values for FTS sync
1499    let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
1500        "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
1501        rusqlite::params![memory_id],
1502        |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1503    )?;
1504
1505    let memory_type: String = conn.query_row(
1506        "SELECT type FROM memories WHERE id=?1",
1507        rusqlite::params![memory_id],
1508        |r| r.get(0),
1509    )?;
1510
1511    let description: String = conn.query_row(
1512        "SELECT COALESCE(description,'') FROM memories WHERE id=?1",
1513        rusqlite::params![memory_id],
1514        |r| r.get(0),
1515    )?;
1516
1517    let body_hash = blake3::hash(new_body.as_bytes()).to_hex().to_string();
1518
1519    let new_memory = memories::NewMemory {
1520        namespace: namespace.to_string(),
1521        name: memory_name.to_string(),
1522        memory_type: memory_type.clone(),
1523        description: description.clone(),
1524        body: new_body.to_string(),
1525        body_hash,
1526        session_id: None,
1527        source: "agent".to_string(),
1528        metadata: serde_json::json!({
1529            "operation": "body-enrich",
1530            "orig_chars": old_body.chars().count(),
1531            "new_chars": new_body.chars().count(),
1532        }),
1533    };
1534
1535    // G29 audit: insert a new immutable version BEFORE the update so the
1536    // enriched body is reachable through `history --name <X>` and
1537    // `restore --version N` can roll back to the pre-enrich state.
1538    let next_version = crate::storage::versions::next_version(conn, memory_id)?;
1539    let version_metadata = serde_json::json!({
1540        "operation": "body-enrich",
1541        "orig_chars": old_body.chars().count(),
1542        "new_chars": new_body.chars().count(),
1543    })
1544    .to_string();
1545    crate::storage::versions::insert_version(
1546        conn,
1547        memory_id,
1548        next_version,
1549        memory_name,
1550        &memory_type,
1551        &description,
1552        new_body,
1553        &version_metadata,
1554        Some("enrich"),
1555        "edit",
1556    )?;
1557
1558    memories::update(conn, memory_id, &new_memory, None)?;
1559    memories::sync_fts_after_update(
1560        conn,
1561        memory_id,
1562        &old_name,
1563        &old_desc,
1564        &old_body,
1565        &new_memory.name,
1566        &new_memory.description,
1567        &new_memory.body,
1568    )?;
1569
1570    // Re-embed for recall accuracy
1571    if let Err(e) = reembed_memory_vector(
1572        conn,
1573        namespace,
1574        memory_id,
1575        memory_name,
1576        &memory_type,
1577        new_body,
1578        paths,
1579        llm_backend,
1580        embedding_backend,
1581    ) {
1582        tracing::warn!(target: "enrich", memory = %memory_name, error = %e, "vec upsert failed after body-enrich");
1583    }
1584
1585    Ok(())
1586}
1587
1588// ---------------------------------------------------------------------------
1589// Main entry point
1590// ---------------------------------------------------------------------------
1591
1592// ---------------------------------------------------------------------------
1593// G20: mode-conditional flag validation
1594// ---------------------------------------------------------------------------
1595
/// Reports whether `value` is equal to `default`.
///
/// Flags declared with `default_value_t` always carry a value, so the only
/// way to tell "clap filled in the default" from "the operator overrode it"
/// is this equality check. An operator who explicitly passes the default
/// value is therefore indistinguishable from one who omitted the flag.
fn is_at_default<T>(value: T, default: T) -> bool
where
    T: PartialEq,
{
    PartialEq::eq(&value, &default)
}
1602
1603/// G20: validate that flags for one LLM provider were not passed when
1604/// the operator selected a different provider. Flags silently discarded
1605/// by the wrong mode are surfaced as AppError::Validation BEFORE any
1606/// DB work, so the operator gets an actionable error instead of a
1607/// surprise at runtime.
1608///
1609/// Detection rules:
1610/// - For Option<PathBuf> / Option<String>: is_some() means explicit
1611/// - For scalar fields with default_value_t: value != default means explicit
1612/// - For boolean fields: true means explicit (default is false)
1613///
1614/// Mode-specific matrices:
1615/// - mode=claude-code rejects: codex_binary, codex_model, codex_timeout != 300
1616/// - mode=codex rejects: claude_binary, claude_model, claude_timeout != 300, max_cost_usd
1617fn validate_mode_conditional_flags_enrich(args: &EnrichArgs) -> Result<(), AppError> {
1618    const DEFAULT_TIMEOUT: u64 = 300;
1619
1620    let mut conflicts: Vec<String> = Vec::new();
1621
1622    match args.mode {
1623        EnrichMode::ClaudeCode => {
1624            if args.codex_binary.is_some() {
1625                conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1626            }
1627            if args.codex_model.is_some() {
1628                conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1629            }
1630            if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1631                conflicts.push(format!(
1632                    "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1633                    args.codex_timeout
1634                ));
1635            }
1636        }
1637        EnrichMode::Codex => {
1638            if args.claude_binary.is_some() {
1639                conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1640            }
1641            if args.claude_model.is_some() {
1642                conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1643            }
1644            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1645                conflicts.push(format!(
1646                    "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1647                    args.claude_timeout
1648                ));
1649            }
1650            if args.max_cost_usd.is_some() {
1651                conflicts.push(
1652                    "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription, not the call)"
1653                        .to_string(),
1654                );
1655            }
1656        }
1657        EnrichMode::Opencode => {
1658            if args.claude_binary.is_some() {
1659                conflicts.push("--claude-binary is ignored when --mode=opencode".to_string());
1660            }
1661            if args.claude_model.is_some() {
1662                conflicts.push("--claude-model is ignored when --mode=opencode".to_string());
1663            }
1664            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1665                conflicts.push(format!(
1666                    "--claude-timeout={} is ignored when --mode=opencode (remove the flag to use the default 300s)",
1667                    args.claude_timeout
1668                ));
1669            }
1670            if args.max_cost_usd.is_some() {
1671                conflicts.push(
1672                    "--max-cost-usd is ignored when --mode=opencode (OAuth-first; cost is metered by your subscription, not the call)"
1673                        .to_string(),
1674                );
1675            }
1676        }
1677    }
1678
1679    if !conflicts.is_empty() {
1680        return Err(AppError::Validation(format!(
1681            "G20: mode-conditional flag conflicts detected for --mode={}:\n  - {}",
1682            args.mode,
1683            conflicts.join("\n  - ")
1684        )));
1685    }
1686
1687    Ok(())
1688}
1689
1690// ---------------------------------------------------------------------------
1691
1692/// Main entry point for the `enrich` command.
1693pub fn run(
1694    args: &EnrichArgs,
1695    llm_backend: crate::cli::LlmBackendChoice,
1696    embedding_backend: crate::cli::EmbeddingBackendChoice,
1697) -> Result<(), AppError> {
1698    // G20: mode-conditional flag validation BEFORE any DB access.
1699    // Surfaces flags that the wrong mode would silently discard.
1700    validate_mode_conditional_flags_enrich(args)?;
1701    let started = Instant::now();
1702
1703    let paths = AppPaths::resolve(args.db.as_deref())?;
1704    ensure_db_ready(&paths)?;
1705    let conn = open_rw(&paths.db)?;
1706    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1707
1708    // G28-B (v1.0.68) + G30 (v1.0.69): enforce singleton per
1709    // (job_type, namespace, db_hash) so two parallel `enrich` invocations
1710    // on the same DB cannot co-exist, but concurrent enrich on different
1711    // databases works as expected. The force flag (--force) breaks a
1712    // stale lock from a previously crashed invocation.
1713    let wait_secs = args.wait_job_singleton;
1714    let force_flag = args.force_job_singleton;
1715    let _singleton = crate::lock::acquire_job_singleton(
1716        crate::lock::JobType::Enrich,
1717        &namespace,
1718        &paths.db,
1719        wait_secs,
1720        force_flag,
1721    )?;
1722
1723    // Validate provider binary upfront only for LLM-backed operations.
1724    let provider_binary = if matches!(args.operation, EnrichOperation::ReEmbed) {
1725        None
1726    } else {
1727        Some(match args.mode {
1728            EnrichMode::ClaudeCode => {
1729                let bin = find_claude_binary(args.claude_binary.as_deref())?;
1730                let version = super::claude_runner::validate_claude_version(&bin)?;
1731                tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
1732                emit_json(&PhaseEvent {
1733                    phase: "validate",
1734                    binary_path: bin.to_str(),
1735                    version: Some(&version),
1736                    items_total: None,
1737                    items_pending: None,
1738                    llm_parallelism: None,
1739                });
1740                bin
1741            }
1742            EnrichMode::Codex => {
1743                let bin = find_codex_binary(args.codex_binary.as_deref())?;
1744                emit_json(&PhaseEvent {
1745                    phase: "validate",
1746                    binary_path: bin.to_str(),
1747                    version: None,
1748                    items_total: None,
1749                    items_pending: None,
1750                    llm_parallelism: None,
1751                });
1752                bin
1753            }
1754            EnrichMode::Opencode => {
1755                let bin = super::opencode_runner::find_opencode_binary_with_override(
1756                    args.opencode_binary.as_deref(),
1757                )?;
1758                emit_json(&PhaseEvent {
1759                    phase: "validate",
1760                    binary_path: bin.to_str(),
1761                    version: None,
1762                    items_total: None,
1763                    items_pending: None,
1764                    llm_parallelism: None,
1765                });
1766                bin
1767            }
1768        })
1769    };
1770
1771    // G28-D: refuse to start when the system is saturated. This check
1772    // is BEFORE preflight so we never spend an OAuth turn on a host
1773    // that is already at the limit.
1774    if args.max_load_check && !args.dry_run && crate::system_load::is_system_saturated() {
1775        let load = crate::system_load::load_average_one();
1776        let n = crate::system_load::ncpus();
1777        return Err(AppError::Validation(format!(
1778            "system load average {load:.2} exceeds 2x ncpus ({n}); \
1779             pass --no-max-load-check to override (not recommended)"
1780        )));
1781    }
1782
1783    // G35: preflight probe — issue a single ping turn to verify the
1784    // provider is healthy before scanning N candidates. If the probe
1785    // fails with a rate-limit error, optionally fall back to a
1786    // different mode (typically codex) instead of failing the entire
1787    // batch. The probe itself consumes 1 OAuth turn, so it stays
1788    // opt-in (default off) to keep --dry-run and CI flows zero-cost.
1789    if args.preflight_check && !args.dry_run && !matches!(args.operation, EnrichOperation::ReEmbed)
1790    {
1791        let preflight_result = run_preflight_probe(args);
1792        match preflight_result {
1793            PreflightOutcome::Healthy => {
1794                tracing::info!(target: "enrich", mode = ?args.mode, "preflight probe healthy");
1795            }
1796            PreflightOutcome::RateLimited { reason, suggestion } => {
1797                if let Some(fallback) = args.fallback_mode.clone() {
1798                    if fallback != args.mode {
1799                        // G35 (v1.0.69): the mid-batch mode switch is
1800                        // intentionally NOT applied because it would
1801                        // desynchronise the per-item rate-limit wait
1802                        // state (rate-limited items in the worker are
1803                        // timed against the original provider). Instead
1804                        // we abort cleanly so the operator can re-invoke
1805                        // with `--mode {fallback:?}`. This guarantees no
1806                        // OAuth window is wasted and no partial state
1807                        // is left in the queue.
1808                        return Err(AppError::Validation(format!(
1809                            "preflight detected rate limit on {mode:?}: {reason}; \
1810                             re-invoke with `--mode {fallback:?}` to use the fallback provider",
1811                            mode = args.mode
1812                        )));
1813                    }
1814                    return Err(AppError::Validation(format!(
1815                        "preflight detected rate limit on {mode:?}: {reason}; \
1816                         --fallback-mode matches --mode, no recovery possible",
1817                        mode = args.mode
1818                    )));
1819                }
1820                return Err(AppError::Validation(format!(
1821                    "preflight detected rate limit on {mode:?}: {reason}; \
1822                     {suggestion}; pass --fallback-mode codex to recover",
1823                    mode = args.mode
1824                )));
1825            }
1826            PreflightOutcome::Error(e) => {
1827                return Err(e);
1828            }
1829        }
1830    }
1831
1832    // SCAN phase
1833    let scan_result = scan_operation(&conn, &namespace, args)?;
1834    let total = scan_result.len();
1835
1836    emit_json(&PhaseEvent {
1837        phase: "scan",
1838        binary_path: None,
1839        version: None,
1840        items_total: Some(total),
1841        items_pending: Some(total),
1842        llm_parallelism: Some(args.llm_parallelism),
1843    });
1844
1845    // Dry-run: emit preview events and summary without calling LLM
1846    if args.dry_run {
1847        for (idx, key) in scan_result.iter().enumerate() {
1848            emit_json(&ItemEvent {
1849                item: key,
1850                status: "preview",
1851                memory_id: None,
1852                entity_id: None,
1853                entities: None,
1854                rels: None,
1855                chars_before: None,
1856                chars_after: None,
1857                cost_usd: None,
1858                elapsed_ms: None,
1859                error: None,
1860                index: idx,
1861                total,
1862            });
1863        }
1864        emit_json(&EnrichSummary {
1865            summary: true,
1866            operation: format!("{:?}", args.operation),
1867            items_total: total,
1868            completed: 0,
1869            failed: 0,
1870            skipped: 0,
1871            cost_usd: 0.0,
1872            elapsed_ms: started.elapsed().as_millis() as u64,
1873            backend_invoked: take_enrich_backend(),
1874        });
1875        return Ok(());
1876    }
1877
1878    // All operations in this enum have an execution path.
1879
1880    // Queue setup for resume/retry
1881    let queue_conn = open_queue_db(DEFAULT_QUEUE_DB)?;
1882
1883    if args.resume {
1884        let reset = queue_conn
1885            .execute(
1886                "UPDATE queue SET status='pending' WHERE status='processing'",
1887                [],
1888            )
1889            .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
1890        if reset > 0 {
1891            tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
1892        }
1893    }
1894
1895    if args.retry_failed {
1896        let count = queue_conn
1897            .execute(
1898                "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
1899                [],
1900            )
1901            .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
1902        tracing::info!(target: "enrich", count, "retrying failed items");
1903    }
1904
1905    if !args.resume && !args.retry_failed {
1906        queue_conn
1907            .execute("DELETE FROM queue", [])
1908            .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
1909    }
1910
1911    // Populate queue
1912    for (idx, key) in scan_result.iter().enumerate() {
1913        let item_type = match args.operation {
1914            EnrichOperation::EntityDescriptions => "entity",
1915            _ => "memory",
1916        };
1917        if let Err(e) = queue_conn.execute(
1918            "INSERT OR IGNORE INTO queue (item_key, item_type, status) VALUES (?1, ?2, 'pending')",
1919            rusqlite::params![key, item_type],
1920        ) {
1921            tracing::warn!(target: "enrich", error = %e, "queue insert failed");
1922        }
1923        let _ = idx; // suppress unused warning
1924    }
1925
1926    // G19: parallel LLM processing via std::thread::scope when parallelism > 1.
1927    // Clamp enforces the range even if the caller bypasses clap validation.
1928    let parallelism = args.llm_parallelism.clamp(1, 32) as usize;
1929    if parallelism > 1 {
1930        tracing::info!(
1931            target: "enrich",
1932            llm_parallelism = parallelism,
1933            "parallel LLM processing with bounded thread pool"
1934        );
1935    }
1936    // G28-D (v1.0.68) + G34 (v1.0.69): warn above the recommended parallelism
1937    // ceiling. The threshold and message depend on the LLM mode because
1938    // Claude Code spawns MCP children (G28-A) while Codex does not.
1939    if parallelism > 4 {
1940        match args.mode {
1941            EnrichMode::ClaudeCode => {
1942                tracing::warn!(
1943                    target: "enrich",
1944                    llm_parallelism = parallelism,
1945                    recommended_max = 4,
1946                    mode = "claude-code",
1947                    "llm_parallelism above 4 multiplies Claude Code subprocess fan-out; \
1948                     consider combining with SQLITE_GRAPHRAG_CLAUDE_EMPTY_CONFIG_DIR \
1949                     to cut MCP children (G28-A)"
1950                );
1951            }
1952            EnrichMode::Codex if parallelism > 16 => {
1953                tracing::warn!(
1954                    target: "enrich",
1955                    llm_parallelism = parallelism,
1956                    recommended_max = 16,
1957                    mode = "codex",
1958                    "llm_parallelism above 16 risks OAuth rate-limit on Codex; \
1959                     consider --llm-parallelism 8 for safer concurrency"
1960                );
1961            }
1962            EnrichMode::Codex => {
1963                // No warning: codex does not spawn MCP children and was
1964                // validated at parallelism 8 in production (1161 items,
1965                // 0 failures) per the 2026-06-04 session audit.
1966            }
1967            EnrichMode::Opencode if parallelism > 16 => {
1968                tracing::warn!(
1969                    target: "enrich",
1970                    llm_parallelism = parallelism,
1971                    recommended_max = 16,
1972                    mode = "opencode",
1973                    "llm_parallelism above 16 risks OAuth rate-limit on OpenCode; \
1974                     consider --llm-parallelism 8 for safer concurrency"
1975                );
1976            }
1977            EnrichMode::Opencode => {
1978                // No warning: opencode does not spawn MCP children.
1979            }
1980        }
1981    }
1982
1983    let mut completed = 0usize;
1984    let mut failed = 0usize;
1985    let mut skipped = 0usize;
1986    let mut cost_total = 0.0f64;
1987    let mut oauth_detected = false;
1988    let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1989    let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1990    let enrich_started = std::time::Instant::now();
1991
1992    let provider_timeout = match args.mode {
1993        EnrichMode::ClaudeCode => args.claude_timeout,
1994        EnrichMode::Codex => args.codex_timeout,
1995        EnrichMode::Opencode => args.opencode_timeout,
1996    };
1997
1998    let provider_model: Option<&str> = match args.mode {
1999        EnrichMode::ClaudeCode => args.claude_model.as_deref(),
2000        EnrichMode::Codex => args.codex_model.as_deref(),
2001        EnrichMode::Opencode => args.opencode_model.as_deref(),
2002    };
2003
2004    // G19: when parallelism > 1, spawn bounded worker threads.
2005    // Each worker opens its own DB connections (WAL supports concurrent readers + serialized writers).
2006    // The queue DB claim is atomic via UPDATE...RETURNING — no external lock needed.
2007    if parallelism > 1 {
2008        let stdout_mu = parking_lot::Mutex::new(());
2009        let budget = args.max_cost_usd;
2010        let operation = args.operation.clone();
2011        let mode = args.mode.clone();
2012        let min_oc = args.min_output_chars;
2013        let max_oc = args.max_output_chars;
2014        let prompt_tpl = args.prompt_template.as_deref().map(|p| p.to_path_buf());
2015
2016        struct WorkerResult {
2017            completed: usize,
2018            failed: usize,
2019            skipped: usize,
2020            cost: f64,
2021            oauth: bool,
2022        }
2023
2024        let results: Vec<WorkerResult> = std::thread::scope(|s| {
2025            let handles: Vec<_> = (0..parallelism)
2026                .map(|worker_id| {
2027                    let stdout_mu = &stdout_mu;
2028                    let paths = &paths;
2029                    let namespace = &namespace;
2030                    let provider_binary = provider_binary.as_deref();
2031                    let operation = &operation;
2032                    let mode = &mode;
2033                    let prompt_tpl = prompt_tpl.as_deref();
2034                    s.spawn(move || {
2035                        let w_conn = match open_rw(&paths.db) {
2036                            Ok(c) => c,
2037                            Err(e) => {
2038                                tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open DB");
2039                                return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
2040                            }
2041                        };
2042                        let w_queue = match open_queue_db(DEFAULT_QUEUE_DB) {
2043                            Ok(c) => c,
2044                            Err(e) => {
2045                                tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open queue DB");
2046                                return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
2047                            }
2048                        };
2049                        let mut w_completed = 0usize;
2050                        let mut w_failed = 0usize;
2051                        let mut w_skipped = 0usize;
2052                        let mut w_cost = 0.0f64;
2053                        let mut w_oauth = false;
2054                        let mut w_backoff = DEFAULT_RATE_LIMIT_WAIT;
2055                        let w_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
2056                        // G28-D: per-worker circuit breaker that aborts the
2057                        // loop after `circuit_breaker_threshold` consecutive
2058                        // HardFailure outcomes (transient/rate-limited errors
2059                        // do NOT count, so a recovering provider is not
2060                        // penalised).
2061                        let mut w_breaker = crate::retry::CircuitBreaker::new(
2062                            args.circuit_breaker_threshold.max(1),
2063                            std::time::Duration::from_secs(60),
2064                        );
2065
2066                        loop {
2067                            if crate::shutdown_requested() {
2068                                tracing::info!(target: "enrich", "shutdown requested, worker stopping");
2069                                break;
2070                            }
2071                            if let Some(b) = budget {
2072                                if !w_oauth && w_cost >= b {
2073                                    break;
2074                                }
2075                            }
2076                            let pending: Option<(i64, String, String)> = w_queue
2077                                .query_row(
2078                                    "UPDATE queue SET status='processing', attempt=attempt+1 \
2079                                     WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
2080                                     RETURNING id, item_key, item_type",
2081                                    [],
2082                                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2083                                )
2084                                .ok();
2085                            let (queue_id, item_key, _item_type) = match pending {
2086                                Some(p) => p,
2087                                None => break,
2088                            };
2089                            let item_started = Instant::now();
2090                            let current_index = w_completed + w_failed + w_skipped;
2091
2092                            let call_result = match operation {
2093                                EnrichOperation::MemoryBindings => call_memory_bindings(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2094                                EnrichOperation::EntityDescriptions => call_entity_description(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2095                                EnrichOperation::BodyEnrich => call_body_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode, min_oc, max_oc, prompt_tpl, args.preserve_threshold, paths, llm_backend, embedding_backend),
2096                                EnrichOperation::ReEmbed => call_reembed(&w_conn, namespace, &item_key, paths, llm_backend, embedding_backend),
2097                                EnrichOperation::WeightCalibrate => call_weight_calibrate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2098                                EnrichOperation::RelationReclassify => call_relation_reclassify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2099                                EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => call_entity_connect(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2100                                EnrichOperation::EntityTypeValidate => call_entity_type_validate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2101                                EnrichOperation::DescriptionEnrich => call_description_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2102                                EnrichOperation::DomainClassify => call_domain_classify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2103                                EnrichOperation::GraphAudit => call_graph_audit(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2104                                EnrichOperation::DeepResearchSynth => call_deep_research_synth(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2105                                EnrichOperation::BodyExtract => call_body_extract(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2106                            };
2107
2108                            match call_result {
2109                                Ok(EnrichItemResult::Done { cost, is_oauth, memory_id, entity_id, entities, rels, chars_before, chars_after }) => {
2110                                    if is_oauth { w_oauth = true; }
2111                                    w_backoff = DEFAULT_RATE_LIMIT_WAIT;
2112                                    let _ = w_queue.execute(
2113                                        "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2114                                        rusqlite::params![memory_id, entity_id, entities as i64, rels as i64, cost, item_started.elapsed().as_millis() as i64, queue_id],
2115                                    );
2116                                    w_completed += 1;
2117                                    if !is_oauth { w_cost += cost; }
2118                                    // G28-D: count success; resets breaker.
2119                                    let _ = w_breaker
2120                                        .record(crate::retry::AttemptOutcome::Success);
2121                                    let _guard = stdout_mu.lock();
2122                                    emit_json(&ItemEvent { item: &item_key, status: "done", memory_id, entity_id, entities: Some(entities), rels: Some(rels), chars_before, chars_after, cost_usd: if is_oauth { None } else { Some(cost) }, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
2123                                }
2124                                Ok(EnrichItemResult::Skipped { reason }) => {
2125                                    w_skipped += 1;
2126                                    let _ = w_queue.execute("UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![reason, queue_id]);
2127                                    let _guard = stdout_mu.lock();
2128                                    emit_json(&ItemEvent { item: &item_key, status: "skipped", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
2129                                }
2130                                Ok(EnrichItemResult::PreservationFailed { score, threshold, chars_before, chars_after }) => {
2131                                    // G29 Passo 4: worker mirror of the
2132                                    // serial path. Counted as a soft
2133                                    // skip so the queue surface shows
2134                                    // a quality issue rather than a
2135                                    // transport failure.
2136                                    w_skipped += 1;
2137                                    let reason = format!(
2138                                        "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2139                                    );
2140                                    let _ = w_queue.execute(
2141                                        "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2142                                        rusqlite::params![reason, queue_id],
2143                                    );
2144                                    let _guard = stdout_mu.lock();
2145                                    emit_json(&ItemEvent {
2146                                        item: &item_key,
2147                                        status: "preservation_failed",
2148                                        memory_id: None,
2149                                        entity_id: None,
2150                                        entities: None,
2151                                        rels: None,
2152                                        chars_before: Some(chars_before),
2153                                        chars_after: Some(chars_after),
2154                                        cost_usd: None,
2155                                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2156                                        error: Some(reason),
2157                                        index: current_index,
2158                                        total,
2159                                    });
2160                                }
2161                                Err(e) => {
2162                                    let err_str = format!("{e}");
2163                                    if matches!(e, AppError::RateLimited { .. }) {
2164                                        if crate::retry::is_kill_switch_active() {
2165                                            tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2166                                        } else if std::time::Instant::now() >= w_deadline {
2167                                            tracing::error!(target: "enrich", "rate-limit retry deadline (1h) exhausted in worker");
2168                                        } else {
2169                                            let half = w_backoff / 2;
2170                                            let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2171                                            let actual_wait = half + jitter;
2172                                            tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited in worker, backing off");
2173                                            let _ = w_queue.execute("UPDATE queue SET status='pending' WHERE id=?1", rusqlite::params![queue_id]);
2174                                            std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2175                                            w_backoff = (w_backoff * 2).min(900);
2176                                            continue;
2177                                        }
2178                                    }
2179                                    w_failed += 1;
2180                                    let _ = w_queue.execute("UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![err_str, queue_id]);
2181                                    let _guard = stdout_mu.lock();
2182                                    emit_json(&ItemEvent { item: &item_key, status: "failed", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: Some(err_str), index: current_index, total });
2183                                    // G28-D: count hard failure against breaker.
2184                                    let breaker_opened = w_breaker
2185                                        .record(crate::retry::AttemptOutcome::HardFailure);
2186                                    if breaker_opened {
2187                                        tracing::error!(target: "enrich",
2188                                            consecutive_failures = w_breaker.consecutive_failures(),
2189                                            "circuit breaker opened — aborting worker"
2190                                        );
2191                                        break;
2192                                    }
2193                                }
2194                            }
2195                        }
2196                        WorkerResult { completed: w_completed, failed: w_failed, skipped: w_skipped, cost: w_cost, oauth: w_oauth }
2197                    })
2198                })
2199                .collect();
2200            handles
2201                .into_iter()
2202                .map(|h| {
2203                    h.join().unwrap_or(WorkerResult {
2204                        completed: 0,
2205                        failed: 0,
2206                        skipped: 0,
2207                        cost: 0.0,
2208                        oauth: false,
2209                    })
2210                })
2211                .collect()
2212        });
2213
2214        for r in &results {
2215            completed += r.completed;
2216            failed += r.failed;
2217            skipped += r.skipped;
2218            cost_total += r.cost;
2219            if r.oauth && !oauth_detected {
2220                oauth_detected = true;
2221            }
2222        }
2223    } else {
2224        // Serial path (parallelism == 1) — original loop
2225        loop {
2226            if crate::shutdown_requested() {
2227                tracing::info!(target: "enrich", "shutdown requested, stopping enrichment");
2228                break;
2229            }
2230
2231            // Budget check
2232            if let Some(budget) = args.max_cost_usd {
2233                if !oauth_detected && cost_total >= budget {
2234                    tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
2235                    break;
2236                }
2237            }
2238
2239            // Dequeue next pending item
2240            let pending: Option<(i64, String, String)> = queue_conn
2241                .query_row(
2242                    "UPDATE queue SET status='processing', attempt=attempt+1 \
2243                 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
2244                 RETURNING id, item_key, item_type",
2245                    [],
2246                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2247                )
2248                .ok();
2249
2250            let (queue_id, item_key, item_type) = match pending {
2251                Some(p) => p,
2252                None => break,
2253            };
2254
2255            let item_started = Instant::now();
2256            let current_index = completed + failed + skipped;
2257
2258            let call_result = match args.operation {
2259                EnrichOperation::MemoryBindings => call_memory_bindings(
2260                    &conn,
2261                    &namespace,
2262                    &item_key,
2263                    provider_binary
2264                        .as_deref()
2265                        .expect("provider binary required"),
2266                    provider_model,
2267                    provider_timeout,
2268                    &args.mode,
2269                ),
2270                EnrichOperation::EntityDescriptions => call_entity_description(
2271                    &conn,
2272                    &namespace,
2273                    &item_key,
2274                    provider_binary
2275                        .as_deref()
2276                        .expect("provider binary required"),
2277                    provider_model,
2278                    provider_timeout,
2279                    &args.mode,
2280                ),
2281                EnrichOperation::BodyEnrich => call_body_enrich(
2282                    &conn,
2283                    &namespace,
2284                    &item_key,
2285                    provider_binary
2286                        .as_deref()
2287                        .expect("provider binary required"),
2288                    provider_model,
2289                    provider_timeout,
2290                    &args.mode,
2291                    args.min_output_chars,
2292                    args.max_output_chars,
2293                    args.prompt_template.as_deref(),
2294                    args.preserve_threshold,
2295                    &paths,
2296                    llm_backend,
2297                    embedding_backend,
2298                ),
2299                EnrichOperation::ReEmbed => call_reembed(
2300                    &conn,
2301                    &namespace,
2302                    &item_key,
2303                    &paths,
2304                    llm_backend,
2305                    embedding_backend,
2306                ),
2307                EnrichOperation::WeightCalibrate => call_weight_calibrate(
2308                    &conn,
2309                    &namespace,
2310                    &item_key,
2311                    provider_binary
2312                        .as_deref()
2313                        .expect("provider binary required"),
2314                    provider_model,
2315                    provider_timeout,
2316                    &args.mode,
2317                ),
2318                EnrichOperation::RelationReclassify => call_relation_reclassify(
2319                    &conn,
2320                    &namespace,
2321                    &item_key,
2322                    provider_binary
2323                        .as_deref()
2324                        .expect("provider binary required"),
2325                    provider_model,
2326                    provider_timeout,
2327                    &args.mode,
2328                ),
2329                EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2330                    call_entity_connect(
2331                        &conn,
2332                        &namespace,
2333                        &item_key,
2334                        provider_binary
2335                            .as_deref()
2336                            .expect("provider binary required"),
2337                        provider_model,
2338                        provider_timeout,
2339                        &args.mode,
2340                    )
2341                }
2342                EnrichOperation::EntityTypeValidate => call_entity_type_validate(
2343                    &conn,
2344                    &namespace,
2345                    &item_key,
2346                    provider_binary
2347                        .as_deref()
2348                        .expect("provider binary required"),
2349                    provider_model,
2350                    provider_timeout,
2351                    &args.mode,
2352                ),
2353                EnrichOperation::DescriptionEnrich => call_description_enrich(
2354                    &conn,
2355                    &namespace,
2356                    &item_key,
2357                    provider_binary
2358                        .as_deref()
2359                        .expect("provider binary required"),
2360                    provider_model,
2361                    provider_timeout,
2362                    &args.mode,
2363                ),
2364                EnrichOperation::DomainClassify => call_domain_classify(
2365                    &conn,
2366                    &namespace,
2367                    &item_key,
2368                    provider_binary
2369                        .as_deref()
2370                        .expect("provider binary required"),
2371                    provider_model,
2372                    provider_timeout,
2373                    &args.mode,
2374                ),
2375                EnrichOperation::GraphAudit => call_graph_audit(
2376                    &conn,
2377                    &namespace,
2378                    &item_key,
2379                    provider_binary
2380                        .as_deref()
2381                        .expect("provider binary required"),
2382                    provider_model,
2383                    provider_timeout,
2384                    &args.mode,
2385                ),
2386                EnrichOperation::DeepResearchSynth => call_deep_research_synth(
2387                    &conn,
2388                    &namespace,
2389                    &item_key,
2390                    provider_binary
2391                        .as_deref()
2392                        .expect("provider binary required"),
2393                    provider_model,
2394                    provider_timeout,
2395                    &args.mode,
2396                ),
2397                EnrichOperation::BodyExtract => call_body_extract(
2398                    &conn,
2399                    &namespace,
2400                    &item_key,
2401                    provider_binary
2402                        .as_deref()
2403                        .expect("provider binary required"),
2404                    provider_model,
2405                    provider_timeout,
2406                    &args.mode,
2407                ),
2408            };
2409
2410            match call_result {
2411                Ok(EnrichItemResult::Done {
2412                    memory_id,
2413                    entity_id,
2414                    entities,
2415                    rels,
2416                    chars_before,
2417                    chars_after,
2418                    cost,
2419                    is_oauth,
2420                }) => {
2421                    if is_oauth && !oauth_detected {
2422                        oauth_detected = true;
2423                        tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
2424                    }
2425                    backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
2426
2427                    // Persist depends on the operation
2428                    let persist_err: Option<String> = match args.operation {
2429                        EnrichOperation::MemoryBindings => {
2430                            // Bindings already persisted inside call_memory_bindings
2431                            None
2432                        }
2433                        EnrichOperation::EntityDescriptions => {
2434                            // Description already persisted inside call_entity_description
2435                            None
2436                        }
2437                        EnrichOperation::BodyEnrich => {
2438                            // Body already persisted inside call_body_enrich
2439                            None
2440                        }
2441                        _ => {
2442                            // All G27 operations persist inside their call_* function
2443                            None
2444                        }
2445                    };
2446
2447                    if let Err(e) = queue_conn.execute(
2448                    "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2449                    rusqlite::params![
2450                        memory_id,
2451                        entity_id,
2452                        entities as i64,
2453                        rels as i64,
2454                        cost,
2455                        item_started.elapsed().as_millis() as i64,
2456                        queue_id
2457                    ],
2458                ) {
2459                        tracing::warn!(target: "enrich", error = %e, "queue done update failed");
2460                    }
2461
2462                    if persist_err.is_none() {
2463                        completed += 1;
2464                        if !is_oauth {
2465                            cost_total += cost;
2466                        }
2467                        emit_json(&ItemEvent {
2468                            item: &item_key,
2469                            status: "done",
2470                            memory_id,
2471                            entity_id,
2472                            entities: Some(entities),
2473                            rels: Some(rels),
2474                            chars_before,
2475                            chars_after,
2476                            cost_usd: if is_oauth { None } else { Some(cost) },
2477                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2478                            error: None,
2479                            index: current_index,
2480                            total,
2481                        });
2482                    } else {
2483                        failed += 1;
2484                        emit_json(&ItemEvent {
2485                            item: &item_key,
2486                            status: "failed",
2487                            memory_id: None,
2488                            entity_id: None,
2489                            entities: None,
2490                            rels: None,
2491                            chars_before: None,
2492                            chars_after: None,
2493                            cost_usd: None,
2494                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2495                            error: persist_err,
2496                            index: current_index,
2497                            total,
2498                        });
2499                    }
2500                }
2501                Ok(EnrichItemResult::Skipped { reason }) => {
2502                    skipped += 1;
2503                    if let Err(e) = queue_conn.execute(
2504                    "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2505                    rusqlite::params![reason, queue_id],
2506                ) {
2507                        tracing::warn!(target: "enrich", error = %e, "queue skipped update failed");
2508                    }
2509                    emit_json(&ItemEvent {
2510                        item: &item_key,
2511                        status: "skipped",
2512                        memory_id: None,
2513                        entity_id: None,
2514                        entities: None,
2515                        rels: None,
2516                        chars_before: None,
2517                        chars_after: None,
2518                        cost_usd: None,
2519                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2520                        error: None,
2521                        index: current_index,
2522                        total,
2523                    });
2524                }
2525                Ok(EnrichItemResult::PreservationFailed {
2526                    score,
2527                    threshold,
2528                    chars_before,
2529                    chars_after,
2530                }) => {
2531                    // G29 Passo 4: the LLM rewrite diverged too far from
2532                    // the original body. Count as a soft failure (not
2533                    // `failed`) so the queue surfaces it as a quality
2534                    // issue, not a transport error. The reason is
2535                    // structured so the operator can audit why a body
2536                    // was rejected.
2537                    skipped += 1;
2538                    let reason = format!(
2539                        "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2540                    );
2541                    if let Err(qe) = queue_conn.execute(
2542                        "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2543                        rusqlite::params![reason, queue_id],
2544                    ) {
2545                        tracing::warn!(target: "enrich", error = %qe, "queue preservation_failed update failed");
2546                    }
2547                    emit_json(&ItemEvent {
2548                        item: &item_key,
2549                        status: "preservation_failed",
2550                        memory_id: None,
2551                        entity_id: None,
2552                        entities: None,
2553                        rels: None,
2554                        chars_before: Some(chars_before),
2555                        chars_after: Some(chars_after),
2556                        cost_usd: None,
2557                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2558                        error: Some(reason),
2559                        index: current_index,
2560                        total,
2561                    });
2562                }
2563                Err(e) => {
2564                    let err_str = format!("{e}");
2565                    if matches!(e, AppError::RateLimited { .. }) {
2566                        if crate::retry::is_kill_switch_active() {
2567                            tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2568                        } else if std::time::Instant::now() >= rate_limit_deadline {
2569                            tracing::error!(target: "enrich", total_elapsed_secs = enrich_started.elapsed().as_secs(), "rate-limit retry deadline (1h) exhausted");
2570                        } else {
2571                            let half = backoff_secs / 2;
2572                            let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2573                            let actual_wait = half + jitter;
2574                            tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
2575                            if let Err(qe) = queue_conn.execute(
2576                                "UPDATE queue SET status='pending' WHERE id=?1",
2577                                rusqlite::params![queue_id],
2578                            ) {
2579                                tracing::warn!(target: "enrich", error = %qe, "queue pending update failed");
2580                            }
2581                            std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2582                            backoff_secs = (backoff_secs * 2).min(900);
2583                            continue;
2584                        }
2585                    }
2586
2587                    failed += 1;
2588                    if let Err(qe) = queue_conn.execute(
2589                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
2590                    rusqlite::params![err_str, queue_id],
2591                ) {
2592                        tracing::warn!(target: "enrich", error = %qe, "queue failed update failed");
2593                    }
2594                    emit_json(&ItemEvent {
2595                        item: &item_key,
2596                        status: "failed",
2597                        memory_id: None,
2598                        entity_id: None,
2599                        entities: None,
2600                        rels: None,
2601                        chars_before: None,
2602                        chars_after: None,
2603                        cost_usd: None,
2604                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2605                        error: Some(err_str),
2606                        index: current_index,
2607                        total,
2608                    });
2609                }
2610            }
2611
2612            let _ = item_type; // used via queue schema only
2613        }
2614    } // end else (serial path)
2615
2616    let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2617    let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2618
2619    emit_json(&EnrichSummary {
2620        summary: true,
2621        operation: format!("{:?}", args.operation),
2622        items_total: total,
2623        completed,
2624        failed,
2625        skipped,
2626        cost_usd: cost_total,
2627        elapsed_ms: started.elapsed().as_millis() as u64,
2628        backend_invoked: take_enrich_backend(),
2629    });
2630
2631    if failed == 0 {
2632        let _ = std::fs::remove_file(DEFAULT_QUEUE_DB);
2633    }
2634
2635    Ok(())
2636}
2637
2638// ---------------------------------------------------------------------------
2639// Internal result type for a single item call
2640// ---------------------------------------------------------------------------
2641
/// Outcome of running one queued item through a `call_*` helper.
///
/// The dispatch loop turns each variant into a queue status update and one
/// NDJSON item event.
enum EnrichItemResult {
    /// The item was processed; the queue row is marked `done`.
    Done {
        /// Id of the memory the operation acted on, when it targets a memory.
        memory_id: Option<i64>,
        /// Id of the entity the operation acted on, when it targets an entity.
        entity_id: Option<i64>,
        /// Entity count recorded for the item (0 when the operation yields none).
        entities: usize,
        /// Relationship count recorded for the item (0 when the operation yields none).
        rels: usize,
        /// Character count forwarded to the item event as `chars_before`;
        /// `None` when the operation does not report one.
        chars_before: Option<usize>,
        /// Character count forwarded to the item event as `chars_after`;
        /// `None` when the operation does not report one.
        chars_after: Option<usize>,
        /// Cost reported by the provider call; stored in the queue as `cost_usd`
        /// and added to the run total unless `is_oauth` is set.
        cost: f64,
        /// When `true`, the cost is kept out of the run total and `cost_usd`
        /// is omitted from the item event.
        is_oauth: bool,
    },
    /// The item was intentionally not processed; the queue row is marked
    /// `skipped`.
    Skipped {
        /// Human-readable cause, written to the queue row's `error` column.
        reason: String,
    },
    /// G29 step 4 (v1.0.69): the LLM rewrite diverged from the original
    /// body beyond the configured `--preserve-threshold` and was rejected
    /// before persistence. The trigram-Jaccard score and threshold are
    /// emitted in the NDJSON stream for operator audit. The dispatch loop
    /// counts this as a skip and reports status `preservation_failed`.
    PreservationFailed {
        /// Similarity score measured between the original and rewritten body.
        score: f64,
        /// Threshold the score was compared against.
        threshold: f64,
        /// Character count of the original body.
        chars_before: usize,
        /// Character count of the rejected rewrite.
        chars_after: usize,
    },
}
2667
2668// ---------------------------------------------------------------------------
2669// Per-operation call helpers (SCAN + JUDGE + PERSIST in one unit)
2670// ---------------------------------------------------------------------------
2671
2672fn call_memory_bindings(
2673    conn: &Connection,
2674    namespace: &str,
2675    memory_name: &str,
2676    binary: &Path,
2677    model: Option<&str>,
2678    timeout: u64,
2679    mode: &EnrichMode,
2680) -> Result<EnrichItemResult, AppError> {
2681    // Look up the memory
2682    let (memory_id, body): (i64, String) = conn.query_row(
2683        "SELECT id, COALESCE(body,'') FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2684        rusqlite::params![namespace, memory_name],
2685        |r| Ok((r.get(0)?, r.get(1)?)),
2686    ).map_err(|e| match e {
2687        rusqlite::Error::QueryReturnedNoRows => AppError::NotFound(format!("memory '{memory_name}' not found")),
2688        other => AppError::Database(other),
2689    })?;
2690
2691    if body.trim().is_empty() {
2692        return Ok(EnrichItemResult::Skipped {
2693            reason: "body is empty".to_string(),
2694        });
2695    }
2696
2697    let (value, cost, is_oauth) = match mode {
2698        EnrichMode::ClaudeCode => call_claude(
2699            binary,
2700            BINDINGS_PROMPT,
2701            BINDINGS_SCHEMA,
2702            &body,
2703            model,
2704            timeout,
2705        )?,
2706        EnrichMode::Codex => call_codex(
2707            binary,
2708            BINDINGS_PROMPT,
2709            BINDINGS_SCHEMA,
2710            &body,
2711            model,
2712            timeout,
2713        )?,
2714        EnrichMode::Opencode => call_opencode(
2715            binary,
2716            BINDINGS_PROMPT,
2717            BINDINGS_SCHEMA,
2718            &body,
2719            model,
2720            timeout,
2721        )?,
2722    };
2723
2724    let empty_arr = serde_json::Value::Array(vec![]);
2725    let entities_val = value.get("entities").unwrap_or(&empty_arr);
2726    let rels_val = value.get("relationships").unwrap_or(&empty_arr);
2727
2728    let (ent_count, rel_count) =
2729        persist_memory_bindings(conn, namespace, memory_id, entities_val, rels_val)?;
2730
2731    Ok(EnrichItemResult::Done {
2732        memory_id: Some(memory_id),
2733        entity_id: None,
2734        entities: ent_count,
2735        rels: rel_count,
2736        chars_before: None,
2737        chars_after: None,
2738        cost,
2739        is_oauth,
2740    })
2741}
2742
2743fn call_entity_description(
2744    conn: &Connection,
2745    namespace: &str,
2746    entity_name: &str,
2747    binary: &Path,
2748    model: Option<&str>,
2749    timeout: u64,
2750    mode: &EnrichMode,
2751) -> Result<EnrichItemResult, AppError> {
2752    let (entity_id, entity_type): (i64, String) = conn
2753        .query_row(
2754            "SELECT id, type FROM entities WHERE namespace=?1 AND name=?2",
2755            rusqlite::params![namespace, entity_name],
2756            |r| Ok((r.get(0)?, r.get(1)?)),
2757        )
2758        .map_err(|e| match e {
2759            rusqlite::Error::QueryReturnedNoRows => {
2760                AppError::NotFound(format!("entity '{entity_name}' not found"))
2761            }
2762            other => AppError::Database(other),
2763        })?;
2764
2765    let prompt = format!(
2766        "{ENTITY_DESCRIPTION_PROMPT_PREFIX}{entity_name}\nEntity type: {entity_type}\n\nGenerate a description:"
2767    );
2768
2769    let (value, cost, is_oauth) = match mode {
2770        EnrichMode::ClaudeCode => call_claude(
2771            binary,
2772            &prompt,
2773            ENTITY_DESCRIPTION_SCHEMA,
2774            "",
2775            model,
2776            timeout,
2777        )?,
2778        EnrichMode::Codex => call_codex(
2779            binary,
2780            &prompt,
2781            ENTITY_DESCRIPTION_SCHEMA,
2782            "",
2783            model,
2784            timeout,
2785        )?,
2786        EnrichMode::Opencode => call_opencode(
2787            binary,
2788            &prompt,
2789            ENTITY_DESCRIPTION_SCHEMA,
2790            "",
2791            model,
2792            timeout,
2793        )?,
2794    };
2795
2796    let description = value
2797        .get("description")
2798        .and_then(|v| v.as_str())
2799        .ok_or_else(|| AppError::Validation("LLM result missing 'description' field".into()))?;
2800
2801    persist_entity_description(conn, entity_id, description)?;
2802
2803    Ok(EnrichItemResult::Done {
2804        memory_id: None,
2805        entity_id: Some(entity_id),
2806        entities: 0,
2807        rels: 0,
2808        chars_before: None,
2809        chars_after: None,
2810        cost,
2811        is_oauth,
2812    })
2813}
2814
2815#[allow(clippy::too_many_arguments)]
2816fn call_body_enrich(
2817    conn: &Connection,
2818    namespace: &str,
2819    memory_name: &str,
2820    binary: &Path,
2821    model: Option<&str>,
2822    timeout: u64,
2823    mode: &EnrichMode,
2824    min_output_chars: usize,
2825    max_output_chars: usize,
2826    prompt_template: Option<&Path>,
2827    preserve_threshold: f64,
2828    paths: &crate::paths::AppPaths,
2829    llm_backend: crate::cli::LlmBackendChoice,
2830    embedding_backend: crate::cli::EmbeddingBackendChoice,
2831) -> Result<EnrichItemResult, AppError> {
2832    let (memory_id, body, description, memory_type): (i64, String, String, String) = conn
2833        .query_row(
2834            "SELECT id, COALESCE(body,''), COALESCE(description,''), COALESCE(type,'note') \
2835         FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2836            rusqlite::params![namespace, memory_name],
2837            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2838        )
2839        .map_err(|e| match e {
2840            rusqlite::Error::QueryReturnedNoRows => {
2841                AppError::NotFound(format!("memory '{memory_name}' not found"))
2842            }
2843            other => AppError::Database(other),
2844        })?;
2845
2846    let chars_before = body.chars().count();
2847
2848    // G26: gather graph context for contextualized enrichment
2849    let linked_entities: Vec<String> = {
2850        let mut stmt = conn.prepare_cached(
2851            "SELECT e.name FROM memory_entities me \
2852             JOIN entities e ON e.id = me.entity_id \
2853             WHERE me.memory_id = ?1 LIMIT 10",
2854        )?;
2855        let result: Vec<String> = stmt
2856            .query_map(rusqlite::params![memory_id], |r| r.get::<_, String>(0))?
2857            .filter_map(|r| r.ok())
2858            .collect();
2859        drop(stmt);
2860        result
2861    };
2862
2863    // Load custom prompt template if provided
2864    let prompt_prefix = if let Some(tmpl_path) = prompt_template {
2865        let file_size = std::fs::metadata(tmpl_path)
2866            .map_err(|e| {
2867                AppError::Io(std::io::Error::new(
2868                    e.kind(),
2869                    format!("failed to stat prompt template: {e}"),
2870                ))
2871            })?
2872            .len();
2873        if file_size > MAX_MEMORY_BODY_LEN as u64 {
2874            return Err(AppError::LimitExceeded(
2875                crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
2876            ));
2877        }
2878        std::fs::read_to_string(tmpl_path).map_err(|e| {
2879            AppError::Io(std::io::Error::new(
2880                e.kind(),
2881                format!("failed to read prompt template: {e}"),
2882            ))
2883        })?
2884    } else {
2885        BODY_ENRICH_PROMPT_PREFIX.to_string()
2886    };
2887
2888    // G26: build contextualized prompt with graph data
2889    let context_section = if !linked_entities.is_empty() || !description.is_empty() {
2890        let mut ctx = String::new();
2891        ctx.push_str(&format!(
2892            "\nContext:\n- Memory name: {memory_name}\n- Type: {memory_type}\n"
2893        ));
2894        if !description.is_empty() {
2895            ctx.push_str(&format!("- Description: {description}\n"));
2896        }
2897        ctx.push_str(&format!("- Domain: {namespace}\n"));
2898        if !linked_entities.is_empty() {
2899            ctx.push_str(&format!(
2900                "- Linked entities: {}\n",
2901                linked_entities.join(", ")
2902            ));
2903        }
2904        ctx
2905    } else {
2906        String::new()
2907    };
2908
2909    let prompt = format!(
2910        "{prompt_prefix}{context_section}\nTarget minimum length: {min_output_chars} characters. Maximum: {max_output_chars} characters."
2911    );
2912
2913    // The body schema uses a free-form enriched_body field
2914    let (value, cost, is_oauth) = match mode {
2915        EnrichMode::ClaudeCode => {
2916            call_claude(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2917        }
2918        EnrichMode::Codex => {
2919            call_codex(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2920        }
2921        EnrichMode::Opencode => {
2922            call_opencode(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2923        }
2924    };
2925
2926    let enriched_body = value
2927        .get("enriched_body")
2928        .and_then(|v| v.as_str())
2929        .ok_or_else(|| AppError::Validation("LLM result missing 'enriched_body' field".into()))?;
2930
2931    let chars_after = enriched_body.chars().count();
2932
2933    // G29 Passo 4 (v1.0.69): preservation check. Before persisting, run
2934    // a trigram-Jaccard similarity between the original body and the
2935    // LLM-rewritten body. When the score falls below
2936    // `args.preserve_threshold` (default 0.7 per the G29 gap), reject the
2937    // rewrite as a likely hallucination. The result is recorded in the
2938    // NDJSON stream so operators can audit what the LLM tried to do.
2939    let threshold = preserve_threshold;
2940    let verdict =
2941        crate::preservation::PreservationVerdict::evaluate(&body, enriched_body, threshold);
2942    if !verdict.is_accepted() {
2943        return Ok(EnrichItemResult::PreservationFailed {
2944            score: match verdict {
2945                crate::preservation::PreservationVerdict::Preserved { score, .. } => score,
2946                crate::preservation::PreservationVerdict::Rejected { score, .. } => score,
2947                crate::preservation::PreservationVerdict::Unchanged { .. } => 1.0,
2948            },
2949            threshold,
2950            chars_before,
2951            chars_after,
2952        });
2953    }
2954
2955    // G29 Passo 5 (v1.0.69): idempotency via blake3 hash. Before persisting,
2956    // compare the hash of the original body against the hash of the enriched
2957    // body. Identical hashes mean the LLM produced a byte-for-byte identical
2958    // body (rare but possible) — treat as `Skipped` so re-running the batch
2959    // is safe and the queue does not get re-persisted entries.
2960    let old_hash = blake3::hash(body.as_bytes()).to_hex().to_string();
2961    let new_hash = blake3::hash(enriched_body.as_bytes()).to_hex().to_string();
2962    if old_hash == new_hash {
2963        return Ok(EnrichItemResult::Skipped {
2964            reason: format!(
2965                "enriched body hash matches original (blake3:{old_hash}); idempotency skip"
2966            ),
2967        });
2968    }
2969
2970    // Only persist if the enriched body is genuinely longer
2971    if chars_after <= chars_before {
2972        return Ok(EnrichItemResult::Skipped {
2973            reason: format!(
2974                "enriched body ({chars_after} chars) not longer than original ({chars_before} chars)"
2975            ),
2976        });
2977    }
2978
2979    persist_enriched_body(
2980        conn,
2981        namespace,
2982        memory_id,
2983        memory_name,
2984        enriched_body,
2985        paths,
2986        llm_backend,
2987        embedding_backend,
2988    )?;
2989
2990    Ok(EnrichItemResult::Done {
2991        memory_id: Some(memory_id),
2992        entity_id: None,
2993        entities: 0,
2994        rels: 0,
2995        chars_before: Some(chars_before),
2996        chars_after: Some(chars_after),
2997        cost,
2998        is_oauth,
2999    })
3000}
3001
3002fn call_reembed(
3003    conn: &Connection,
3004    namespace: &str,
3005    memory_name: &str,
3006    paths: &crate::paths::AppPaths,
3007    llm_backend: crate::cli::LlmBackendChoice,
3008    embedding_backend: crate::cli::EmbeddingBackendChoice,
3009) -> Result<EnrichItemResult, AppError> {
3010    let (memory_id, body, memory_type): (i64, String, String) = conn
3011        .query_row(
3012            "SELECT id, COALESCE(body,''), COALESCE(type,'note')
3013             FROM memories
3014             WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
3015            rusqlite::params![namespace, memory_name],
3016            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3017        )
3018        .map_err(|e| match e {
3019            rusqlite::Error::QueryReturnedNoRows => {
3020                AppError::NotFound(format!("memory '{memory_name}' not found"))
3021            }
3022            other => AppError::Database(other),
3023        })?;
3024
3025    if body.trim().is_empty() {
3026        return Ok(EnrichItemResult::Skipped {
3027            reason: "body is empty".to_string(),
3028        });
3029    }
3030
3031    reembed_memory_vector(
3032        conn,
3033        namespace,
3034        memory_id,
3035        memory_name,
3036        &memory_type,
3037        &body,
3038        paths,
3039        llm_backend,
3040        embedding_backend,
3041    )?;
3042
3043    Ok(EnrichItemResult::Done {
3044        memory_id: Some(memory_id),
3045        entity_id: None,
3046        entities: 0,
3047        rels: 0,
3048        chars_before: Some(body.chars().count()),
3049        chars_after: Some(body.chars().count()),
3050        cost: 0.0,
3051        is_oauth: true,
3052    })
3053}
3054
3055// ---------------------------------------------------------------------------
3056// Scan dispatcher — maps operation to scan query result (item keys)
3057// ---------------------------------------------------------------------------
3058
3059fn scan_operation(
3060    conn: &Connection,
3061    namespace: &str,
3062    args: &EnrichArgs,
3063) -> Result<Vec<String>, AppError> {
3064    // G37: resolve --names + --names-file once and apply to every scan path.
3065    let name_filter = resolve_name_filter(args)?;
3066    match args.operation {
3067        EnrichOperation::MemoryBindings => {
3068            let rows = scan_unbound_memories(conn, namespace, args.limit, &name_filter)?;
3069            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3070        }
3071        EnrichOperation::EntityDescriptions => {
3072            let rows =
3073                scan_entities_without_description(conn, namespace, args.limit, &name_filter)?;
3074            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3075        }
3076        EnrichOperation::BodyEnrich => {
3077            let rows = scan_short_body_memories(
3078                conn,
3079                namespace,
3080                args.min_output_chars,
3081                args.limit,
3082                &name_filter,
3083            )?;
3084            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3085        }
3086        EnrichOperation::ReEmbed => {
3087            let rows = scan_memories_without_embeddings(conn, namespace, args.limit, &name_filter)?;
3088            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3089        }
3090        EnrichOperation::WeightCalibrate => {
3091            let rows = scan_weight_candidates(conn, namespace, args.limit)?;
3092            Ok(rows
3093                .into_iter()
3094                .map(|(id, _, _, _, _)| id.to_string())
3095                .collect())
3096        }
3097        EnrichOperation::RelationReclassify => {
3098            let rows = scan_generic_relations(conn, namespace, args.limit)?;
3099            Ok(rows
3100                .into_iter()
3101                .map(|(id, _, _, _)| id.to_string())
3102                .collect())
3103        }
3104        EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
3105            let pairs = scan_isolated_entity_pairs(conn, namespace, args.limit)?;
3106            Ok(pairs.into_iter().map(|(_, name, _, _)| name).collect())
3107        }
3108        EnrichOperation::EntityTypeValidate => {
3109            let rows = scan_entities_for_type_validation(conn, namespace, args.limit)?;
3110            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3111        }
3112        EnrichOperation::DescriptionEnrich => {
3113            let rows = scan_generic_descriptions(conn, namespace, args.limit)?;
3114            Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3115        }
3116        EnrichOperation::DomainClassify
3117        | EnrichOperation::GraphAudit
3118        | EnrichOperation::DeepResearchSynth
3119        | EnrichOperation::BodyExtract => {
3120            let limit_clause = args.limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3121            let sql = format!(
3122                "SELECT name FROM memories WHERE namespace=?1 AND deleted_at IS NULL ORDER BY id {limit_clause}"
3123            );
3124            let mut stmt = conn.prepare(&sql)?;
3125            let names = stmt
3126                .query_map(rusqlite::params![namespace], |r| r.get::<_, String>(0))?
3127                .collect::<Result<Vec<_>, _>>()?;
3128            Ok(names)
3129        }
3130    }
3131}
3132
3133// ---------------------------------------------------------------------------
3134// Codex stub provider
3135// ---------------------------------------------------------------------------
3136
3137/// Locates the Codex CLI binary.
3138fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
3139    if let Some(p) = explicit {
3140        if p.exists() {
3141            return Ok(p.to_path_buf());
3142        }
3143        return Err(AppError::Validation(format!(
3144            "Codex binary not found at explicit path: {}",
3145            p.display()
3146        )));
3147    }
3148
3149    if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
3150        let p = PathBuf::from(&env_path);
3151        if p.exists() {
3152            return Ok(p);
3153        }
3154    }
3155
3156    let name = if cfg!(windows) { "codex.exe" } else { "codex" };
3157    if let Some(path_var) = std::env::var_os("PATH") {
3158        for dir in std::env::split_paths(&path_var) {
3159            let candidate = dir.join(name);
3160            if candidate.exists() {
3161                return Ok(crate::extract::llm_embedding::resolve_real_binary(
3162                    &candidate,
3163                ));
3164            }
3165        }
3166    }
3167
3168    Err(AppError::Validation(
3169        "Codex CLI binary not found in PATH. Install it or specify --codex-binary".to_string(),
3170    ))
3171}
3172
3173/// G27: Calibrate weight of a single relationship via LLM.
3174fn call_weight_calibrate(
3175    conn: &Connection,
3176    _namespace: &str,
3177    item_key: &str,
3178    binary: &Path,
3179    model: Option<&str>,
3180    timeout: u64,
3181    mode: &EnrichMode,
3182) -> Result<EnrichItemResult, AppError> {
3183    let rel_id: i64 = item_key
3184        .parse()
3185        .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
3186    let (source_name, target_name, relation, current_weight): (String, String, String, f64) = conn
3187        .query_row(
3188            "SELECT e1.name, e2.name, r.relation, r.weight \
3189             FROM relationships r \
3190             JOIN entities e1 ON e1.id = r.source_id \
3191             JOIN entities e2 ON e2.id = r.target_id \
3192             WHERE r.id = ?1",
3193            rusqlite::params![rel_id],
3194            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
3195        )
3196        .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
3197
3198    let input_text = format!(
3199        "Source: {source_name}\nTarget: {target_name}\nRelation: {relation}\nCurrent weight: {current_weight}"
3200    );
3201    let (value, cost, is_oauth) = match mode {
3202        EnrichMode::ClaudeCode => call_claude(
3203            binary,
3204            WEIGHT_CALIBRATE_PROMPT,
3205            WEIGHT_CALIBRATE_SCHEMA,
3206            &input_text,
3207            model,
3208            timeout,
3209        )?,
3210        EnrichMode::Codex => call_codex(
3211            binary,
3212            WEIGHT_CALIBRATE_PROMPT,
3213            WEIGHT_CALIBRATE_SCHEMA,
3214            &input_text,
3215            model,
3216            timeout,
3217        )?,
3218        EnrichMode::Opencode => call_opencode(
3219            binary,
3220            WEIGHT_CALIBRATE_PROMPT,
3221            WEIGHT_CALIBRATE_SCHEMA,
3222            &input_text,
3223            model,
3224            timeout,
3225        )?,
3226    };
3227
3228    let calibrated = value
3229        .get("calibrated_weight")
3230        .and_then(|v| v.as_f64())
3231        .ok_or_else(|| AppError::Validation("LLM result missing 'calibrated_weight'".into()))?;
3232
3233    conn.execute(
3234        "UPDATE relationships SET weight = ?1 WHERE id = ?2",
3235        rusqlite::params![calibrated, rel_id],
3236    )?;
3237
3238    Ok(EnrichItemResult::Done {
3239        memory_id: None,
3240        entity_id: None,
3241        entities: 0,
3242        rels: 1,
3243        chars_before: None,
3244        chars_after: None,
3245        cost,
3246        is_oauth,
3247    })
3248}
3249
3250/// G27: Reclassify a generic relationship type via LLM.
3251fn call_relation_reclassify(
3252    conn: &Connection,
3253    _namespace: &str,
3254    item_key: &str,
3255    binary: &Path,
3256    model: Option<&str>,
3257    timeout: u64,
3258    mode: &EnrichMode,
3259) -> Result<EnrichItemResult, AppError> {
3260    let rel_id: i64 = item_key
3261        .parse()
3262        .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
3263    let (source_name, target_name, current_relation): (String, String, String) = conn
3264        .query_row(
3265            "SELECT e1.name, e2.name, r.relation \
3266             FROM relationships r \
3267             JOIN entities e1 ON e1.id = r.source_id \
3268             JOIN entities e2 ON e2.id = r.target_id \
3269             WHERE r.id = ?1",
3270            rusqlite::params![rel_id],
3271            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3272        )
3273        .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
3274
3275    let input_text = format!(
3276        "Source entity: {source_name}\nTarget entity: {target_name}\nCurrent relation: {current_relation}"
3277    );
3278    let (value, cost, is_oauth) = match mode {
3279        EnrichMode::ClaudeCode => call_claude(
3280            binary,
3281            RELATION_RECLASSIFY_PROMPT,
3282            RELATION_RECLASSIFY_SCHEMA,
3283            &input_text,
3284            model,
3285            timeout,
3286        )?,
3287        EnrichMode::Codex => call_codex(
3288            binary,
3289            RELATION_RECLASSIFY_PROMPT,
3290            RELATION_RECLASSIFY_SCHEMA,
3291            &input_text,
3292            model,
3293            timeout,
3294        )?,
3295        EnrichMode::Opencode => call_opencode(
3296            binary,
3297            RELATION_RECLASSIFY_PROMPT,
3298            RELATION_RECLASSIFY_SCHEMA,
3299            &input_text,
3300            model,
3301            timeout,
3302        )?,
3303    };
3304
3305    let new_relation = value
3306        .get("relation")
3307        .and_then(|v| v.as_str())
3308        .ok_or_else(|| AppError::Validation("LLM result missing 'relation'".into()))?;
3309    let new_strength = value
3310        .get("strength")
3311        .and_then(|v| v.as_f64())
3312        .unwrap_or(0.5);
3313
3314    conn.execute(
3315        "UPDATE relationships SET relation = ?1, weight = ?2 WHERE id = ?3",
3316        rusqlite::params![new_relation, new_strength, rel_id],
3317    )?;
3318
3319    Ok(EnrichItemResult::Done {
3320        memory_id: None,
3321        entity_id: None,
3322        entities: 0,
3323        rels: 1,
3324        chars_before: None,
3325        chars_after: None,
3326        cost,
3327        is_oauth,
3328    })
3329}
3330
3331/// G27 P2: Connect isolated entities via LLM-suggested relationship.
3332fn call_entity_connect(
3333    conn: &Connection,
3334    namespace: &str,
3335    item_key: &str,
3336    binary: &Path,
3337    model: Option<&str>,
3338    timeout: u64,
3339    mode: &EnrichMode,
3340) -> Result<EnrichItemResult, AppError> {
3341    let pairs = scan_isolated_entity_pairs(conn, namespace, Some(1))?;
3342    let (e1_id, e1_name, e2_id, e2_name) =
3343        match pairs.into_iter().find(|(_, n, _, _)| n == item_key) {
3344            Some(p) => p,
3345            None => {
3346                return Ok(EnrichItemResult::Skipped {
3347                    reason: "pair no longer isolated".into(),
3348                })
3349            }
3350        };
3351    let input_text = format!("Entity A: {e1_name}\nEntity B: {e2_name}");
3352    let (value, cost, is_oauth) = match mode {
3353        EnrichMode::ClaudeCode => call_claude(
3354            binary,
3355            ENTITY_CONNECT_PROMPT,
3356            ENTITY_CONNECT_SCHEMA,
3357            &input_text,
3358            model,
3359            timeout,
3360        )?,
3361        EnrichMode::Codex => call_codex(
3362            binary,
3363            ENTITY_CONNECT_PROMPT,
3364            ENTITY_CONNECT_SCHEMA,
3365            &input_text,
3366            model,
3367            timeout,
3368        )?,
3369        EnrichMode::Opencode => call_opencode(
3370            binary,
3371            ENTITY_CONNECT_PROMPT,
3372            ENTITY_CONNECT_SCHEMA,
3373            &input_text,
3374            model,
3375            timeout,
3376        )?,
3377    };
3378    let relation = value
3379        .get("relation")
3380        .and_then(|v| v.as_str())
3381        .unwrap_or("none");
3382    if relation == "none" {
3383        return Ok(EnrichItemResult::Skipped {
3384            reason: "LLM determined no relationship".into(),
3385        });
3386    }
3387    let strength = value
3388        .get("strength")
3389        .and_then(|v| v.as_f64())
3390        .unwrap_or(0.5);
3391    conn.execute(
3392        "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
3393        rusqlite::params![namespace, e1_id, e2_id, relation, strength],
3394    )?;
3395    Ok(EnrichItemResult::Done {
3396        memory_id: None,
3397        entity_id: None,
3398        entities: 0,
3399        rels: 1,
3400        chars_before: None,
3401        chars_after: None,
3402        cost,
3403        is_oauth,
3404    })
3405}
3406
3407/// G27 P2: Validate entity type assignment via LLM.
3408fn call_entity_type_validate(
3409    conn: &Connection,
3410    _namespace: &str,
3411    item_key: &str,
3412    binary: &Path,
3413    model: Option<&str>,
3414    timeout: u64,
3415    mode: &EnrichMode,
3416) -> Result<EnrichItemResult, AppError> {
3417    let (ent_id, ent_name, ent_type): (i64, String, String) = conn
3418        .query_row(
3419            "SELECT id, name, type FROM entities WHERE name = ?1",
3420            rusqlite::params![item_key],
3421            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3422        )
3423        .map_err(|_| AppError::NotFound(format!("entity '{item_key}' not found")))?;
3424    let input_text = format!("Entity: {ent_name}\nCurrent type: {ent_type}");
3425    let (value, cost, is_oauth) = match mode {
3426        EnrichMode::ClaudeCode => call_claude(
3427            binary,
3428            ENTITY_TYPE_VALIDATE_PROMPT,
3429            ENTITY_TYPE_VALIDATE_SCHEMA,
3430            &input_text,
3431            model,
3432            timeout,
3433        )?,
3434        EnrichMode::Codex => call_codex(
3435            binary,
3436            ENTITY_TYPE_VALIDATE_PROMPT,
3437            ENTITY_TYPE_VALIDATE_SCHEMA,
3438            &input_text,
3439            model,
3440            timeout,
3441        )?,
3442        EnrichMode::Opencode => call_opencode(
3443            binary,
3444            ENTITY_TYPE_VALIDATE_PROMPT,
3445            ENTITY_TYPE_VALIDATE_SCHEMA,
3446            &input_text,
3447            model,
3448            timeout,
3449        )?,
3450    };
3451    let validated_type = value
3452        .get("validated_type")
3453        .and_then(|v| v.as_str())
3454        .unwrap_or(&ent_type);
3455    let was_correct = value
3456        .get("was_correct")
3457        .and_then(|v| v.as_bool())
3458        .unwrap_or(true);
3459    if !was_correct {
3460        conn.execute(
3461            "UPDATE entities SET type = ?1 WHERE id = ?2",
3462            rusqlite::params![validated_type, ent_id],
3463        )?;
3464    }
3465    Ok(EnrichItemResult::Done {
3466        memory_id: None,
3467        entity_id: Some(ent_id),
3468        entities: 1,
3469        rels: 0,
3470        chars_before: None,
3471        chars_after: None,
3472        cost,
3473        is_oauth,
3474    })
3475}
3476
3477/// G27 P2: Enrich generic memory description via LLM.
3478fn call_description_enrich(
3479    conn: &Connection,
3480    _namespace: &str,
3481    item_key: &str,
3482    binary: &Path,
3483    model: Option<&str>,
3484    timeout: u64,
3485    mode: &EnrichMode,
3486) -> Result<EnrichItemResult, AppError> {
3487    let (mem_id, body, old_desc): (i64, String, String) = conn
3488        .query_row(
3489            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3490            rusqlite::params![item_key],
3491            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3492        )
3493        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3494    let snippet: String = body.chars().take(500).collect();
3495    let input_text = format!(
3496        "Memory name: {item_key}\nCurrent description: {old_desc}\nBody preview: {snippet}"
3497    );
3498    let (value, cost, is_oauth) = match mode {
3499        EnrichMode::ClaudeCode => call_claude(
3500            binary,
3501            DESCRIPTION_ENRICH_PROMPT,
3502            DESCRIPTION_ENRICH_SCHEMA,
3503            &input_text,
3504            model,
3505            timeout,
3506        )?,
3507        EnrichMode::Codex => call_codex(
3508            binary,
3509            DESCRIPTION_ENRICH_PROMPT,
3510            DESCRIPTION_ENRICH_SCHEMA,
3511            &input_text,
3512            model,
3513            timeout,
3514        )?,
3515        EnrichMode::Opencode => call_opencode(
3516            binary,
3517            DESCRIPTION_ENRICH_PROMPT,
3518            DESCRIPTION_ENRICH_SCHEMA,
3519            &input_text,
3520            model,
3521            timeout,
3522        )?,
3523    };
3524    let new_desc = value
3525        .get("description")
3526        .and_then(|v| v.as_str())
3527        .unwrap_or(&old_desc);
3528    let old_name: String = conn.query_row(
3529        "SELECT name FROM memories WHERE id = ?1",
3530        rusqlite::params![mem_id],
3531        |r| r.get(0),
3532    )?;
3533    conn.execute(
3534        "UPDATE memories SET description = ?1 WHERE id = ?2",
3535        rusqlite::params![new_desc, mem_id],
3536    )?;
3537    memories::sync_fts_after_update(
3538        conn, mem_id, &old_name, &old_desc, &body, &old_name, new_desc, &body,
3539    )?;
3540    Ok(EnrichItemResult::Done {
3541        memory_id: Some(mem_id),
3542        entity_id: None,
3543        entities: 0,
3544        rels: 0,
3545        chars_before: Some(old_desc.len()),
3546        chars_after: Some(new_desc.len()),
3547        cost,
3548        is_oauth,
3549    })
3550}
3551
3552/// G27 P2: Classify memory into domain category via LLM.
3553fn call_domain_classify(
3554    conn: &Connection,
3555    _namespace: &str,
3556    item_key: &str,
3557    binary: &Path,
3558    model: Option<&str>,
3559    timeout: u64,
3560    mode: &EnrichMode,
3561) -> Result<EnrichItemResult, AppError> {
3562    let (mem_id, body, desc): (i64, String, String) = conn
3563        .query_row(
3564            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3565            rusqlite::params![item_key],
3566            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3567        )
3568        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3569    let snippet: String = body.chars().take(500).collect();
3570    let input_text = format!("Memory: {item_key}\nDescription: {desc}\nBody preview: {snippet}");
3571    let (value, cost, is_oauth) = match mode {
3572        EnrichMode::ClaudeCode => call_claude(
3573            binary,
3574            DOMAIN_CLASSIFY_PROMPT,
3575            DOMAIN_CLASSIFY_SCHEMA,
3576            &input_text,
3577            model,
3578            timeout,
3579        )?,
3580        EnrichMode::Codex => call_codex(
3581            binary,
3582            DOMAIN_CLASSIFY_PROMPT,
3583            DOMAIN_CLASSIFY_SCHEMA,
3584            &input_text,
3585            model,
3586            timeout,
3587        )?,
3588        EnrichMode::Opencode => call_opencode(
3589            binary,
3590            DOMAIN_CLASSIFY_PROMPT,
3591            DOMAIN_CLASSIFY_SCHEMA,
3592            &input_text,
3593            model,
3594            timeout,
3595        )?,
3596    };
3597    let domain = value
3598        .get("domain")
3599        .and_then(|v| v.as_str())
3600        .unwrap_or("uncategorized");
3601    let metadata = format!(r#"{{"domain":"{}"}}"#, domain.replace('"', "\\\""));
3602    conn.execute(
3603        "UPDATE memories SET metadata = ?1 WHERE id = ?2",
3604        rusqlite::params![metadata, mem_id],
3605    )?;
3606    Ok(EnrichItemResult::Done {
3607        memory_id: Some(mem_id),
3608        entity_id: None,
3609        entities: 0,
3610        rels: 0,
3611        chars_before: None,
3612        chars_after: None,
3613        cost,
3614        is_oauth,
3615    })
3616}
3617
3618/// G27 P2: Audit memory graph quality via LLM.
3619fn call_graph_audit(
3620    conn: &Connection,
3621    _namespace: &str,
3622    item_key: &str,
3623    binary: &Path,
3624    model: Option<&str>,
3625    timeout: u64,
3626    mode: &EnrichMode,
3627) -> Result<EnrichItemResult, AppError> {
3628    let (mem_id, body, desc): (i64, String, String) = conn
3629        .query_row(
3630            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3631            rusqlite::params![item_key],
3632            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3633        )
3634        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3635    let snippet: String = body.chars().take(500).collect();
3636    let ent_count: i64 = conn
3637        .query_row(
3638            "SELECT COUNT(*) FROM memory_entities WHERE memory_id = ?1",
3639            rusqlite::params![mem_id],
3640            |r| r.get(0),
3641        )
3642        .unwrap_or(0);
3643    let input_text = format!("Memory: {item_key}\nDescription: {desc}\nEntity bindings: {ent_count}\nBody preview: {snippet}");
3644    let (value, cost, is_oauth) = match mode {
3645        EnrichMode::ClaudeCode => call_claude(
3646            binary,
3647            GRAPH_AUDIT_PROMPT,
3648            GRAPH_AUDIT_SCHEMA,
3649            &input_text,
3650            model,
3651            timeout,
3652        )?,
3653        EnrichMode::Codex => call_codex(
3654            binary,
3655            GRAPH_AUDIT_PROMPT,
3656            GRAPH_AUDIT_SCHEMA,
3657            &input_text,
3658            model,
3659            timeout,
3660        )?,
3661        EnrichMode::Opencode => call_opencode(
3662            binary,
3663            GRAPH_AUDIT_PROMPT,
3664            GRAPH_AUDIT_SCHEMA,
3665            &input_text,
3666            model,
3667            timeout,
3668        )?,
3669    };
3670    let issues = value
3671        .get("issues")
3672        .and_then(|v| v.as_array())
3673        .map(|a| a.len())
3674        .unwrap_or(0);
3675    Ok(EnrichItemResult::Done {
3676        memory_id: Some(mem_id),
3677        entity_id: None,
3678        entities: 0,
3679        rels: issues,
3680        chars_before: None,
3681        chars_after: None,
3682        cost,
3683        is_oauth,
3684    })
3685}
3686
3687/// G27 P2: Synthesize research findings into graph entities/relationships via LLM.
3688fn call_deep_research_synth(
3689    conn: &Connection,
3690    namespace: &str,
3691    item_key: &str,
3692    binary: &Path,
3693    model: Option<&str>,
3694    timeout: u64,
3695    mode: &EnrichMode,
3696) -> Result<EnrichItemResult, AppError> {
3697    let (mem_id, body): (i64, String) = conn
3698        .query_row(
3699            "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3700            rusqlite::params![item_key],
3701            |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
3702        )
3703        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3704    let snippet: String = body.chars().take(2000).collect();
3705    let input_text = format!("Memory: {item_key}\nBody:\n{snippet}");
3706    let (value, cost, is_oauth) = match mode {
3707        EnrichMode::ClaudeCode => call_claude(
3708            binary,
3709            DEEP_RESEARCH_SYNTH_PROMPT,
3710            DEEP_RESEARCH_SYNTH_SCHEMA,
3711            &input_text,
3712            model,
3713            timeout,
3714        )?,
3715        EnrichMode::Codex => call_codex(
3716            binary,
3717            DEEP_RESEARCH_SYNTH_PROMPT,
3718            DEEP_RESEARCH_SYNTH_SCHEMA,
3719            &input_text,
3720            model,
3721            timeout,
3722        )?,
3723        EnrichMode::Opencode => call_opencode(
3724            binary,
3725            DEEP_RESEARCH_SYNTH_PROMPT,
3726            DEEP_RESEARCH_SYNTH_SCHEMA,
3727            &input_text,
3728            model,
3729            timeout,
3730        )?,
3731    };
3732    let mut ent_count = 0usize;
3733    let mut rel_count = 0usize;
3734    if let Some(ents) = value.get("entities").and_then(|v| v.as_array()) {
3735        for e in ents {
3736            let name = e.get("name").and_then(|v| v.as_str()).unwrap_or_default();
3737            let etype_str = e
3738                .get("entity_type")
3739                .and_then(|v| v.as_str())
3740                .unwrap_or("concept");
3741            let etype: EntityType = etype_str.parse().unwrap_or(EntityType::Concept);
3742            if name.len() >= 2 {
3743                let ne = NewEntity {
3744                    name: name.to_string(),
3745                    entity_type: etype,
3746                    description: None,
3747                };
3748                let _ = entities::upsert_entity(conn, namespace, &ne);
3749                ent_count += 1;
3750            }
3751        }
3752    }
3753    if let Some(rels) = value.get("relationships").and_then(|v| v.as_array()) {
3754        for r in rels {
3755            let src = r.get("source").and_then(|v| v.as_str()).unwrap_or_default();
3756            let tgt = r.get("target").and_then(|v| v.as_str()).unwrap_or_default();
3757            if src.is_empty() || tgt.is_empty() {
3758                continue;
3759            }
3760            let rel = r
3761                .get("relation")
3762                .and_then(|v| v.as_str())
3763                .unwrap_or("related");
3764            let str_ = r.get("strength").and_then(|v| v.as_f64()).unwrap_or(0.5);
3765            if let (Some(sid), Some(tid)) = (
3766                entities::find_entity_id(conn, namespace, src)?,
3767                entities::find_entity_id(conn, namespace, tgt)?,
3768            ) {
3769                let _ = entities::create_or_fetch_relationship(
3770                    conn, namespace, sid, tid, rel, str_, None,
3771                );
3772                rel_count += 1;
3773            }
3774        }
3775    }
3776    Ok(EnrichItemResult::Done {
3777        memory_id: Some(mem_id),
3778        entity_id: None,
3779        entities: ent_count,
3780        rels: rel_count,
3781        chars_before: None,
3782        chars_after: None,
3783        cost,
3784        is_oauth,
3785    })
3786}
3787
3788/// G27 P2: Extract structured body from unstructured text via LLM.
3789fn call_body_extract(
3790    conn: &Connection,
3791    _namespace: &str,
3792    item_key: &str,
3793    binary: &Path,
3794    model: Option<&str>,
3795    timeout: u64,
3796    mode: &EnrichMode,
3797) -> Result<EnrichItemResult, AppError> {
3798    let (mem_id, body, old_desc): (i64, String, String) = conn
3799        .query_row(
3800            "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3801            rusqlite::params![item_key],
3802            |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3803        )
3804        .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3805    let old_name: String = conn.query_row(
3806        "SELECT name FROM memories WHERE id = ?1",
3807        rusqlite::params![mem_id],
3808        |r| r.get(0),
3809    )?;
3810    let input_text = format!("Memory: {item_key}\nBody:\n{body}");
3811    let (value, cost, is_oauth) = match mode {
3812        EnrichMode::ClaudeCode => call_claude(
3813            binary,
3814            BODY_EXTRACT_PROMPT,
3815            BODY_EXTRACT_SCHEMA,
3816            &input_text,
3817            model,
3818            timeout,
3819        )?,
3820        EnrichMode::Codex => call_codex(
3821            binary,
3822            BODY_EXTRACT_PROMPT,
3823            BODY_EXTRACT_SCHEMA,
3824            &input_text,
3825            model,
3826            timeout,
3827        )?,
3828        EnrichMode::Opencode => call_opencode(
3829            binary,
3830            BODY_EXTRACT_PROMPT,
3831            BODY_EXTRACT_SCHEMA,
3832            &input_text,
3833            model,
3834            timeout,
3835        )?,
3836    };
3837    let restructured = value
3838        .get("restructured_body")
3839        .and_then(|v| v.as_str())
3840        .unwrap_or(&body);
3841    let chars_before = body.len();
3842    let chars_after = restructured.len();
3843    let new_hash = blake3::hash(restructured.as_bytes()).to_hex().to_string();
3844    conn.execute(
3845        "UPDATE memories SET body = ?1, body_hash = ?2, updated_at = unixepoch() WHERE id = ?3",
3846        rusqlite::params![restructured, new_hash, mem_id],
3847    )?;
3848    memories::sync_fts_after_update(
3849        conn,
3850        mem_id,
3851        &old_name,
3852        &old_desc,
3853        &body,
3854        &old_name,
3855        &old_desc,
3856        restructured,
3857    )?;
3858    Ok(EnrichItemResult::Done {
3859        memory_id: Some(mem_id),
3860        entity_id: None,
3861        entities: 0,
3862        rels: 0,
3863        chars_before: Some(chars_before),
3864        chars_after: Some(chars_after),
3865        cost,
3866        is_oauth,
3867    })
3868}
3869
3870/// Scan for pairs of entities that share no direct relationship.
3871#[allow(clippy::type_complexity)]
3872fn scan_isolated_entity_pairs(
3873    conn: &Connection,
3874    namespace: &str,
3875    limit: Option<usize>,
3876) -> Result<Vec<(i64, String, i64, String)>, AppError> {
3877    let limit_val = limit.unwrap_or(50) as i64;
3878    let mut stmt = conn.prepare_cached(
3879        "SELECT e1.id, e1.name, e2.id, e2.name FROM entities e1, entities e2 \
3880         WHERE e1.namespace = ?1 AND e2.namespace = ?1 AND e1.id < e2.id \
3881         AND NOT EXISTS (SELECT 1 FROM relationships r WHERE \
3882           (r.source_id = e1.id AND r.target_id = e2.id) OR \
3883           (r.source_id = e2.id AND r.target_id = e1.id)) \
3884         LIMIT ?2",
3885    )?;
3886    let rows = stmt
3887        .query_map(rusqlite::params![namespace, limit_val], |r| {
3888            Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?))
3889        })?
3890        .collect::<Result<Vec<_>, _>>()?;
3891    Ok(rows)
3892}
3893
3894/// Scan for entities with non-validated types (all entities for type audit).
3895fn scan_entities_for_type_validation(
3896    conn: &Connection,
3897    namespace: &str,
3898    limit: Option<usize>,
3899) -> Result<Vec<(i64, String, String)>, AppError> {
3900    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3901    let sql = format!(
3902        "SELECT id, name, type FROM entities WHERE namespace = ?1 ORDER BY id {limit_clause}"
3903    );
3904    let mut stmt = conn.prepare(&sql)?;
3905    let rows = stmt
3906        .query_map(rusqlite::params![namespace], |r| {
3907            Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3908        })?
3909        .collect::<Result<Vec<_>, _>>()?;
3910    Ok(rows)
3911}
3912
3913/// Scan for memories with generic descriptions (ingested, imported, etc).
3914fn scan_generic_descriptions(
3915    conn: &Connection,
3916    namespace: &str,
3917    limit: Option<usize>,
3918) -> Result<Vec<(i64, String, String)>, AppError> {
3919    let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3920    let sql = format!(
3921        "SELECT id, name, description FROM memories WHERE namespace = ?1 AND deleted_at IS NULL \
3922         AND (description LIKE '%ingested%' OR description LIKE '%imported%' OR description LIKE '%added%' OR length(description) < 30) \
3923         ORDER BY id {limit_clause}"
3924    );
3925    let mut stmt = conn.prepare(&sql)?;
3926    let rows = stmt
3927        .query_map(rusqlite::params![namespace], |r| {
3928            Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3929        })?
3930        .collect::<Result<Vec<_>, _>>()?;
3931    Ok(rows)
3932}
3933
3934/// Calls the Codex CLI for a single enrichment item.
3935///
3936/// Follows the same contract as `call_claude`: returns `(value, cost_usd, is_oauth=false)`.
3937fn call_codex(
3938    binary: &Path,
3939    prompt: &str,
3940    json_schema: &str,
3941    input_text: &str,
3942    model: Option<&str>,
3943    timeout_secs: u64,
3944) -> Result<(serde_json::Value, f64, bool), AppError> {
3945    use wait_timeout::ChildExt;
3946
3947    // G31+G32+G33 (v1.0.69): validate the model BEFORE spawn, write the
3948    // schema to a trusted cache path (not /tmp), and reuse the
3949    // consolidated JSONL parser. See `codex_spawn.rs` for the canonical
3950    // hardening rationale.
3951    super::codex_spawn::validate_codex_model(model)?;
3952    let schema_file = super::codex_spawn::trusted_schema_path()?;
3953
3954    let args = super::codex_spawn::CodexSpawnArgs {
3955        binary,
3956        prompt,
3957        json_schema,
3958        input_text,
3959        model,
3960        timeout_secs,
3961        schema_path: schema_file.clone(),
3962    };
3963    let mut cmd = super::codex_spawn::build_codex_command(&args)?;
3964
3965    let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
3966        AppError::Io(std::io::Error::new(
3967            e.kind(),
3968            format!("failed to spawn codex: {e}"),
3969        ))
3970    })?;
3971
3972    let full_prompt = format!("{prompt}\n\n{input_text}");
3973    let stdin_bytes = full_prompt.into_bytes();
3974    let mut child_stdin = child
3975        .stdin
3976        .take()
3977        .ok_or_else(|| AppError::Validation("failed to open codex stdin".into()))?;
3978    let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
3979        child_stdin.write_all(&stdin_bytes)?;
3980        drop(child_stdin);
3981        Ok(())
3982    });
3983
3984    let start = std::time::Instant::now();
3985    let timeout = std::time::Duration::from_secs(timeout_secs);
3986    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
3987    let _ = std::fs::remove_file(&schema_file);
3988
3989    match status {
3990        Some(exit_status) => {
3991            stdin_thread
3992                .join()
3993                .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
3994                .map_err(AppError::Io)?;
3995
3996            tracing::debug!(
3997                target: "process",
3998                exit_code = ?exit_status.code(),
3999                elapsed_ms = start.elapsed().as_millis() as u64,
4000                "external process completed"
4001            );
4002
4003            let mut stdout_buf = Vec::new();
4004            if let Some(mut out) = child.stdout.take() {
4005                std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
4006            }
4007            if !exit_status.success() {
4008                let mut stderr_buf = Vec::new();
4009                if let Some(mut err) = child.stderr.take() {
4010                    std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
4011                }
4012                let stderr_str = String::from_utf8_lossy(&stderr_buf);
4013                tracing::warn!(
4014                    target: "enrich",
4015                    exit_code = ?exit_status.code(),
4016                    stderr = %stderr_str.trim(),
4017                    "codex process failed"
4018                );
4019                return Err(AppError::Validation(format!(
4020                    "codex exited with code {:?}: {}",
4021                    exit_status.code(),
4022                    stderr_str.trim()
4023                )));
4024            }
4025            let stdout_str = String::from_utf8(stdout_buf)
4026                .map_err(|_| AppError::Validation("codex stdout is not valid UTF-8".into()))?;
4027            // G32: use the JSONL parser, NOT serde_json::from_str on the
4028            // entire stdout (codex emits one event per line).
4029            let result = super::codex_spawn::parse_codex_jsonl(&stdout_str)?;
4030            // Return the raw agent_message text parsed as JSON. Different
4031            // operations (memory-bindings, body-enrich) use different
4032            // output schemas, so we let the caller pick which fields to
4033            // extract. The previous implementation hardcoded
4034            // `{entities, urls}` which broke body-enrich.
4035            let value: serde_json::Value =
4036                serde_json::from_str(&result.last_agent_text).map_err(|e| {
4037                    AppError::Validation(format!(
4038                        "codex agent_message is not valid JSON: {e}; raw={}",
4039                        result.last_agent_text
4040                    ))
4041                })?;
4042            Ok((value, 0.0, false))
4043        }
4044        None => {
4045            let _ = child.kill();
4046            let _ = child.wait();
4047            let _ = stdin_thread.join();
4048            Err(AppError::Validation(format!(
4049                "codex timed out after {timeout_secs} seconds"
4050            )))
4051        }
4052    }
4053}
4054
4055fn call_opencode(
4056    binary: &Path,
4057    prompt: &str,
4058    json_schema: &str,
4059    input_text: &str,
4060    model: Option<&str>,
4061    timeout_secs: u64,
4062) -> Result<(serde_json::Value, f64, bool), AppError> {
4063    use wait_timeout::ChildExt;
4064
4065    let resolved_model = super::opencode_runner::resolve_opencode_model(model);
4066
4067    let augmented_prompt = if json_schema.is_empty() {
4068        prompt.to_string()
4069    } else {
4070        format!(
4071            "{prompt}\n\nIMPORTANT: You MUST respond with ONLY valid JSON (no markdown, no explanation, no code fences). \
4072             The JSON MUST match this schema:\n{json_schema}"
4073        )
4074    };
4075
4076    let mut cmd = super::opencode_runner::build_opencode_command_sync(
4077        binary,
4078        &resolved_model,
4079        &augmented_prompt,
4080        input_text,
4081    )?;
4082
4083    let mut child = super::opencode_runner::spawn_opencode(&mut cmd).map_err(|e| {
4084        AppError::Io(std::io::Error::new(
4085            e.kind(),
4086            format!("failed to spawn opencode: {e}"),
4087        ))
4088    })?;
4089
4090    let start = std::time::Instant::now();
4091    let timeout = std::time::Duration::from_secs(timeout_secs);
4092    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
4093
4094    match status {
4095        Some(exit_status) => {
4096            tracing::debug!(
4097                target: "process",
4098                exit_code = ?exit_status.code(),
4099                elapsed_ms = start.elapsed().as_millis() as u64,
4100                "opencode process completed"
4101            );
4102
4103            let mut stdout_buf = Vec::new();
4104            if let Some(mut out) = child.stdout.take() {
4105                std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
4106            }
4107            if !exit_status.success() {
4108                let mut stderr_buf = Vec::new();
4109                if let Some(mut err) = child.stderr.take() {
4110                    std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
4111                }
4112                let stderr_str = String::from_utf8_lossy(&stderr_buf);
4113                tracing::warn!(
4114                    target: "enrich",
4115                    exit_code = ?exit_status.code(),
4116                    stderr = %stderr_str.trim(),
4117                    "opencode process failed"
4118                );
4119                return Err(AppError::Validation(format!(
4120                    "opencode exited with code {:?}: {}",
4121                    exit_status.code(),
4122                    stderr_str.trim()
4123                )));
4124            }
4125            let stdout_str = String::from_utf8(stdout_buf)
4126                .map_err(|_| AppError::Validation("opencode stdout is not valid UTF-8".into()))?;
4127            let (text, cost, _tokens) = super::opencode_runner::parse_opencode_output(&stdout_str)?;
4128            let value: serde_json::Value =
4129                super::opencode_runner::parse_json_from_opencode_text(&text).map_err(|e| {
4130                    AppError::Validation(format!("opencode response is not valid JSON: {e}"))
4131                })?;
4132            Ok((value, cost, false))
4133        }
4134        None => {
4135            let _ = child.kill();
4136            let _ = child.wait();
4137            Err(AppError::Validation(format!(
4138                "opencode timed out after {timeout_secs} seconds"
4139            )))
4140        }
4141    }
4142}
4143
4144// ---------------------------------------------------------------------------
4145// Tests
4146// ---------------------------------------------------------------------------
4147
4148#[cfg(test)]
4149mod tests {
4150    use super::*;
4151    use rusqlite::Connection;
4152    #[cfg(unix)]
4153    use std::os::unix::fs::PermissionsExt;
4154
4155    /// Opens an in-memory SQLite database with a minimal schema for unit tests.
4156    fn open_test_db() -> Connection {
4157        let conn = Connection::open_in_memory().expect("in-memory db");
4158        conn.execute_batch(
4159            "CREATE TABLE memories (
4160                id          INTEGER PRIMARY KEY AUTOINCREMENT,
4161                namespace   TEXT NOT NULL DEFAULT 'global',
4162                name        TEXT NOT NULL,
4163                type        TEXT NOT NULL DEFAULT 'note',
4164                description TEXT NOT NULL DEFAULT '',
4165                body        TEXT NOT NULL DEFAULT '',
4166                body_hash   TEXT NOT NULL DEFAULT '',
4167                session_id  TEXT,
4168                source      TEXT NOT NULL DEFAULT 'agent',
4169                metadata    TEXT NOT NULL DEFAULT '{}',
4170                created_at  INTEGER NOT NULL DEFAULT (unixepoch()),
4171                updated_at  INTEGER NOT NULL DEFAULT (unixepoch()),
4172                deleted_at  INTEGER,
4173                UNIQUE(namespace, name)
4174            );
4175            CREATE TABLE entities (
4176                id          INTEGER PRIMARY KEY AUTOINCREMENT,
4177                namespace   TEXT NOT NULL DEFAULT 'global',
4178                name        TEXT NOT NULL,
4179                type        TEXT NOT NULL DEFAULT 'concept',
4180                description TEXT,
4181                degree      INTEGER NOT NULL DEFAULT 0,
4182                created_at  INTEGER NOT NULL DEFAULT (unixepoch()),
4183                updated_at  INTEGER NOT NULL DEFAULT (unixepoch()),
4184                UNIQUE(namespace, name)
4185            );
4186            CREATE TABLE memory_entities (
4187                memory_id  INTEGER NOT NULL,
4188                entity_id  INTEGER NOT NULL,
4189                PRIMARY KEY (memory_id, entity_id)
4190            );
4191            CREATE TABLE relationships (
4192                id         INTEGER PRIMARY KEY AUTOINCREMENT,
4193                namespace  TEXT NOT NULL DEFAULT 'global',
4194                source_id  INTEGER NOT NULL,
4195                target_id  INTEGER NOT NULL,
4196                relation   TEXT NOT NULL,
4197                weight     REAL NOT NULL DEFAULT 0.5,
4198                description TEXT,
4199                UNIQUE(source_id, target_id, relation)
4200            );
4201            CREATE TABLE memory_embeddings (
4202                memory_id   INTEGER PRIMARY KEY,
4203                namespace   TEXT NOT NULL,
4204                embedding   BLOB NOT NULL,
4205                source      TEXT NOT NULL,
4206                model       TEXT NOT NULL DEFAULT '',
4207                dim         INTEGER NOT NULL DEFAULT 384,
4208                created_at  INTEGER NOT NULL DEFAULT (unixepoch())
4209            );",
4210        )
4211        .expect("schema creation must succeed");
4212        conn
4213    }
4214
4215    #[test]
4216    fn scan_unbound_memories_finds_memories_without_bindings() {
4217        let conn = open_test_db();
4218        conn.execute(
4219            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'test-mem', 'some body content')",
4220            [],
4221        )
4222        .unwrap();
4223
4224        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
4225        assert_eq!(results.len(), 1);
4226        assert_eq!(results[0].1, "test-mem");
4227    }
4228
4229    #[test]
4230    fn scan_unbound_memories_excludes_bound_memories() {
4231        let conn = open_test_db();
4232        conn.execute(
4233            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'bound-mem', 'body')",
4234            [],
4235        )
4236        .unwrap();
4237        let mem_id: i64 = conn
4238            .query_row("SELECT id FROM memories WHERE name='bound-mem'", [], |r| {
4239                r.get(0)
4240            })
4241            .unwrap();
4242        conn.execute(
4243            "INSERT INTO entities (namespace, name) VALUES ('global', 'some-entity')",
4244            [],
4245        )
4246        .unwrap();
4247        let ent_id: i64 = conn
4248            .query_row(
4249                "SELECT id FROM entities WHERE name='some-entity'",
4250                [],
4251                |r| r.get(0),
4252            )
4253            .unwrap();
4254        conn.execute(
4255            "INSERT INTO memory_entities (memory_id, entity_id) VALUES (?1, ?2)",
4256            rusqlite::params![mem_id, ent_id],
4257        )
4258        .unwrap();
4259
4260        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
4261        assert!(results.is_empty(), "bound memory must not appear in scan");
4262    }
4263
4264    #[test]
4265    fn scan_entities_without_description_finds_null_description() {
4266        let conn = open_test_db();
4267        conn.execute(
4268            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'my-tool', 'tool', NULL)",
4269            [],
4270        )
4271        .unwrap();
4272
4273        let results = scan_entities_without_description(&conn, "global", None, &[]).unwrap();
4274        assert_eq!(results.len(), 1);
4275        assert_eq!(results[0].1, "my-tool");
4276    }
4277
4278    #[test]
4279    fn scan_entities_without_description_excludes_entities_with_description() {
4280        let conn = open_test_db();
4281        conn.execute(
4282            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'described-tool', 'tool', 'Has a description already')",
4283            [],
4284        )
4285        .unwrap();
4286
4287        let results = scan_entities_without_description(&conn, "global", None, &[]).unwrap();
4288        assert!(
4289            results.is_empty(),
4290            "entity with description must not appear"
4291        );
4292    }
4293
4294    #[test]
4295    fn scan_short_body_memories_finds_short_bodies() {
4296        let conn = open_test_db();
4297        conn.execute(
4298            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'short-mem', 'hi')",
4299            [],
4300        )
4301        .unwrap();
4302
4303        let results = scan_short_body_memories(&conn, "global", 100, None, &[]).unwrap();
4304        assert_eq!(results.len(), 1);
4305        assert_eq!(results[0].1, "short-mem");
4306    }
4307
4308    #[test]
4309    fn scan_short_body_memories_excludes_long_bodies() {
4310        let conn = open_test_db();
4311        let long_body = "a".repeat(1000);
4312        conn.execute(
4313            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'long-mem', ?1)",
4314            rusqlite::params![long_body],
4315        )
4316        .unwrap();
4317
4318        let results = scan_short_body_memories(&conn, "global", 100, None, &[]).unwrap();
4319        assert!(results.is_empty(), "long memory must not appear in scan");
4320    }
4321
4322    #[test]
4323    fn scan_respects_limit() {
4324        let conn = open_test_db();
4325        for i in 0..5 {
4326            conn.execute(
4327                &format!("INSERT INTO memories (namespace, name, body) VALUES ('global', 'mem-{i}', 'short')"),
4328                [],
4329            )
4330            .unwrap();
4331        }
4332
4333        let results = scan_short_body_memories(&conn, "global", 1000, Some(3), &[]).unwrap();
4334        assert_eq!(results.len(), 3, "limit must be respected");
4335    }
4336
4337    #[test]
4338    fn scan_memories_without_embeddings_finds_only_missing_rows() {
4339        let conn = open_test_db();
4340        conn.execute(
4341            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'missing-vec', 'body one')",
4342            [],
4343        )
4344        .unwrap();
4345        conn.execute(
4346            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'has-vec', 'body two')",
4347            [],
4348        )
4349        .unwrap();
4350        let memory_id: i64 = conn
4351            .query_row(
4352                "SELECT id FROM memories WHERE namespace='global' AND name='has-vec'",
4353                [],
4354                |r| r.get(0),
4355            )
4356            .unwrap();
4357        let embedding = vec![0.0_f32; crate::constants::embedding_dim()];
4358        memories::upsert_vec(
4359            &conn, memory_id, "global", "note", &embedding, "has-vec", "body two",
4360        )
4361        .unwrap();
4362
4363        let results = scan_memories_without_embeddings(&conn, "global", None, &[]).unwrap();
4364        assert_eq!(results.len(), 1);
4365        assert_eq!(results[0].1, "missing-vec");
4366    }
4367
4368    #[test]
4369    fn scan_memories_without_embeddings_respects_name_filter() {
4370        let conn = open_test_db();
4371        conn.execute(
4372            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'match-me', 'body one')",
4373            [],
4374        )
4375        .unwrap();
4376        conn.execute(
4377            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'skip-me', 'body two')",
4378            [],
4379        )
4380        .unwrap();
4381
4382        let results =
4383            scan_memories_without_embeddings(&conn, "global", None, &["match-me".to_string()])
4384                .unwrap();
4385        assert_eq!(results.len(), 1);
4386        assert_eq!(results[0].1, "match-me");
4387    }
4388
4389    #[test]
4390    fn queue_db_schema_creates_correctly() {
4391        let tmp_path = format!("/tmp/test-enrich-queue-{}.sqlite", std::process::id());
4392        let conn = open_queue_db(&tmp_path).expect("queue db must open");
4393        let count: i64 = conn
4394            .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
4395            .unwrap();
4396        assert_eq!(count, 0);
4397        let _ = std::fs::remove_file(&tmp_path);
4398    }
4399
4400    #[test]
4401    fn parse_claude_output_valid_bindings() {
4402        let output = r#"[
4403            {"type":"system","subtype":"init"},
4404            {"type":"result","is_error":false,"total_cost_usd":0.01,
4405             "structured_output":{"entities":[{"name":"rust-lang","entity_type":"tool"}],"relationships":[]}}
4406        ]"#;
4407        let result = crate::commands::claude_runner::parse_claude_output(output)
4408            .expect("must parse successfully");
4409        assert!(result.value.get("entities").is_some());
4410        assert!((result.cost_usd - 0.01).abs() < f64::EPSILON);
4411        assert!(!result.is_oauth);
4412    }
4413
4414    #[test]
4415    fn parse_claude_output_detects_oauth() {
4416        let output = r#"[
4417            {"type":"system","subtype":"init","apiKeySource":"none"},
4418            {"type":"result","is_error":false,"total_cost_usd":0.0,
4419             "structured_output":{"entities":[],"relationships":[]}}
4420        ]"#;
4421        let result = crate::commands::claude_runner::parse_claude_output(output).unwrap();
4422        assert!(result.is_oauth);
4423    }
4424
4425    #[test]
4426    fn parse_claude_output_rate_limit_returns_error() {
4427        let output = r#"[
4428            {"type":"system","subtype":"init"},
4429            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
4430        ]"#;
4431        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
4432        assert!(matches!(err, AppError::RateLimited { .. }));
4433    }
4434
4435    #[test]
4436    fn parse_claude_output_auth_error() {
4437        let output = r#"[
4438            {"type":"system","subtype":"init"},
4439            {"type":"result","is_error":true,"error":"authentication failed"}
4440        ]"#;
4441        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
4442        assert!(format!("{err}").contains("authentication failed"));
4443    }
4444
4445    #[cfg(unix)]
4446    #[test]
4447    fn call_codex_returns_raw_json_for_body_enrich_schema() {
4448        let tmp = tempfile::tempdir().expect("tempdir");
4449        let binary = tmp.path().join("codex-mock");
4450        std::fs::write(
4451            &binary,
4452            r#"#!/usr/bin/env bash
4453set -euo pipefail
4454cat <<'JSONL'
4455{"type":"thread.started","thread_id":"mock-thread-0"}
4456{"type":"item.completed","item":{"type":"agent_message","text":"{\"enriched_body\":\"expanded body\"}"}}
4457{"type":"turn.completed","usage":{"input_tokens":1,"output_tokens":1}}
4458JSONL
4459"#,
4460        )
4461        .expect("mock codex write");
4462        let mut perms = std::fs::metadata(&binary).expect("metadata").permissions();
4463        perms.set_mode(0o755);
4464        std::fs::set_permissions(&binary, perms).expect("chmod");
4465
4466        let (value, cost, is_oauth) =
4467            call_codex(&binary, "prompt", BODY_ENRICH_SCHEMA, "body", None, 5)
4468                .expect("call_codex must accept body-enrich payload");
4469
4470        assert_eq!(value["enriched_body"], "expanded body");
4471        assert_eq!(cost, 0.0);
4472        assert!(!is_oauth);
4473    }
4474
4475    #[test]
4476    fn dry_run_emits_preview_without_calling_llm() {
4477        // This test validates the dry-run NDJSON contract without spawning any process.
4478        // The scan_operation function requires a DB; we build one in-memory but cannot
4479        // call run() directly because it needs AppPaths (disk). Instead we test the
4480        // lower-level helpers that the dry-run path relies on.
4481        let conn = open_test_db();
4482        conn.execute(
4483            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'dry-mem', 'tiny')",
4484            [],
4485        )
4486        .unwrap();
4487
4488        let results = scan_short_body_memories(&conn, "global", 1000, None, &[]).unwrap();
4489        assert_eq!(results.len(), 1);
4490        assert_eq!(results[0].1, "dry-mem");
4491        // If scan finds the item and dry_run is set, no LLM would be called.
4492        // The NDJSON emission is tested via integration tests with a fake binary.
4493    }
4494
4495    #[test]
4496    fn persist_entity_description_updates_db() {
4497        let conn = open_test_db();
4498        conn.execute(
4499            "INSERT INTO entities (namespace, name, type) VALUES ('global', 'tokio-runtime', 'tool')",
4500            [],
4501        )
4502        .unwrap();
4503        let eid: i64 = conn
4504            .query_row(
4505                "SELECT id FROM entities WHERE name='tokio-runtime'",
4506                [],
4507                |r| r.get(0),
4508            )
4509            .unwrap();
4510
4511        persist_entity_description(&conn, eid, "Async runtime for Rust applications").unwrap();
4512
4513        let desc: String = conn
4514            .query_row(
4515                "SELECT description FROM entities WHERE id=?1",
4516                rusqlite::params![eid],
4517                |r| r.get(0),
4518            )
4519            .unwrap();
4520        assert_eq!(desc, "Async runtime for Rust applications");
4521    }
4522
4523    #[test]
4524    fn bindings_schema_is_valid_json() {
4525        let _: serde_json::Value =
4526            serde_json::from_str(BINDINGS_SCHEMA).expect("BINDINGS_SCHEMA must be valid JSON");
4527    }
4528
4529    #[test]
4530    fn entity_description_schema_is_valid_json() {
4531        let _: serde_json::Value = serde_json::from_str(ENTITY_DESCRIPTION_SCHEMA)
4532            .expect("ENTITY_DESCRIPTION_SCHEMA must be valid JSON");
4533    }
4534
4535    #[test]
4536    fn body_enrich_schema_is_valid_json() {
4537        let _: serde_json::Value = serde_json::from_str(BODY_ENRICH_SCHEMA)
4538            .expect("BODY_ENRICH_SCHEMA must be valid JSON");
4539    }
4540}