Skip to main content

sqlite_graphrag/commands/enrich/
mod.rs

1// v1.0.97: modularised into queue.rs, scan.rs, postprocess.rs, extraction.rs.
2// See ADR-0056 (closes the ADR-0046 "Known Tech Debt (v1.0.89+)" item).
3
4//! Handler for the `enrich` CLI subcommand (GAP-14 + GAP-18).
5//!
6//! Enriches the knowledge graph by running LLM-powered analysis over memories
7//! and entities that are missing key structural data. Operations are:
8//!
9//! - `memory-bindings`: memories without `memory_entities` rows get entity extraction
10//! - `entity-descriptions`: entities with NULL/empty descriptions get LLM descriptions
11//! - `body-enrich`: memories with short bodies get expanded by the LLM (GAP-18)
12//! - `re-embed`: memories without a vector row get re-embedded without rewriting body
13//!
14//! Architecture mirrors `ingest_claude.rs`: SCAN → JUDGE (LLM) → PERSIST, with a
15//! SQLite queue DB derived next to `--db` (GAP-SG-64) for resume/retry support.
16// Workload: Subprocess I/O-bound (claude/codex API calls with network wait)
17//!
18//! # DRY note
19//!
20//! v1.0.97: `claude_runner.rs` now hosts the shared Claude invocation helpers
21//! (`run_claude`, `parse_claude_output`, `spawn_with_memory_limit`). The queue
22//! DB schema in `ingest_claude.rs` still duplicates `open_queue_db` here — a
23//! future pass can unify them.
24
25mod extraction;
26mod postprocess;
27mod queue;
28mod scan;
29use extraction::{
30    call_body_enrich, call_body_extract, call_deep_research_synth, call_description_enrich,
31    call_domain_classify, call_entity_connect, call_entity_description, call_entity_type_validate,
32    call_graph_audit, call_memory_bindings, call_reembed, call_relation_reclassify,
33    call_weight_calibrate, find_codex_binary, take_last_openrouter_failure, EnrichItemResult,
34};
35use postprocess::{
36    persist_enriched_body, persist_entity_description, persist_memory_bindings,
37    reembed_memory_vector, take_enrich_backend,
38};
39pub use queue::{cleanup_queue_entry, DeadItem, DeadSummary, EnrichStatus, WaitingItem};
40use queue::{
41    dequeue_next_pending, enqueue_candidate, item_type_for, item_type_for_key, open_queue_db,
42    prune_dead_orphans, record_item_failure, record_item_failure_typed, skipped_item_keys,
43    DequeueOutcome,
44};
45use scan::{
46    count_operation_backlog, scan_isolated_entity_pairs, scan_operation, scan_unbound_memories,
47};
48
49use crate::commands::ingest_claude::find_claude_binary;
50use crate::constants::MAX_MEMORY_BODY_LEN;
51use crate::entity_type::EntityType;
52use crate::errors::AppError;
53use crate::paths::AppPaths;
54use crate::storage::connection::{ensure_db_ready, open_rw};
55use crate::storage::entities::{self, NewEntity, NewRelationship};
56use crate::storage::memories;
57
58use rusqlite::Connection;
59use serde::{Deserialize, Serialize};
60use std::io::Write;
61use std::path::{Path, PathBuf};
62use std::time::Instant;
63
64// ---------------------------------------------------------------------------
65// Constants
66// ---------------------------------------------------------------------------
67
68const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
69const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;
70const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2000;
71
72// ---------------------------------------------------------------------------
73// JSON schema used for memory-bindings and body-enrich extraction
74// ---------------------------------------------------------------------------
75
76const BINDINGS_SCHEMA: &str = r#"{
77  "type": "object",
78  "properties": {
79    "entities": {
80      "type": "array",
81      "items": {
82        "type": "object",
83        "properties": {
84          "name": { "type": "string" },
85          "entity_type": {
86            "type": "string",
87            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
88          }
89        },
90        "required": ["name", "entity_type"],
91        "additionalProperties": false
92      }
93    },
94    "relationships": {
95      "type": "array",
96      "items": {
97        "type": "object",
98        "properties": {
99          "source": { "type": "string" },
100          "target": { "type": "string" },
101          "relation": {
102            "type": "string",
103            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
104          },
105          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
106        },
107        "required": ["source","target","relation","strength"],
108        "additionalProperties": false
109      }
110    }
111  },
112  "required": ["entities","relationships"],
113  "additionalProperties": false
114}"#;
115
116const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
117  "type": "object",
118  "properties": {
119    "description": { "type": "string" }
120  },
121  "required": ["description"],
122  "additionalProperties": false
123}"#;
124
125const BODY_ENRICH_SCHEMA: &str = r#"{
126  "type": "object",
127  "properties": {
128    "enriched_body": { "type": "string" }
129  },
130  "required": ["enriched_body"],
131  "additionalProperties": false
132}"#;
133
134// G27 P1: weight-calibrate
135const WEIGHT_CALIBRATE_PROMPT: &str = "You are a knowledge graph quality auditor. Evaluate whether this relationship weight is correctly calibrated.\n\n\
136Scale:\n\
137- 0.9 = vital hard dependency (A cannot function without B)\n\
138- 0.7 = important design relationship (A strongly supports/enables B)\n\
139- 0.5 = useful contextual link (A and B share relevant context)\n\
140- 0.3 = weak reference (A mentions B without strong coupling)\n\n\
141Respond with the calibrated weight and brief reasoning.";
142
143const WEIGHT_CALIBRATE_SCHEMA: &str = r#"{
144  "type": "object",
145  "properties": {
146    "calibrated_weight": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
147    "reasoning": { "type": "string" }
148  },
149  "required": ["calibrated_weight", "reasoning"],
150  "additionalProperties": false
151}"#;
152
153// G27 P1: relation-reclassify
154const RELATION_RECLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. The relationship between these entities uses a generic type. Determine the REAL semantic relationship.\n\n\
155Valid canonical relations (pick exactly one):\n\
156- depends-on: A cannot function without B\n\
157- uses: A utilizes B but could substitute it\n\
158- supports: A reinforces or enables B\n\
159- causes: A triggers or produces B\n\
160- fixes: A resolves a problem in B\n\
161- contradicts: A conflicts with or invalidates B\n\
162- applies-to: A is relevant to or scoped within B\n\
163- follows: A comes after B in sequence\n\
164- replaces: A substitutes B\n\
165- tracked-in: A is monitored in B\n\
166- related: A and B share context (use sparingly)\n\n\
167Respond with the correct relation, strength, and reasoning.";
168
169const RELATION_RECLASSIFY_SCHEMA: &str = r#"{
170  "type": "object",
171  "properties": {
172    "relation": { "type": "string" },
173    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
174    "reasoning": { "type": "string" }
175  },
176  "required": ["relation", "strength", "reasoning"],
177  "additionalProperties": false
178}"#;
179
180// G27 P2: entity-connect — suggest relationships between isolated entities
181const ENTITY_CONNECT_PROMPT: &str = "You are a knowledge graph quality auditor. Two entities exist in the same graph but have no relationship between them. Determine if a meaningful relationship exists.\n\n\
182Valid canonical relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, replaces, tracked-in, related.\n\n\
183If NO meaningful relationship exists, set relation to \"none\".\n\
184Respond with the relation (or \"none\"), strength, and reasoning.";
185
186const ENTITY_CONNECT_SCHEMA: &str = r#"{
187  "type": "object",
188  "properties": {
189    "relation": { "type": "string" },
190    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
191    "reasoning": { "type": "string" }
192  },
193  "required": ["relation", "strength", "reasoning"],
194  "additionalProperties": false
195}"#;
196
197// G27 P2: entity-type-validate — verify entity type assignments
198const ENTITY_TYPE_VALIDATE_PROMPT: &str = "You are a knowledge graph quality auditor. Verify whether this entity's type is correct.\n\n\
199Valid entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\n\
200If the current type is correct, keep it. If wrong, suggest the correct type.\n\
201Respond with the validated type and reasoning.";
202
203const ENTITY_TYPE_VALIDATE_SCHEMA: &str = r#"{
204  "type": "object",
205  "properties": {
206    "validated_type": { "type": "string" },
207    "was_correct": { "type": "boolean" },
208    "reasoning": { "type": "string" }
209  },
210  "required": ["validated_type", "was_correct", "reasoning"],
211  "additionalProperties": false
212}"#;
213
214// G27 P2: description-enrich — improve generic memory descriptions
215const DESCRIPTION_ENRICH_PROMPT: &str = "You are a knowledge graph quality auditor. This memory has a generic or auto-generated description. Write a concise, semantic description (10-20 words) that captures WHAT this memory is about and WHY it matters.\n\n\
216BAD: 'ingested from docs/auth.md'\n\
217GOOD: 'JWT token rotation strategy with 15-min expiry and refresh flow'\n\n\
218Respond with the improved description and reasoning.";
219
220const DESCRIPTION_ENRICH_SCHEMA: &str = r#"{
221  "type": "object",
222  "properties": {
223    "description": { "type": "string" },
224    "reasoning": { "type": "string" }
225  },
226  "required": ["description", "reasoning"],
227  "additionalProperties": false
228}"#;
229
230// G27 P2: domain-classify — classify memory into domain category
231const DOMAIN_CLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. Classify this memory into its primary domain category.\n\n\
232Respond with the domain name (kebab-case, 2-4 words) and reasoning.";
233
234const DOMAIN_CLASSIFY_SCHEMA: &str = r#"{
235  "type": "object",
236  "properties": {
237    "domain": { "type": "string" },
238    "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
239    "reasoning": { "type": "string" }
240  },
241  "required": ["domain", "confidence", "reasoning"],
242  "additionalProperties": false
243}"#;
244
245// G27 P2: graph-audit — audit graph for quality issues
246const GRAPH_AUDIT_PROMPT: &str = "You are a knowledge graph quality auditor. Analyze this memory and its entity bindings for quality issues.\n\n\
247Check for: missing entities, wrong entity types, redundant relationships, orphaned entities, generic descriptions, low-signal relationships.\n\n\
248Respond with a list of issues found (or empty if none) and an overall quality score.";
249
250const GRAPH_AUDIT_SCHEMA: &str = r#"{
251  "type": "object",
252  "properties": {
253    "quality_score": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
254    "issues": { "type": "array", "items": { "type": "object", "properties": { "kind": { "type": "string" }, "detail": { "type": "string" } }, "required": ["kind", "detail"] } },
255    "reasoning": { "type": "string" }
256  },
257  "required": ["quality_score", "issues", "reasoning"],
258  "additionalProperties": false
259}"#;
260
261// G27 P2: deep-research-synth — synthesize research findings into graph
262const DEEP_RESEARCH_SYNTH_PROMPT: &str = "You are a knowledge graph synthesizer. Given this memory body, extract key findings and synthesize them into structured entities and relationships.\n\n\
263Entity names: lowercase kebab-case, domain-specific.\n\
264Relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, related, replaces, tracked-in.\n\n\
265Respond with extracted entities, relationships, and a synthesis summary.";
266
267const DEEP_RESEARCH_SYNTH_SCHEMA: &str = r#"{
268  "type": "object",
269  "properties": {
270    "entities": { "type": "array", "items": { "type": "object", "properties": { "name": { "type": "string" }, "entity_type": { "type": "string" } }, "required": ["name", "entity_type"] } },
271    "relationships": { "type": "array", "items": { "type": "object", "properties": { "source": { "type": "string" }, "target": { "type": "string" }, "relation": { "type": "string" }, "strength": { "type": "number" } }, "required": ["source", "target", "relation", "strength"] } },
272    "summary": { "type": "string" }
273  },
274  "required": ["entities", "relationships", "summary"],
275  "additionalProperties": false
276}"#;
277
278// G27 P2: body-extract — extract structured content from unstructured text
279const BODY_EXTRACT_PROMPT: &str = "You are a structured data extractor. Given this memory body (which may be unstructured text, raw notes, or a transcript), extract and restructure the content into a clean, well-organized markdown body.\n\n\
280Preserve all factual content. Remove noise, fix formatting, add section headers where appropriate.\n\
281Respond with the restructured body and a brief summary of changes.";
282
283const BODY_EXTRACT_SCHEMA: &str = r#"{
284  "type": "object",
285  "properties": {
286    "restructured_body": { "type": "string" },
287    "changes_summary": { "type": "string" }
288  },
289  "required": ["restructured_body", "changes_summary"],
290  "additionalProperties": false
291}"#;
292
293// ---------------------------------------------------------------------------
294// Prompts
295// ---------------------------------------------------------------------------
296
297const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. Given a memory body, extract:\n\
2981. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
2992. Typed relationships between entities with strength scores\n\n\
300Rules:\n\
301- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
302- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
303- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
304- NEVER use 'mentions' as relationship type\n\
305- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
306- Prefer fewer high-quality entities over many low-quality ones";
307
308const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. Given an entity name and type, write a concise one-sentence description (10-20 words) that explains what this entity IS and WHY it matters in the context of software/system design.\n\nEntity name: ";
309
310const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. Given a short or sparse memory body, expand it into a richer, more complete and useful description. Preserve all existing facts. Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\nConstraints:\n- Output only the enriched body text (no metadata, no headers)\n- Preserve the original meaning exactly\n- Target length is provided in the system context\n\nMemory body to enrich:\n\n";
311
312// ---------------------------------------------------------------------------
313// CLI args
314// ---------------------------------------------------------------------------
315
316/// Operation to perform in the `enrich` command.
317#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
318#[serde(rename_all = "kebab-case")]
319pub enum EnrichOperation {
320    /// Add missing entity/relationship bindings to memories (fully implemented).
321    /// memory-bindings LINKS each memory to the EXISTING entities extracted from
322    /// its body — it does not invent a new graph, it only connects what is missing. Scans
323    /// only UNBOUND memories (those with zero `memory_entities`).
324    MemoryBindings,
325    /// GAP-SG-24/26: additive augmentation — re-run binding extraction over
326    /// memories that are ALREADY bound, filtered by `--names`/`--names-file`, to
327    /// merge newly-discovered entities/relationships WITHOUT removing existing
328    /// links. Requires a name filter (refuses to re-scan the whole namespace).
329    AugmentBindings,
330    /// Fill NULL/empty entity descriptions with LLM-generated summaries (fully implemented).
331    EntityDescriptions,
332    /// Expand short memory bodies into richer content (fully implemented, GAP-18).
333    BodyEnrich,
334    /// Rebuild missing memory embeddings without rewriting the memory body.
335    ReEmbed,
336    /// Calibrate relationship weights using LLM analysis (scan only).
337    WeightCalibrate,
338    /// Reclassify relationship types using LLM judgment (scan only).
339    RelationReclassify,
340    /// Connect isolated entities by suggesting new relationships (scan only).
341    EntityConnect,
342    /// Validate entity type assignments using LLM judgment (scan only).
343    EntityTypeValidate,
344    /// Enrich memory descriptions that are generic/auto-generated (scan only).
345    DescriptionEnrich,
346    /// Identify cross-domain bridges between disconnected subgraphs (scan only).
347    CrossDomainBridges,
348    /// Classify memories into domain categories (scan only).
349    DomainClassify,
350    /// Audit the graph for quality issues (scan only).
351    GraphAudit,
352    /// Synthesize deep-research findings into graph memories (scan only).
353    DeepResearchSynth,
354    /// Extract structured body from unstructured text (scan only).
355    BodyExtract,
356}
357
358/// v1.1.1 (P2): which embedding table the `re-embed` operation backfills.
359///
360/// `memories` is the historical behaviour (and the default, so existing
361/// invocations are unchanged); `entities` and `chunks` close the retroactive
362/// coverage gap for `entity_embeddings` / `chunk_embeddings`; `all` runs the
363/// three scans in one invocation.
364#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
365#[serde(rename_all = "kebab-case")]
366pub enum ReEmbedTarget {
367    /// Memories without a live vector in `memory_embeddings` (default).
368    Memories,
369    /// Entities without a live vector in `entity_embeddings`.
370    Entities,
371    /// Chunks without a live vector in `chunk_embeddings`.
372    Chunks,
373    /// All three targets in a single run.
374    All,
375}
376
377/// LLM provider for enrichment.
378#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
379pub enum EnrichMode {
380    /// Use locally installed Claude Code CLI (OAuth-first).
381    ClaudeCode,
382    /// Use locally installed OpenAI Codex CLI.
383    Codex,
384    /// Use locally installed OpenCode CLI.
385    #[value(name = "opencode")]
386    Opencode,
387    /// Use the OpenRouter chat-completions REST API (no local CLI; v1.0.95).
388    #[value(name = "openrouter")]
389    OpenRouter,
390}
391
392impl std::fmt::Display for EnrichMode {
393    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
394        match self {
395            EnrichMode::ClaudeCode => write!(f, "claude-code"),
396            EnrichMode::Codex => write!(f, "codex"),
397            EnrichMode::Opencode => write!(f, "opencode"),
398            EnrichMode::OpenRouter => write!(f, "openrouter"),
399        }
400    }
401}
402
403/// Arguments for the `enrich` subcommand.
404#[derive(clap::Args)]
405#[command(
406    about = "Enrich graph memories and entities using an LLM provider",
407    after_long_help = "EXAMPLES:\n  \
408    # Add missing entity bindings to all unbound memories\n  \
409    sqlite-graphrag enrich --operation memory-bindings --mode codex --codex-model gpt-5.4-mini\n\n  \
410    # Fill entity descriptions (dry-run preview, no tokens spent)\n  \
411    sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n  \
412    # Expand short memory bodies (GAP-18)\n  \
413    sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n  \
414    # Rebuild only missing memory embeddings without rewriting bodies\n  \
415    sqlite-graphrag enrich --operation re-embed --limit 100\n\n  \
416    # Resume an interrupted body-enrich run\n  \
417    sqlite-graphrag enrich --operation body-enrich --resume --json\n\n  \
418    # Retry only failed items from a previous run\n  \
419    sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n  \
420    # Converge the whole backlog (internal scan+drain loop, no bash wrapper)\n  \
421    sqlite-graphrag enrich --operation memory-bindings --mode openrouter \\\n    \
422      --openrouter-model deepseek/deepseek-v4-flash:nitro --until-empty --max-runtime 600\n\n  \
423    # Inspect / resurrect dead-letter items\n  \
424    sqlite-graphrag enrich --operation memory-bindings --list-dead\n  \
425    sqlite-graphrag enrich --operation memory-bindings --requeue-dead\n\n  \
426    # Read-only status (no LLM, no singleton)\n  \
427    sqlite-graphrag enrich --operation memory-bindings --status\n\n\
428    OPERATIONS NOTE:\n  \
429    memory-bindings LINKS each memory to the EXISTING entities extracted from its\n  \
430    body — it does not invent a new graph, it connects what is missing. It scans\n  \
431    only UNBOUND memories. To re-run extraction over ALREADY-bound memories and\n  \
432    MERGE newly-found entities/relationships additively (without removing links),\n  \
433    use --operation augment-bindings with --names/--names-file.\n\n\
434    DEAD-LETTER SIDECAR (.enrich-queue.sqlite):\n  \
435    A SQLite sidecar tracks each work item across runs. Schema (table `queue`):\n  \
436    item_key (UNIQUE name/id), item_type (memory|entity), operation, memory_id,\n  \
437    status (pending|processing|done|skipped|dead), attempt, error, error_class,\n  \
438    next_retry_at (backoff cooldown). --until-empty loops scan→drain internally\n  \
439    until eligible items are exhausted; transient failures (incl. malformed/non-\n  \
440    JSON LLM output, GAP-SG-09) reschedule with backoff until --max-attempts, then\n  \
441    land in status='dead'. Use --status to see the queue, --list-dead to inspect\n  \
442    the sink, --requeue-dead to retry it, and --ignore-backoff to skip cooldowns.\n  \
443    --names/--names-file also remedy a cooldown by targeting a specific subset.\n\n\
444    EXIT CODES:\n  \
445    0  success\n  \
446    1  validation error (bad args, binary not found)\n  \
447    14 I/O error"
448)]
449pub struct EnrichArgs {
450    /// Enrichment operation to run. Required for write operations; optional for
451    /// the read-only queue inspectors (`--status` / `--list-dead` /
452    /// `--requeue-dead`), where it defaults to `memory-bindings` when omitted
453    /// (GAP-SG-31).
454    #[arg(
455        long,
456        short = 'o',
457        value_enum,
458        value_name = "OPERATION",
459        required_unless_present_any = ["status", "list_dead", "requeue_dead", "prune_dead_orphans"]
460    )]
461    pub operation: Option<EnrichOperation>,
462
463    /// LLM provider to use. Required for write operations; not needed for the
464    /// read-only queue inspectors (`--status` / `--list-dead` /
465    /// `--requeue-dead`), which never call the LLM (GAP-SG-31).
466    #[arg(
467        long,
468        value_enum,
469        required_unless_present_any = ["status", "list_dead", "requeue_dead", "prune_dead_orphans"]
470    )]
471    pub mode: Option<EnrichMode>,
472
473    /// Maximum number of items to process in this run. Omit for all.
474    #[arg(long, value_name = "N")]
475    pub limit: Option<usize>,
476
477    /// v1.1.1 (P2): embedding table backfilled by `--operation re-embed`.
478    /// `memories` (default) preserves the historical behaviour; `entities`
479    /// and `chunks` rebuild `entity_embeddings` / `chunk_embeddings`; `all`
480    /// covers the three tables in one run. The scan also selects rows whose
481    /// stored `dim` diverges from the configured `--embedding-dim` (P10),
482    /// so legacy-dimension vectors are regenerated, not only missing ones.
483    /// Ignored (rejected) for every other operation.
484    #[arg(long, value_enum, value_name = "TARGET", default_value_t = ReEmbedTarget::Memories)]
485    pub target: ReEmbedTarget,
486
487    /// Preview items without calling the LLM (zero tokens consumed).
488    #[arg(long)]
489    pub dry_run: bool,
490
491    /// Namespace to operate on. Default: global.
492    #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
493    pub namespace: Option<String>,
494
495    // -- Provider flags (Claude) --
496    /// Path to the Claude Code binary. Default: auto-detect from PATH.
497    #[arg(long, value_name = "PATH")]
498    pub claude_binary: Option<PathBuf>,
499
500    /// Claude model to use (e.g. claude-sonnet-4-6).
501    #[arg(long, value_name = "MODEL")]
502    pub claude_model: Option<String>,
503
504    /// Timeout per item in seconds when using Claude Code. Default: 300.
505    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
506    pub claude_timeout: u64,
507
508    // -- Provider flags (Codex) --
509    /// Path to the Codex CLI binary. Default: auto-detect from PATH.
510    #[arg(long, value_name = "PATH")]
511    pub codex_binary: Option<PathBuf>,
512
513    /// Codex model to use (e.g. o4-mini).
514    #[arg(long, value_name = "MODEL")]
515    pub codex_model: Option<String>,
516
517    /// Timeout per item in seconds when using Codex. Default: 300.
518    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
519    pub codex_timeout: u64,
520
521    // -- Provider flags (OpenCode) --
522    /// Path to the OpenCode binary. Default: auto-detect from PATH.
523    #[arg(long, value_name = "PATH", env = "SQLITE_GRAPHRAG_OPENCODE_BINARY")]
524    pub opencode_binary: Option<PathBuf>,
525
526    /// OpenCode model to use.
527    #[arg(long, value_name = "MODEL", env = "SQLITE_GRAPHRAG_OPENCODE_MODEL")]
528    pub opencode_model: Option<String>,
529
530    /// Timeout per item in seconds when using OpenCode. Default: 300.
531    #[arg(
532        long,
533        value_name = "SECONDS",
534        env = "SQLITE_GRAPHRAG_OPENCODE_TIMEOUT",
535        default_value_t = 300
536    )]
537    pub opencode_timeout: u64,
538
539    // -- Provider flags (OpenRouter, v1.0.95) --
540    /// OpenRouter text model to use (REQUIRED with --mode openrouter; no default).
541    #[arg(long, value_name = "MODEL")]
542    pub openrouter_model: Option<String>,
543
544    /// OpenRouter API key. Falls back to OPENROUTER_API_KEY env or stored config.
545    #[arg(long, value_name = "KEY", env = "OPENROUTER_API_KEY")]
546    pub openrouter_api_key: Option<String>,
547
548    /// Timeout per item in seconds when using OpenRouter. Default: 600.
549    ///
550    /// GAP-SG-17: raised from 300 to 600 because dense bodies (close to the
551    /// ~32K-token context ceiling of the configured model) routinely take
552    /// longer than five minutes to generate via `deepseek-v4-flash:nitro`.
553    /// Raise it further for very large corpora; lower it for short snippets.
554    #[arg(long, value_name = "SECONDS", default_value_t = 600)]
555    pub openrouter_timeout: u64,
556
557    /// Optional OpenRouter base URL override (reserved; defaults to the public API).
558    #[arg(long, value_name = "URL")]
559    pub openrouter_base_url: Option<String>,
560
561    // -- Cost controls --
562    /// Abort when cumulative cost exceeds this USD budget (API key only; ignored for OAuth).
563    #[arg(long, value_name = "USD")]
564    pub max_cost_usd: Option<f64>,
565
566    // -- Queue controls --
567    /// Resume a previously interrupted run (skip already-done items).
568    #[arg(long)]
569    pub resume: bool,
570
571    /// Retry only items that failed in a previous run.
572    #[arg(long)]
573    pub retry_failed: bool,
574
575    /// GAP-ENRICH-BACKLOG-CONVERGE: loop scan→drain internally until the queue
576    /// empties of eligible items or --max-runtime elapses; removes the need for
577    /// an external bash retry loop.
578    #[arg(long)]
579    pub until_empty: bool,
580
581    /// GAP-ENRICH-BACKLOG-CONVERGE: wall-clock ceiling in seconds for
582    /// --until-empty. Defaults to 3600 when omitted.
583    #[arg(long, value_name = "SECONDS")]
584    pub max_runtime: Option<u64>,
585
586    /// GAP-ENRICH-BACKLOG-CONVERGE: attempts per item before it becomes a
587    /// dead-letter (status='dead'). Range 1..=20. Default 8.
588    ///
589    /// GAP-SG-21: the default was raised from 5 to 8 because GAP-SG-09 now
590    /// reclassifies malformed / non-JSON LLM output as TRANSIENT (retryable)
591    /// rather than a permanent HardFailure. A flaky structured-output model
592    /// (e.g. deepseek-v4-flash:nitro) may emit several bad generations in a row
593    /// even after JSON repair (GAP-SG-10) recovers most of them; the extra
594    /// attempts give the backlog room to converge before an item is parked in
595    /// the dead-letter sink. Permanent faults (ProviderError, NotFound) still
596    /// dead-letter on the first attempt regardless of this value.
597    #[arg(long, value_name = "N", default_value_t = 8, value_parser = clap::value_parser!(u32).range(1..=20))]
598    pub max_attempts: u32,
599
600    /// GAP-ENRICH-BACKLOG-CONVERGE: read-only mode — report queue and backlog
601    /// counts without calling the LLM or acquiring the singleton.
602    #[arg(long)]
603    pub status: bool,
604
605    /// GAP-SG-23: list every dead-letter item (status='dead') for the current
606    /// operation with its error_class, attempt count and last error message.
607    /// Read-only — no LLM, no singleton. Use it to inspect what `--requeue-dead`
608    /// would resurrect before running it.
609    #[arg(long)]
610    pub list_dead: bool,
611
612    /// GAP-SG-11/14: resurrect dead-letter items — move every `status='dead'`
613    /// row back to `pending`, zeroing `attempt`, `next_retry_at`, `error` and
614    /// `error_class`. Distinct from `--retry-failed`, which only resets the
615    /// legacy `status='failed'` rows; dead-letter rows are the terminal sink of
616    /// the v1.0.96 converge loop and are never re-selected without this flag.
617    /// No LLM call or singleton is taken — it only rewrites queue statuses.
618    #[arg(long)]
619    pub requeue_dead: bool,
620
621    /// GAP-SG-66: prune ORPHAN dead-letter rows — remove every `status='dead'`
622    /// memory row whose `item_key` (the memory name) no longer exists in the
623    /// main DB for this namespace. These are terminal "not found" failures that
624    /// `--requeue-dead` can never recover (re-processing re-fails the same way),
625    /// so they inflate `queue_dead` forever. Read-only on the main DB; deletes
626    /// only confirmed-orphan rows from the queue sidecar. Entity-keyed dead rows
627    /// are left untouched. No LLM, no singleton — like `--list-dead`.
628    #[arg(long)]
629    pub prune_dead_orphans: bool,
630
631    /// GAP-SG-16: ignore the per-item backoff cooldown (`next_retry_at`) when
632    /// selecting candidates, so items waiting on exponential backoff are
633    /// processed immediately. Use to drain a backlog whose cooldown windows are
634    /// long but the provider has recovered. Without it, `--status` reports such
635    /// items under `waiting` and they are skipped until their `next_retry_at`.
636    #[arg(long)]
637    pub ignore_backoff: bool,
638
639    /// GAP-SG-28: read-only `body-extract` — extract entities/relationships into
640    /// the graph WITHOUT rewriting (or truncating) the memory body. The default
641    /// `body-extract` restructures the stored body in place; with this flag the
642    /// body is left untouched and only graph bindings are persisted (additive,
643    /// via the same upsert path as `memory-bindings`). Ignored for every other
644    /// operation.
645    #[arg(long)]
646    pub body_extract_graph_only: bool,
647
648    /// GAP-ENRICH-BACKLOG-CONVERGE: REST concurrency for --mode openrouter
649    /// (clamp 1..=16, default 8). Distinct from the legacy --llm-parallelism.
650    #[arg(long, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=16))]
651    pub rest_concurrency: Option<u32>,
652
653    // -- body-enrich specific flags (GAP-18) --
654    /// Minimum output character count for body-enrich. Default: 500.
655    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
656    pub min_output_chars: usize,
657
658    /// Maximum output character count for body-enrich. Default: 2000.
659    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
660    pub max_output_chars: usize,
661
662    /// Check that enriched body preserves all facts from the original (LLM judge). Default: true.
663    #[arg(long, default_value_t = true)]
664    pub preserve_check: bool,
665
666    /// Path to a custom prompt template file for body-enrich.
667    #[arg(long, value_name = "PATH")]
668    pub prompt_template: Option<PathBuf>,
669
670    /// Number of parallel LLM workers (default 1 = serial).
671    /// Each worker claims items atomically from the queue DB via UPDATE...RETURNING.
672    /// Range: 1–32. For 2321 entities, --llm-parallelism 4 reduces wall time ~4×.
673    #[arg(long, default_value_t = 1, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=32))]
674    pub llm_parallelism: u32,
675
676    // -- Output / infra --
677    /// Emit NDJSON output. Always true; flag accepted for compatibility.
678    #[arg(long)]
679    pub json: bool,
680
681    /// Database path override.
682    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
683    pub db: Option<String>,
684
685    /// G30: poll for the job singleton every second for up to N seconds
686    /// when another invocation holds the lock. Default: 0 (fail fast).
687    #[arg(long, value_name = "SECONDS")]
688    pub wait_job_singleton: Option<u64>,
689
690    /// G30: force acquisition of the singleton lock by removing a stale
691    /// lock file from a previously crashed invocation. Use only when you
692    /// are certain no other `enrich`/`ingest` is running.
693    #[arg(long, default_value_t = false)]
694    pub force_job_singleton: bool,
695
696    /// G37: select a specific subset of memory names to enrich instead of
697    /// the full candidate set. Comma-separated, e.g. `--names a,b,c`.
698    /// Empty when omitted (processes all candidates).
699    ///
700    /// GAP-SG-18: also a cooldown remedy — when `--status` shows items under
701    /// `waiting` (parked on `next_retry_at` backoff), pass the exact names here
702    /// to re-enqueue and process just that subset on the next run instead of
703    /// waiting for every cooldown to elapse. REQUIRED for `--operation
704    /// augment-bindings`, which refuses to re-scan the whole namespace.
705    #[arg(long, value_name = "NAMES", value_delimiter = ',')]
706    pub names: Vec<String>,
707
708    /// G37: read the subset of memory names from a file (one per line).
709    /// Lines starting with `#` and empty lines are ignored. Combined with
710    /// `--names` (union) when both are set.
711    #[arg(long, value_name = "PATH")]
712    pub names_file: Option<PathBuf>,
713
714    /// G35: probe the LLM provider with a 1-turn ping before processing
715    /// the batch. Aborts with a clear error if the rate-limit window is
716    /// closed (avoids burning N turns only to fail on item 1).
717    #[arg(long, default_value_t = false)]
718    pub preflight_check: bool,
719
720    /// G35: if a preflight probe or in-flight call hits the Claude rate
721    /// limit, fall back to `--fallback-mode` (typically `codex`) instead
722    /// of failing the batch. Ignored when `--mode` is already `codex`.
723    #[arg(long, value_enum)]
724    pub fallback_mode: Option<EnrichMode>,
725
726    /// G35: number of seconds before the OAuth rate-limit reset at which
727    /// the preflight probe should refuse to start. Default 300 (5 min).
728    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
729    pub rate_limit_buffer: u64,
730
731    /// G28-D: refuse to start when the 1-minute load average exceeds
732    /// `2 × ncpus` (or `SQLITE_GRAPHRAG_MAX_LOAD_PER_NCPU` if set).
733    /// Set to false to skip the check on contended CI runners.
734    #[arg(long, default_value_t = true)]
735    pub max_load_check: bool,
736
737    /// G28-D: when the system is saturated, abort the job after this
738    /// many consecutive HardFailure outcomes. Default 5.
739    #[arg(long, value_name = "N", default_value_t = 5)]
740    pub circuit_breaker_threshold: u32,
741
742    /// G29 Step 4: minimum trigram-Jaccard similarity between the
743    /// original body and the LLM-rewritten body for the rewrite to be
744    /// accepted. Scores below the threshold are rejected and emitted as
745    /// `EnrichItemResult::PreservationFailed`. Default 0.7 (per the G29
746    /// gap specification). Ignored when `--operation` is not
747    /// `body-enrich`.
748    #[arg(long, value_name = "FLOAT", default_value_t = 0.7)]
749    pub preserve_threshold: f64,
750
751    /// G33 Step 3: when set, validate `--codex-model` against the
752    /// ChatGPT Pro OAuth accepted-model list and abort with a
753    /// suggestion when the value is unknown. Default true (fail fast
754    /// to avoid burning OAuth turns). Set to false to opt out.
755    #[arg(long, default_value_t = true)]
756    pub codex_model_validate: bool,
757
758    /// G33 Step 3: when set together with an invalid `--codex-model`,
759    /// automatically substitute the supplied default (e.g. `gpt-5.5`)
760    /// instead of aborting. The substitution is recorded in the NDJSON
761    /// stream as `provider_substituted: true` for traceability.
762    #[arg(long, value_name = "MODEL")]
763    pub codex_model_fallback: Option<String>,
764}
765
766impl EnrichArgs {
767    /// GAP-SG-31: resolved enrichment operation.
768    ///
769    /// `operation` is `Option` so the read-only queue inspectors
770    /// (`--status` / `--list-dead` / `--requeue-dead`) can run without it.
771    /// Write paths always carry a value (enforced by
772    /// `required_unless_present_any` at parse time); the read-only paths fall
773    /// back to `memory-bindings`, the most common queue, when it is omitted.
774    fn operation(&self) -> EnrichOperation {
775        self.operation
776            .clone()
777            .unwrap_or(EnrichOperation::MemoryBindings)
778    }
779
780    /// GAP-SG-31: resolved LLM provider. `mode` is `Option` for the read-only
781    /// inspectors that never call the LLM; write paths always carry a value
782    /// (enforced by `required_unless_present_any`). The fallback is only ever
783    /// observed by read-only code that does not actually invoke the provider.
784    fn mode(&self) -> EnrichMode {
785        self.mode.clone().unwrap_or(EnrichMode::OpenRouter)
786    }
787}
788
789// ---------------------------------------------------------------------------
790// Internal types — raw LLM output structs
791// ---------------------------------------------------------------------------
792
793// ---------------------------------------------------------------------------
794// NDJSON event types emitted to stdout
795// ---------------------------------------------------------------------------
796
797#[derive(Debug, Serialize)]
798struct PhaseEvent<'a> {
799    phase: &'a str,
800    #[serde(skip_serializing_if = "Option::is_none")]
801    binary_path: Option<&'a str>,
802    #[serde(skip_serializing_if = "Option::is_none")]
803    version: Option<&'a str>,
804    #[serde(skip_serializing_if = "Option::is_none")]
805    items_total: Option<usize>,
806    #[serde(skip_serializing_if = "Option::is_none")]
807    items_pending: Option<usize>,
808    /// Active parallel LLM worker count (1 = serial). Present only on the "scan" phase event.
809    #[serde(skip_serializing_if = "Option::is_none")]
810    llm_parallelism: Option<u32>,
811}
812
813/// GAP-SG-45: separates the SCAN metric (always serial — a single SQL sweep of
814/// the candidate set) from the DRAIN metric (the parallel worker fan-out). The
815/// legacy "scan" `PhaseEvent` reported `llm_parallelism` on the scan event,
816/// conflating the two; this event makes the distinction explicit.
817#[derive(Debug, Serialize)]
818struct ConcurrencyEvent {
819    phase: &'static str,
820    scan_parallelism: u32,
821    drain_parallelism: u32,
822}
823
824#[derive(Debug, Serialize)]
825struct ItemEvent<'a> {
826    /// Item identifier (memory name or entity name).
827    item: &'a str,
828    status: &'a str,
829    #[serde(skip_serializing_if = "Option::is_none")]
830    memory_id: Option<i64>,
831    #[serde(skip_serializing_if = "Option::is_none")]
832    entity_id: Option<i64>,
833    #[serde(skip_serializing_if = "Option::is_none")]
834    entities: Option<usize>,
835    #[serde(skip_serializing_if = "Option::is_none")]
836    rels: Option<usize>,
837    #[serde(skip_serializing_if = "Option::is_none")]
838    chars_before: Option<usize>,
839    #[serde(skip_serializing_if = "Option::is_none")]
840    chars_after: Option<usize>,
841    #[serde(skip_serializing_if = "Option::is_none")]
842    cost_usd: Option<f64>,
843    #[serde(skip_serializing_if = "Option::is_none")]
844    elapsed_ms: Option<u64>,
845    #[serde(skip_serializing_if = "Option::is_none")]
846    error: Option<String>,
847    index: usize,
848    total: usize,
849}
850
851#[derive(Debug, Serialize)]
852struct EnrichSummary {
853    summary: bool,
854    operation: String,
855    items_total: usize,
856    completed: usize,
857    failed: usize,
858    skipped: usize,
859    cost_usd: f64,
860    elapsed_ms: u64,
861    /// v1.0.84 (ADR-0042): discriminator of the LLM backend that actually
862    /// ran the re-embedding during enrich. `"claude" | "codex" | "none"`.
863    /// Absent on the wire when `None` (kept for happy-path envelope cleanliness,
864    /// or when the operation did not involve a re-embed).
865    #[serde(skip_serializing_if = "Option::is_none")]
866    backend_invoked: Option<&'static str>,
867    /// GAP-SG-15: items still parked on backoff (`status='pending'` with a future
868    /// `next_retry_at`) when the run ended. Non-zero means the backlog has NOT
869    /// converged — those items are waiting on a cooldown, not done.
870    waiting: i64,
871    /// GAP-SG-15: dead-letter items (`status='dead'`) at the end of the run.
872    /// Non-zero requires `--list-dead` to inspect and `--requeue-dead` to retry.
873    dead: i64,
874}
875
876use crate::output::emit_json_line as emit_json;
877
878// ---------------------------------------------------------------------------
879// Queue DB
880// ---------------------------------------------------------------------------
881
882// Queue functions and structs moved to queue.rs
883
884// LLM call_claude and call_openrouter moved to extraction.rs
885
886// ---------------------------------------------------------------------------
887// Preflight probe (G35) — single-turn ping to verify the LLM provider
888// ---------------------------------------------------------------------------
889
890/// Result of a single preflight ping (G35).
891enum PreflightOutcome {
892    /// The provider accepted the ping without rate-limit or other errors.
893    Healthy,
894    /// The provider rejected the ping due to OAuth rate limit. The
895    /// `suggestion` field is a human hint that callers can embed in the
896    /// user-facing error.
897    RateLimited {
898        reason: String,
899        suggestion: &'static str,
900    },
901    /// Any other provider error (binary missing, auth failure, etc.).
902    Error(AppError),
903}
904
905/// Probes the configured LLM provider with a 1-turn ping.
906///
907/// - Claude: `claude -p "ping" --max-turns 1 --strict-mcp-config --mcp-config '{}'`
908/// - Codex:  `codex exec -c mcp_servers='{}' "ping" --json`
909///
910/// The probe intentionally avoids spawning any MCP server children (G28-A)
911/// to keep its own process footprint at the minimum.
912fn run_preflight_probe(args: &EnrichArgs) -> PreflightOutcome {
913    let timeout = std::time::Duration::from_secs(args.rate_limit_buffer.max(60));
914
915    match args.mode() {
916        EnrichMode::ClaudeCode => {
917            let bin = match find_claude_binary(args.claude_binary.as_deref()) {
918                Ok(b) => b,
919                Err(e) => return PreflightOutcome::Error(e),
920            };
921            // v1.0.88 (BUG-3 fix, ADR-0046): write the empty MCP config to a
922            // tempfile (Claude Code 2.1.177 rejects the inline `{}`
923            // form) and run the preflight gate before spawn, mirroring
924            // the canonical pattern in `claude_runner::build_claude_command`.
925            let mcp_config_path = match crate::spawn::preflight::write_empty_mcp_config_tempfile() {
926                Ok(p) => p,
927                Err(e) => {
928                    return PreflightOutcome::Error(AppError::Io(e));
929                }
930            };
931            let mut cmd = std::process::Command::new(&bin);
932            crate::spawn::env_whitelist::apply_env_whitelist(
933                &mut cmd,
934                crate::spawn::env_whitelist::is_strict_env_clear(),
935            );
936            if let Err(e) = crate::spawn::apply_cwd_isolation(&mut cmd) {
937                return PreflightOutcome::Error(e);
938            }
939            cmd.arg("-p")
940                .arg("ping")
941                .arg("--max-turns")
942                .arg("1")
943                .arg("--strict-mcp-config")
944                .arg("--mcp-config")
945                .arg(mcp_config_path.as_os_str())
946                .arg("--dangerously-skip-permissions")
947                .arg("--settings")
948                .arg("{\"hooks\":{}}")
949                .arg("--output-format")
950                .arg("json")
951                .stdin(std::process::Stdio::null())
952                .stdout(std::process::Stdio::piped())
953                .stderr(std::process::Stdio::piped());
954
955            let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
956                Ok(c) => c,
957                Err(e) => {
958                    return PreflightOutcome::Error(AppError::Io(e));
959                }
960            };
961            let output = match wait_with_timeout(child, timeout) {
962                Ok(out) => out,
963                Err(e) => return PreflightOutcome::Error(e),
964            };
965            if !output.status.success() {
966                let stderr = String::from_utf8_lossy(&output.stderr);
967                if stderr.contains("hit your session limit")
968                    || stderr.contains("rate_limit")
969                    || stderr.contains("429")
970                {
971                    return PreflightOutcome::RateLimited {
972                        reason: stderr.trim().to_string(),
973                        suggestion:
974                            "wait for the OAuth window to reset or use --fallback-mode codex",
975                    };
976                }
977                return PreflightOutcome::Error(AppError::Validation(format!(
978                    "preflight probe failed: {stderr}",
979                    stderr = stderr.trim()
980                )));
981            }
982            PreflightOutcome::Healthy
983        }
984        EnrichMode::Codex => {
985            let bin = match find_codex_binary(args.codex_binary.as_deref()) {
986                Ok(b) => b,
987                Err(e) => return PreflightOutcome::Error(e),
988            };
989            super::codex_spawn::validate_codex_model(args.codex_model.as_deref())
990                .map_err(PreflightOutcome::Error)
991                .ok();
992            let schema = "{}";
993            let schema_path = match super::codex_spawn::trusted_schema_path() {
994                Ok(p) => p,
995                Err(e) => return PreflightOutcome::Error(e),
996            };
997            let spawn_args = super::codex_spawn::CodexSpawnArgs {
998                binary: &bin,
999                prompt: "ping",
1000                json_schema: schema,
1001                input_text: "",
1002                model: args.codex_model.as_deref(),
1003                timeout_secs: args.rate_limit_buffer.max(60),
1004                schema_path: schema_path.clone(),
1005            };
1006            let mut cmd = match super::codex_spawn::build_codex_command(&spawn_args) {
1007                Ok(c) => c,
1008                Err(e) => return PreflightOutcome::Error(e),
1009            };
1010            let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
1011                Ok(c) => c,
1012                Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
1013            };
1014            let output = match wait_with_timeout(child, timeout) {
1015                Ok(out) => out,
1016                Err(e) => return PreflightOutcome::Error(e),
1017            };
1018            let _ = std::fs::remove_file(&schema_path);
1019            if !output.status.success() {
1020                let stderr = String::from_utf8_lossy(&output.stderr);
1021                if stderr.contains("rate_limit")
1022                    || stderr.contains("429")
1023                    || stderr.contains("Too Many Requests")
1024                {
1025                    return PreflightOutcome::RateLimited {
1026                        reason: stderr.trim().to_string(),
1027                        suggestion: "wait for the rate-limit window to reset",
1028                    };
1029                }
1030                return PreflightOutcome::Error(AppError::Validation(format!(
1031                    "preflight probe failed: {stderr}",
1032                    stderr = stderr.trim()
1033                )));
1034            }
1035            PreflightOutcome::Healthy
1036        }
1037        EnrichMode::Opencode => {
1038            let bin = match super::opencode_runner::find_opencode_binary_with_override(
1039                args.opencode_binary.as_deref(),
1040            ) {
1041                Ok(b) => b,
1042                Err(e) => return PreflightOutcome::Error(e),
1043            };
1044            let model =
1045                super::opencode_runner::resolve_opencode_model(args.opencode_model.as_deref());
1046            let mut cmd =
1047                match super::opencode_runner::build_opencode_command_sync(&bin, &model, "ping", "")
1048                {
1049                    Ok(c) => c,
1050                    Err(e) => return PreflightOutcome::Error(e),
1051                };
1052            let child = match super::opencode_runner::spawn_opencode(&mut cmd) {
1053                Ok(c) => c,
1054                Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
1055            };
1056            let output = match wait_with_timeout(child, timeout) {
1057                Ok(out) => out,
1058                Err(e) => return PreflightOutcome::Error(e),
1059            };
1060            if !output.status.success() {
1061                let stderr = String::from_utf8_lossy(&output.stderr);
1062                if stderr.contains("rate_limit")
1063                    || stderr.contains("429")
1064                    || stderr.contains("Too Many Requests")
1065                {
1066                    return PreflightOutcome::RateLimited {
1067                        reason: stderr.trim().to_string(),
1068                        suggestion: "wait for the rate-limit window to reset",
1069                    };
1070                }
1071                return PreflightOutcome::Error(AppError::Validation(format!(
1072                    "preflight probe failed: {stderr}",
1073                    stderr = stderr.trim()
1074                )));
1075            }
1076            PreflightOutcome::Healthy
1077        }
1078        EnrichMode::OpenRouter => {
1079            // v1.0.95: the OpenRouter JUDGE has no subprocess to ping; the
1080            // preflight only confirms a usable API key resolves. The chat
1081            // client singleton is initialised in run() before scan.
1082            match crate::config::resolve_api_key("openrouter", args.openrouter_api_key.as_deref()) {
1083                Some(_) => PreflightOutcome::Healthy,
1084                None => PreflightOutcome::Error(AppError::Validation(
1085                    "OPENROUTER_API_KEY not found for --mode openrouter preflight".into(),
1086                )),
1087            }
1088        }
1089    }
1090}
1091
1092/// Cross-platform wait with timeout (no extra crate dependency).
1093fn wait_with_timeout(
1094    mut child: std::process::Child,
1095    timeout: std::time::Duration,
1096) -> Result<std::process::Output, AppError> {
1097    use wait_timeout::ChildExt;
1098    let start = std::time::Instant::now();
1099    let Some(exit) = child.wait_timeout(timeout).map_err(AppError::Io)? else {
1100        let _ = child.kill();
1101        let _ = child.wait();
1102        return Err(AppError::Validation(format!(
1103            "preflight probe timed out after {}s",
1104            start.elapsed().as_secs()
1105        )));
1106    };
1107    let mut stdout = Vec::new();
1108    if let Some(mut out) = child.stdout.take() {
1109        std::io::Read::read_to_end(&mut out, &mut stdout).map_err(AppError::Io)?;
1110    }
1111    let mut stderr = Vec::new();
1112    if let Some(mut err) = child.stderr.take() {
1113        std::io::Read::read_to_end(&mut err, &mut stderr).map_err(AppError::Io)?;
1114    }
1115    Ok(std::process::Output {
1116        status: exit,
1117        stdout,
1118        stderr,
1119    })
1120}
1121
1122// Scan functions moved to scan.rs
1123
1124// Persist functions moved to postprocess.rs
1125
1126// ---------------------------------------------------------------------------
1127// Main entry point
1128// ---------------------------------------------------------------------------
1129
1130// ---------------------------------------------------------------------------
1131// G20: mode-conditional flag validation
1132// ---------------------------------------------------------------------------
1133
1134/// True when a scalar value matches its declared default. Used to
1135/// distinguish "operator passed an explicit override" from "clap filled
1136/// the default" for flags with default_value_t.
1137fn is_at_default<T: PartialEq>(value: T, default: T) -> bool {
1138    value == default
1139}
1140
1141/// G20: validate that flags for one LLM provider were not passed when
1142/// the operator selected a different provider. Flags silently discarded
1143/// by the wrong mode are surfaced as AppError::Validation BEFORE any
1144/// DB work, so the operator gets an actionable error instead of a
1145/// surprise at runtime.
1146///
1147/// Detection rules:
1148/// - For Option<PathBuf> / Option<String>: is_some() means explicit
1149/// - For scalar fields with default_value_t: value != default means explicit
1150/// - For boolean fields: true means explicit (default is false)
1151///
1152/// Mode-specific matrices:
1153/// - mode=claude-code rejects: codex_binary, codex_model, codex_timeout != 300
1154/// - mode=codex rejects: claude_binary, claude_model, claude_timeout != 300, max_cost_usd
1155fn validate_mode_conditional_flags_enrich(args: &EnrichArgs) -> Result<(), AppError> {
1156    const DEFAULT_TIMEOUT: u64 = 300;
1157
1158    let mut conflicts: Vec<String> = Vec::new();
1159
1160    match args.mode() {
1161        EnrichMode::ClaudeCode => {
1162            if args.codex_binary.is_some() {
1163                conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1164            }
1165            if args.codex_model.is_some() {
1166                conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1167            }
1168            if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1169                conflicts.push(format!(
1170                    "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1171                    args.codex_timeout
1172                ));
1173            }
1174        }
1175        EnrichMode::Codex => {
1176            if args.claude_binary.is_some() {
1177                conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1178            }
1179            if args.claude_model.is_some() {
1180                conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1181            }
1182            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1183                conflicts.push(format!(
1184                    "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1185                    args.claude_timeout
1186                ));
1187            }
1188            if args.max_cost_usd.is_some() {
1189                conflicts.push(
1190                    "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription, not the call)"
1191                        .to_string(),
1192                );
1193            }
1194        }
1195        EnrichMode::Opencode => {
1196            if args.claude_binary.is_some() {
1197                conflicts.push("--claude-binary is ignored when --mode=opencode".to_string());
1198            }
1199            if args.claude_model.is_some() {
1200                conflicts.push("--claude-model is ignored when --mode=opencode".to_string());
1201            }
1202            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1203                conflicts.push(format!(
1204                    "--claude-timeout={} is ignored when --mode=opencode (remove the flag to use the default 300s)",
1205                    args.claude_timeout
1206                ));
1207            }
1208            if args.max_cost_usd.is_some() {
1209                conflicts.push(
1210                    "--max-cost-usd is ignored when --mode=opencode (OAuth-first; cost is metered by your subscription, not the call)"
1211                        .to_string(),
1212                );
1213            }
1214        }
1215        EnrichMode::OpenRouter => {
1216            if args.claude_binary.is_some() {
1217                conflicts.push("--claude-binary is ignored when --mode=openrouter".to_string());
1218            }
1219            if args.claude_model.is_some() {
1220                conflicts.push("--claude-model is ignored when --mode=openrouter".to_string());
1221            }
1222            if args.codex_binary.is_some() {
1223                conflicts.push("--codex-binary is ignored when --mode=openrouter".to_string());
1224            }
1225            if args.codex_model.is_some() {
1226                conflicts.push("--codex-model is ignored when --mode=openrouter".to_string());
1227            }
1228            if args.opencode_binary.is_some() {
1229                conflicts.push("--opencode-binary is ignored when --mode=openrouter".to_string());
1230            }
1231            if args.opencode_model.is_some() {
1232                conflicts.push("--opencode-model is ignored when --mode=openrouter".to_string());
1233            }
1234            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1235                conflicts.push(format!(
1236                    "--claude-timeout={} is ignored when --mode=openrouter (remove the flag to use the default 300s)",
1237                    args.claude_timeout
1238                ));
1239            }
1240            if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1241                conflicts.push(format!(
1242                    "--codex-timeout={} is ignored when --mode=openrouter (remove the flag to use the default 300s)",
1243                    args.codex_timeout
1244                ));
1245            }
1246            if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
1247                conflicts.push(format!(
1248                    "--opencode-timeout={} is ignored when --mode=openrouter (remove the flag to use the default 300s)",
1249                    args.opencode_timeout
1250                ));
1251            }
1252        }
1253    }
1254
1255    if !conflicts.is_empty() {
1256        return Err(AppError::Validation(format!(
1257            "G20: mode-conditional flag conflicts detected for --mode={}:\n  - {}",
1258            args.mode(),
1259            conflicts.join("\n  - ")
1260        )));
1261    }
1262
1263    Ok(())
1264}
1265
1266// ---------------------------------------------------------------------------
1267
1268/// Main entry point for the `enrich` command.
1269pub fn run(
1270    args: &EnrichArgs,
1271    llm_backend: crate::cli::LlmBackendChoice,
1272    embedding_backend: crate::cli::EmbeddingBackendChoice,
1273) -> Result<(), AppError> {
1274    // G20: mode-conditional flag validation BEFORE any DB access.
1275    // Surfaces flags that the wrong mode would silently discard.
1276    validate_mode_conditional_flags_enrich(args)?;
1277
1278    // v1.1.1 (P2): --target only means something for re-embed. Fail loud
1279    // instead of silently ignoring it under another operation.
1280    if args.target != ReEmbedTarget::Memories
1281        && !matches!(args.operation(), EnrichOperation::ReEmbed)
1282    {
1283        let target_label = match args.target {
1284            ReEmbedTarget::Memories => "memories",
1285            ReEmbedTarget::Entities => "entities",
1286            ReEmbedTarget::Chunks => "chunks",
1287            ReEmbedTarget::All => "all",
1288        };
1289        return Err(AppError::Validation(format!(
1290            "--target {target_label} only applies to --operation re-embed"
1291        )));
1292    }
1293
1294    // GAP-ENRICH-BACKLOG-CONVERGE: --status is a read-only report. It never
1295    // calls the LLM, never initialises the OpenRouter client, and never
1296    // acquires the job singleton, so it is safe to run while a real enrich is
1297    // in flight (it only reads the queue DB and the unbound backlog).
1298    // GAP-SG-23/11: --list-dead (inspect dead-letter rows) and --requeue-dead
1299    // (resurrect them) are queue-only operations — no LLM, no main-DB write, no
1300    // singleton. Both are scoped to the current --operation so a shared queue is
1301    // not cross-contaminated. Handled before any provider setup.
1302    if args.list_dead || args.requeue_dead || args.prune_dead_orphans {
1303        let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1304        let op_label = format!("{:?}", args.operation());
1305        let paths = AppPaths::resolve(args.db.as_deref())?;
1306        let queue_path = crate::paths::sidecar_path(&paths.db, ".enrich-queue.sqlite");
1307        let queue_conn = open_queue_db(&queue_path)?;
1308        // GAP-SG-66: prune orphan dead rows (memory gone) — needs the main DB to
1309        // confirm the referenced memory is truly absent before deleting.
1310        if args.prune_dead_orphans {
1311            ensure_db_ready(&paths)?;
1312            let main_conn = open_rw(&paths.db)?;
1313            let pruned = prune_dead_orphans(&queue_conn, &main_conn, &op_label, &namespace)?;
1314            let dead_total: i64 = queue_conn
1315                .query_row(
1316                    "SELECT COUNT(*) FROM queue WHERE status='dead' \
1317                     AND (operation = ?1 OR operation IS NULL)",
1318                    rusqlite::params![op_label],
1319                    |r| r.get(0),
1320                )
1321                .unwrap_or(0);
1322            emit_json(&DeadSummary {
1323                summary: true,
1324                operation: op_label,
1325                namespace,
1326                action: "prune-dead-orphans",
1327                dead_total,
1328                requeued: 0,
1329                pruned,
1330            });
1331            return Ok(());
1332        }
1333        if args.list_dead {
1334            let mut stmt = queue_conn.prepare(
1335                "SELECT item_key, item_type, attempt, error_class, error, \
1336                         finish_reason, input_tokens, output_tokens FROM queue \
1337                 WHERE status='dead' AND (operation = ?1 OR operation IS NULL) ORDER BY id",
1338            )?;
1339            let rows = stmt
1340                .query_map(rusqlite::params![op_label], |r| {
1341                    Ok(DeadItem {
1342                        dead_item: true,
1343                        item_key: r.get(0)?,
1344                        item_type: r.get(1)?,
1345                        attempt: r.get(2)?,
1346                        error_class: r.get(3)?,
1347                        error: r.get(4)?,
1348                        finish_reason: r.get(5)?,
1349                        input_tokens: r.get(6)?,
1350                        output_tokens: r.get(7)?,
1351                    })
1352                })?
1353                .collect::<Result<Vec<_>, _>>()?;
1354            let dead_total = rows.len() as i64;
1355            for item in &rows {
1356                emit_json(item);
1357            }
1358            emit_json(&DeadSummary {
1359                summary: true,
1360                operation: op_label,
1361                namespace,
1362                action: "list-dead",
1363                dead_total,
1364                requeued: 0,
1365                pruned: 0,
1366            });
1367            return Ok(());
1368        }
1369        // --requeue-dead: move dead -> pending, clearing the failure bookkeeping.
1370        let dead_total: i64 = queue_conn
1371            .query_row(
1372                "SELECT COUNT(*) FROM queue WHERE status='dead' \
1373                 AND (operation = ?1 OR operation IS NULL)",
1374                rusqlite::params![op_label],
1375                |r| r.get(0),
1376            )
1377            .unwrap_or(0);
1378        let requeued = queue_conn
1379            .execute(
1380                "UPDATE queue SET status='pending', attempt=0, next_retry_at=NULL, \
1381                 error=NULL, error_class=NULL \
1382                 WHERE status='dead' AND (operation = ?1 OR operation IS NULL)",
1383                rusqlite::params![op_label],
1384            )
1385            .map_err(|e| AppError::Validation(format!("requeue-dead failed: {e}")))?
1386            as i64;
1387        let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1388        emit_json(&DeadSummary {
1389            summary: true,
1390            operation: op_label,
1391            namespace,
1392            action: "requeue-dead",
1393            dead_total,
1394            requeued,
1395            pruned: 0,
1396        });
1397        return Ok(());
1398    }
1399
1400    if args.status {
1401        let paths = AppPaths::resolve(args.db.as_deref())?;
1402        ensure_db_ready(&paths)?;
1403        let conn = open_rw(&paths.db)?;
1404        let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1405        let unbound_backlog = scan_unbound_memories(&conn, &namespace, None, &[])?.len();
1406        // GAP-SG-77: DB-semantics backlog for the queried operation (fixes the
1407        // false pending=0 for entity-descriptions/body-enrich/re-embed).
1408        let scan_backlog =
1409            count_operation_backlog(&conn, &args.operation(), &namespace, args.target)?;
1410        let queue_path = crate::paths::sidecar_path(&paths.db, ".enrich-queue.sqlite");
1411        let queue_conn = open_queue_db(&queue_path)?;
1412        let op_label = format!("{:?}", args.operation());
1413        // GAP-SG-42: scope every count to the current operation. Rows migrated
1414        // before the `operation` column (NULL) are still counted so a legacy
1415        // queue is never reported as spuriously empty.
1416        let count_status = |st: &str, op: &str| -> i64 {
1417            queue_conn
1418                .query_row(
1419                    "SELECT COUNT(*) FROM queue WHERE status=?1 \
1420                     AND (operation = ?2 OR operation IS NULL)",
1421                    rusqlite::params![st, op],
1422                    |r| r.get(0),
1423                )
1424                .unwrap_or(0)
1425        };
1426        let eligible_now: i64 = queue_conn
1427            .query_row(
1428                "SELECT COUNT(*) FROM queue WHERE status='pending' \
1429                 AND (operation = ?1 OR operation IS NULL) \
1430                 AND (next_retry_at IS NULL OR next_retry_at <= datetime('now'))",
1431                rusqlite::params![op_label],
1432                |r| r.get(0),
1433            )
1434            .unwrap_or(0);
1435        let waiting: i64 = queue_conn
1436            .query_row(
1437                "SELECT COUNT(*) FROM queue WHERE status='pending' \
1438                 AND (operation = ?1 OR operation IS NULL) \
1439                 AND next_retry_at IS NOT NULL AND next_retry_at > datetime('now')",
1440                rusqlite::params![op_label],
1441                |r| r.get(0),
1442            )
1443            .unwrap_or(0);
1444        // GAP-SG-16: enumerate the items currently in backoff with their ETA.
1445        let waiting_items = {
1446            let mut stmt = queue_conn.prepare(
1447                "SELECT item_key, attempt, next_retry_at, error_class FROM queue \
1448                 WHERE status='pending' AND (operation = ?1 OR operation IS NULL) \
1449                 AND next_retry_at IS NOT NULL AND next_retry_at > datetime('now') \
1450                 ORDER BY next_retry_at",
1451            )?;
1452            let items: Vec<WaitingItem> = stmt
1453                .query_map(rusqlite::params![op_label], |r| {
1454                    Ok(WaitingItem {
1455                        item_key: r.get(0)?,
1456                        attempt: r.get(1)?,
1457                        next_retry_at: r.get(2)?,
1458                        error_class: r.get(3)?,
1459                    })
1460                })?
1461                .collect::<Result<Vec<_>, _>>()?;
1462            items
1463        };
1464        let queue_pending = count_status("pending", &op_label);
1465        let queue_processing = count_status("processing", &op_label);
1466        let queue_done = count_status("done", &op_label);
1467        let queue_failed = count_status("failed", &op_label);
1468        let queue_skipped = count_status("skipped", &op_label);
1469        let queue_dead = count_status("dead", &op_label);
1470        // GAP-SG-15/46: distinguish empty from cooldown from not-yet-scanned.
1471        let state = if eligible_now > 0 {
1472            "draining"
1473        } else if waiting > 0 {
1474            "cooldown"
1475        } else if queue_pending == 0 && scan_backlog > 0 {
1476            "pending-scan"
1477        } else {
1478            "empty"
1479        };
1480        emit_json(&EnrichStatus {
1481            status_report: true,
1482            operation: op_label,
1483            namespace,
1484            unbound_backlog,
1485            scan_backlog,
1486            queue_pending,
1487            queue_processing,
1488            queue_done,
1489            queue_failed,
1490            queue_skipped,
1491            queue_dead,
1492            eligible_now,
1493            waiting,
1494            state,
1495            waiting_items,
1496        });
1497        return Ok(());
1498    }
1499
1500    // v1.0.95 (ADR-0054): when the JUDGE is OpenRouter the model is mandatory
1501    // (no default) and the API key must resolve BEFORE any network or DB work.
1502    // The chat client singleton is initialised here so every per-item dispatch
1503    // fetches it without re-threading the key.
1504    if args.mode() == EnrichMode::OpenRouter {
1505        let model = args.openrouter_model.as_deref().ok_or_else(|| {
1506            AppError::Validation(
1507                "--mode openrouter requires --openrouter-model (no default model is allowed)"
1508                    .into(),
1509            )
1510        })?;
1511        let resolved =
1512            crate::config::resolve_api_key("openrouter", args.openrouter_api_key.as_deref())
1513                .ok_or_else(|| {
1514                    AppError::Validation(
1515                        "OPENROUTER_API_KEY not found; set the env var, store it via \
1516                         `config add-key --provider openrouter`, or pass --openrouter-api-key"
1517                            .into(),
1518                    )
1519                })?;
1520        crate::embedder::get_openrouter_chat_client(
1521            resolved.value,
1522            model,
1523            args.openrouter_timeout,
1524        )?;
1525    }
1526
1527    let started = Instant::now();
1528
1529    let paths = AppPaths::resolve(args.db.as_deref())?;
1530    ensure_db_ready(&paths)?;
1531    let conn = open_rw(&paths.db)?;
1532    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1533
1534    // G28-B (v1.0.68) + G30 (v1.0.69): enforce singleton per
1535    // (job_type, namespace, db_hash) so two parallel `enrich` invocations
1536    // on the same DB cannot co-exist, but concurrent enrich on different
1537    // databases works as expected. The force flag (--force) breaks a
1538    // stale lock from a previously crashed invocation.
1539    let wait_secs = args.wait_job_singleton;
1540    let force_flag = args.force_job_singleton;
1541    let _singleton = crate::lock::acquire_job_singleton(
1542        crate::lock::JobType::Enrich,
1543        &namespace,
1544        &paths.db,
1545        wait_secs,
1546        force_flag,
1547    )?;
1548
1549    // Validate provider binary upfront only for LLM-backed operations.
1550    let provider_binary = if matches!(args.operation(), EnrichOperation::ReEmbed) {
1551        None
1552    } else {
1553        Some(match args.mode() {
1554            EnrichMode::ClaudeCode => {
1555                let bin = find_claude_binary(args.claude_binary.as_deref())?;
1556                let version = super::claude_runner::validate_claude_version(&bin)?;
1557                tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
1558                emit_json(&PhaseEvent {
1559                    phase: "validate",
1560                    binary_path: bin.to_str(),
1561                    version: Some(&version),
1562                    items_total: None,
1563                    items_pending: None,
1564                    llm_parallelism: None,
1565                });
1566                bin
1567            }
1568            EnrichMode::Codex => {
1569                let bin = find_codex_binary(args.codex_binary.as_deref())?;
1570                emit_json(&PhaseEvent {
1571                    phase: "validate",
1572                    binary_path: bin.to_str(),
1573                    version: None,
1574                    items_total: None,
1575                    items_pending: None,
1576                    llm_parallelism: None,
1577                });
1578                bin
1579            }
1580            EnrichMode::Opencode => {
1581                let bin = super::opencode_runner::find_opencode_binary_with_override(
1582                    args.opencode_binary.as_deref(),
1583                )?;
1584                emit_json(&PhaseEvent {
1585                    phase: "validate",
1586                    binary_path: bin.to_str(),
1587                    version: None,
1588                    items_total: None,
1589                    items_pending: None,
1590                    llm_parallelism: None,
1591                });
1592                bin
1593            }
1594            EnrichMode::OpenRouter => {
1595                // v1.0.95: the OpenRouter JUDGE is a REST call, not a spawned
1596                // binary. The chat client singleton was initialised at the top
1597                // of run(); this placeholder path threads through the dispatch
1598                // but is never dereferenced by the OpenRouter arm.
1599                emit_json(&PhaseEvent {
1600                    phase: "validate",
1601                    binary_path: None,
1602                    version: None,
1603                    items_total: None,
1604                    items_pending: None,
1605                    llm_parallelism: None,
1606                });
1607                PathBuf::new()
1608            }
1609        })
1610    };
1611
1612    // G28-D: refuse to start when the system is saturated. This check
1613    // is BEFORE preflight so we never spend an OAuth turn on a host
1614    // that is already at the limit.
1615    if args.max_load_check && !args.dry_run && crate::system_load::is_system_saturated() {
1616        let load = crate::system_load::load_average_one();
1617        let n = crate::system_load::ncpus();
1618        return Err(AppError::Validation(format!(
1619            "system load average {load:.2} exceeds 2x ncpus ({n}); \
1620             pass --no-max-load-check to override (not recommended)"
1621        )));
1622    }
1623
1624    // G35: preflight probe — issue a single ping turn to verify the
1625    // provider is healthy before scanning N candidates. If the probe
1626    // fails with a rate-limit error, optionally fall back to a
1627    // different mode (typically codex) instead of failing the entire
1628    // batch. The probe itself consumes 1 OAuth turn, so it stays
1629    // opt-in (default off) to keep --dry-run and CI flows zero-cost.
1630    if args.preflight_check
1631        && !args.dry_run
1632        && !matches!(args.operation(), EnrichOperation::ReEmbed)
1633    {
1634        let preflight_result = run_preflight_probe(args);
1635        match preflight_result {
1636            PreflightOutcome::Healthy => {
1637                tracing::info!(target: "enrich", mode = ?args.mode(), "preflight probe healthy");
1638            }
1639            PreflightOutcome::RateLimited { reason, suggestion } => {
1640                if let Some(fallback) = args.fallback_mode.clone() {
1641                    if fallback != args.mode() {
1642                        // G35 (v1.0.69): the mid-batch mode switch is
1643                        // intentionally NOT applied because it would
1644                        // desynchronise the per-item rate-limit wait
1645                        // state (rate-limited items in the worker are
1646                        // timed against the original provider). Instead
1647                        // we abort cleanly so the operator can re-invoke
1648                        // with `--mode {fallback:?}`. This guarantees no
1649                        // OAuth window is wasted and no partial state
1650                        // is left in the queue.
1651                        return Err(AppError::Validation(format!(
1652                            "preflight detected rate limit on {mode:?}: {reason}; \
1653                             re-invoke with `--mode {fallback:?}` to use the fallback provider",
1654                            mode = args.mode()
1655                        )));
1656                    }
1657                    return Err(AppError::Validation(format!(
1658                        "preflight detected rate limit on {mode:?}: {reason}; \
1659                         --fallback-mode matches --mode, no recovery possible",
1660                        mode = args.mode()
1661                    )));
1662                }
1663                return Err(AppError::Validation(format!(
1664                    "preflight detected rate limit on {mode:?}: {reason}; \
1665                     {suggestion}; pass --fallback-mode codex to recover",
1666                    mode = args.mode()
1667                )));
1668            }
1669            PreflightOutcome::Error(e) => {
1670                return Err(e);
1671            }
1672        }
1673    }
1674
1675    // SCAN phase
1676    let mut scan_result = scan_operation(&conn, &namespace, args)?;
1677    // GAP-SG-69: body-enrich candidates are scanned purely by `LENGTH(body) <
1678    // min_output_chars`, so a short body whose rewrite the preservation guard
1679    // keeps rejecting is re-scanned every pass — items_total never reaches 0 and
1680    // `--until-empty` never converges (the detached worker reported a stuck
1681    // backlog for 30+ min). Exclude memories already vetoed `status='skipped'`
1682    // for this operation in the sidecar queue; `cleanup_queue_entry`
1683    // (remember/edit/forget/purge) clears the veto when the body actually
1684    // changes, so a genuinely updated memory is reconsidered automatically.
1685    if matches!(args.operation(), EnrichOperation::BodyEnrich) {
1686        let q_path = crate::paths::sidecar_path(&paths.db, ".enrich-queue.sqlite");
1687        if let Ok(q) = open_queue_db(&q_path) {
1688            if let Ok(vetoed) = skipped_item_keys(&q, &format!("{:?}", args.operation())) {
1689                scan_result.retain(|k| !vetoed.contains(k));
1690            }
1691        }
1692    }
1693    let total = scan_result.len();
1694
1695    emit_json(&PhaseEvent {
1696        phase: "scan",
1697        binary_path: None,
1698        version: None,
1699        items_total: Some(total),
1700        items_pending: Some(total),
1701        llm_parallelism: Some(args.llm_parallelism),
1702    });
1703
1704    // Dry-run: emit preview events and summary without calling LLM
1705    if args.dry_run {
1706        for (idx, key) in scan_result.iter().enumerate() {
1707            emit_json(&ItemEvent {
1708                item: key,
1709                status: "preview",
1710                memory_id: None,
1711                entity_id: None,
1712                entities: None,
1713                rels: None,
1714                chars_before: None,
1715                chars_after: None,
1716                cost_usd: None,
1717                elapsed_ms: None,
1718                error: None,
1719                index: idx,
1720                total,
1721            });
1722        }
1723        emit_json(&EnrichSummary {
1724            summary: true,
1725            operation: format!("{:?}", args.operation()),
1726            items_total: total,
1727            completed: 0,
1728            failed: 0,
1729            skipped: 0,
1730            cost_usd: 0.0,
1731            elapsed_ms: started.elapsed().as_millis() as u64,
1732            backend_invoked: take_enrich_backend(),
1733            waiting: 0,
1734            dead: 0,
1735        });
1736        return Ok(());
1737    }
1738
1739    // All operations in this enum have an execution path.
1740
1741    // Queue setup for resume/retry (GAP-SG-64: sidecar alongside --db)
1742    let queue_path = crate::paths::sidecar_path(&paths.db, ".enrich-queue.sqlite");
1743    let queue_conn = open_queue_db(&queue_path)?;
1744
1745    if args.resume {
1746        let reset = queue_conn
1747            .execute(
1748                "UPDATE queue SET status='pending' WHERE status='processing'",
1749                [],
1750            )
1751            .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
1752        if reset > 0 {
1753            tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
1754        }
1755    }
1756
1757    if args.retry_failed {
1758        let count = queue_conn
1759            .execute(
1760                "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
1761                [],
1762            )
1763            .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
1764        tracing::info!(target: "enrich", count, "retrying failed items");
1765    }
1766
1767    if !args.resume && !args.retry_failed && !args.until_empty {
1768        queue_conn
1769            .execute("DELETE FROM queue", [])
1770            .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
1771    }
1772
1773    // Populate queue (GAP-SG-12: tag rows with the operation + link memory_id).
1774    let op_label = format!("{:?}", args.operation());
1775    let item_type = item_type_for(&args.operation());
1776    for key in scan_result.iter() {
1777        // v1.1.1 (P2): re-embed keys may be prefixed (`entity:` / `chunk:`);
1778        // derive the row item_type from the key so prune-dead-orphans never
1779        // mistakes an entity/chunk row for an orphaned memory.
1780        let it = item_type_for_key(key, item_type);
1781        enqueue_candidate(&queue_conn, &conn, &namespace, key, it, &op_label);
1782    }
1783
1784    // G19: parallel LLM processing via std::thread::scope when parallelism > 1.
1785    // Clamp enforces the range even if the caller bypasses clap validation.
1786    let parallelism = if args.mode() == EnrichMode::OpenRouter {
1787        let rest = args.rest_concurrency.unwrap_or(8).clamp(1, 16) as usize;
1788        tracing::info!(
1789            target: "enrich",
1790            concurrency = rest,
1791            source = "rest_concurrency",
1792            "OpenRouter REST concurrency (clamp 1..=16)"
1793        );
1794        rest
1795    } else {
1796        let p = args.llm_parallelism.clamp(1, 32) as usize;
1797        tracing::info!(
1798            target: "enrich",
1799            concurrency = p,
1800            source = "llm_parallelism",
1801            "LLM subprocess parallelism (clamp 1..=32)"
1802        );
1803        p
1804    };
1805    if parallelism > 1 {
1806        tracing::info!(
1807            target: "enrich",
1808            llm_parallelism = parallelism,
1809            "parallel LLM processing with bounded thread pool"
1810        );
1811    }
1812    // G28-D (v1.0.68) + G34 (v1.0.69): warn above the recommended parallelism
1813    // ceiling. The threshold and message depend on the LLM mode because
1814    // Claude Code spawns MCP children (G28-A) while Codex does not.
1815    if parallelism > 4 {
1816        match args.mode() {
1817            EnrichMode::ClaudeCode => {
1818                tracing::warn!(
1819                    target: "enrich",
1820                    llm_parallelism = parallelism,
1821                    recommended_max = 4,
1822                    mode = "claude-code",
1823                    "llm_parallelism above 4 multiplies Claude Code subprocess fan-out; \
1824                     consider combining with SQLITE_GRAPHRAG_CLAUDE_EMPTY_CONFIG_DIR \
1825                     to cut MCP children (G28-A)"
1826                );
1827            }
1828            EnrichMode::Codex if parallelism > 16 => {
1829                tracing::warn!(
1830                    target: "enrich",
1831                    llm_parallelism = parallelism,
1832                    recommended_max = 16,
1833                    mode = "codex",
1834                    "llm_parallelism above 16 risks OAuth rate-limit on Codex; \
1835                     consider --llm-parallelism 8 for safer concurrency"
1836                );
1837            }
1838            EnrichMode::Codex => {
1839                // No warning: codex does not spawn MCP children and was
1840                // validated at parallelism 8 in production (1161 items,
1841                // 0 failures) per the 2026-06-04 session audit.
1842            }
1843            EnrichMode::Opencode if parallelism > 16 => {
1844                tracing::warn!(
1845                    target: "enrich",
1846                    llm_parallelism = parallelism,
1847                    recommended_max = 16,
1848                    mode = "opencode",
1849                    "llm_parallelism above 16 risks OAuth rate-limit on OpenCode; \
1850                     consider --llm-parallelism 8 for safer concurrency"
1851                );
1852            }
1853            EnrichMode::Opencode => {
1854                // No warning: opencode does not spawn MCP children.
1855            }
1856            EnrichMode::OpenRouter => {
1857                // No warning: OpenRouter is a bounded HTTP fan-out (no
1858                // subprocess); --llm-parallelism is respected as-is.
1859            }
1860        }
1861    }
1862
1863    let mut completed = 0usize;
1864    let mut failed = 0usize;
1865    let mut skipped = 0usize;
1866    let mut cost_total = 0.0f64;
1867    let mut oauth_detected = false;
1868    let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1869    let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1870    let enrich_started = std::time::Instant::now();
1871
1872    let provider_timeout = match args.mode() {
1873        EnrichMode::ClaudeCode => args.claude_timeout,
1874        EnrichMode::Codex => args.codex_timeout,
1875        EnrichMode::Opencode => args.opencode_timeout,
1876        EnrichMode::OpenRouter => args.openrouter_timeout,
1877    };
1878
1879    let provider_model: Option<&str> = match args.mode() {
1880        EnrichMode::ClaudeCode => args.claude_model.as_deref(),
1881        EnrichMode::Codex => args.codex_model.as_deref(),
1882        EnrichMode::Opencode => args.opencode_model.as_deref(),
1883        EnrichMode::OpenRouter => args.openrouter_model.as_deref(),
1884    };
1885
1886    // GAP-SG-16: when --ignore-backoff is set, drop the per-item cooldown filter
1887    // from candidate selection so items parked on `next_retry_at` are eligible
1888    // immediately. Shared by the parallel workers and the serial loop.
1889    let backoff_clause: &str = if args.ignore_backoff {
1890        ""
1891    } else {
1892        "AND (next_retry_at IS NULL OR next_retry_at <= datetime('now'))"
1893    };
1894
1895    // GAP-SG-45: announce the scan-vs-drain concurrency split (scan is always
1896    // serial; drain uses `parallelism` workers).
1897    emit_json(&ConcurrencyEvent {
1898        phase: "concurrency",
1899        scan_parallelism: 1,
1900        drain_parallelism: parallelism as u32,
1901    });
1902
1903    // GAP-ENRICH-BACKLOG-CONVERGE: --until-empty wraps the scan→populate→drain
1904    // cycle in an internal loop so the external bash retry loop is unnecessary.
1905    // Without --until-empty the loop body runs exactly once (legacy behaviour).
1906    let until_deadline = std::time::Instant::now()
1907        + std::time::Duration::from_secs(args.max_runtime.unwrap_or(3600));
1908    loop {
1909        if args.until_empty {
1910            // Re-scan and re-enqueue eligible candidates each iteration.
1911            // INSERT OR IGNORE never resurrects a dead-letter row (item_key is
1912            // UNIQUE), so the backlog converges instead of looping forever.
1913            let mut rescan = scan_operation(&conn, &namespace, args)?;
1914            // GAP-SG-69: drop memories already vetoed `status='skipped'` so the
1915            // re-scan converges instead of re-enqueuing a non-expandable short
1916            // body every iteration (body-enrich only; the verdict persists in
1917            // the sidecar queue and is cleared by cleanup_queue_entry on edit).
1918            if matches!(args.operation(), EnrichOperation::BodyEnrich) {
1919                if let Ok(vetoed) = skipped_item_keys(&queue_conn, &op_label) {
1920                    rescan.retain(|k| !vetoed.contains(k));
1921                }
1922            }
1923            for key in &rescan {
1924                let it = item_type_for_key(key, item_type);
1925                enqueue_candidate(&queue_conn, &conn, &namespace, key, it, &op_label);
1926            }
1927        }
1928        let completed_before = completed;
1929
1930        // G19: when parallelism > 1, spawn bounded worker threads.
1931        // Each worker opens its own DB connections (WAL supports concurrent readers + serialized writers).
1932        // The queue DB claim is atomic via UPDATE...RETURNING — no external lock needed.
1933        if parallelism > 1 {
1934            let stdout_mu = parking_lot::Mutex::new(());
1935            let budget = args.max_cost_usd;
1936            let operation = args.operation().clone();
1937            let mode = args.mode().clone();
1938            let min_oc = args.min_output_chars;
1939            let max_oc = args.max_output_chars;
1940            let prompt_tpl = args.prompt_template.as_deref().map(|p| p.to_path_buf());
1941
1942            struct WorkerResult {
1943                completed: usize,
1944                failed: usize,
1945                skipped: usize,
1946                cost: f64,
1947                oauth: bool,
1948                // GAP-SG-76 fix: distinct signal for "worker aborted because
1949                // SQLITE_BUSY exhausted all bounded retries" so the caller
1950                // fails loud (exit 15) instead of silently treating it like
1951                // an exhausted/empty backlog.
1952                db_busy: bool,
1953            }
1954
1955            let results: Vec<WorkerResult> = std::thread::scope(|s| {
1956                let handles: Vec<_> = (0..parallelism)
1957                .map(|worker_id| {
1958                    let stdout_mu = &stdout_mu;
1959                    let paths = &paths;
1960                    let queue_path = &queue_path;
1961                    let namespace = &namespace;
1962                    let provider_binary = provider_binary.as_deref();
1963                    let operation = &operation;
1964                    let mode = &mode;
1965                    let prompt_tpl = prompt_tpl.as_deref();
1966                    s.spawn(move || {
1967                        let w_conn = match open_rw(&paths.db) {
1968                            Ok(c) => c,
1969                            Err(e) => {
1970                                tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open DB");
1971                                return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false, db_busy: false };
1972                            }
1973                        };
1974                        let w_queue = match open_queue_db(queue_path) {
1975                            Ok(c) => c,
1976                            Err(e) => {
1977                                tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open queue DB");
1978                                return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false, db_busy: false };
1979                            }
1980                        };
1981                        let mut w_completed = 0usize;
1982                        let mut w_failed = 0usize;
1983                        let mut w_skipped = 0usize;
1984                        let mut w_cost = 0.0f64;
1985                        let mut w_oauth = false;
1986                        let mut w_db_busy = false;
1987                        let mut w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1988                        let w_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1989                        // G28-D: per-worker circuit breaker that aborts the
1990                        // loop after `circuit_breaker_threshold` consecutive
1991                        // HardFailure outcomes (transient/rate-limited errors
1992                        // do NOT count, so a recovering provider is not
1993                        // penalised).
1994                        let mut w_breaker = crate::retry::CircuitBreaker::new(
1995                            args.circuit_breaker_threshold.max(1),
1996                            std::time::Duration::from_secs(60),
1997                        );
1998
1999                        loop {
2000                            if crate::shutdown_requested() {
2001                                tracing::info!(target: "enrich", "shutdown requested, worker stopping");
2002                                break;
2003                            }
2004                            if let Some(b) = budget {
2005                                if !w_oauth && w_cost >= b {
2006                                    break;
2007                                }
2008                            }
2009                            // GAP-SG-16: --ignore-backoff drops the next_retry_at
2010                            // cooldown filter so items waiting on backoff are
2011                            // claimed immediately.
2012                            // GAP-SG-76: distinguish a genuinely empty backlog
2013                            // (QueryReturnedNoRows) from SQLITE_BUSY lock
2014                            // contention with the main writer or another
2015                            // worker — a busy claim retries briefly instead of
2016                            // breaking the drain loop early.
2017                            // GAP-SG-76/v1.1.00 fix: bounded busy-retry via the
2018                            // shared with_busy_retry helper (5 attempts, exponential
2019                            // half-jitter backoff, kill-switch aware) instead of an
2020                            // unbounded `loop { ... continue; }` on SQLITE_BUSY. When
2021                            // retries are exhausted, with_busy_retry converts to
2022                            // AppError::DbBusy — that is NOT treated as an empty
2023                            // backlog. It sets w_db_busy and stops this worker; the
2024                            // caller (after collecting all workers) fails loud with
2025                            // exit code 15 instead of silently under-reporting a
2026                            // convergent drain.
2027                            let pending = match crate::storage::utils::with_busy_retry(|| {
2028                                dequeue_next_pending(&w_queue, backoff_clause)
2029                            }) {
2030                                Ok(DequeueOutcome::Claimed(p)) => Some(p),
2031                                Ok(DequeueOutcome::Empty) => None,
2032                                Err(AppError::DbBusy(msg)) => {
2033                                    tracing::error!(target: "enrich", worker = worker_id, error = %msg, "SQLITE_BUSY exhausted bounded retries, worker aborting");
2034                                    w_db_busy = true;
2035                                    None
2036                                }
2037                                Err(e) => {
2038                                    tracing::error!(target: "enrich", worker = worker_id, error = %e, "dequeue failed");
2039                                    None
2040                                }
2041                            };
2042                            let (queue_id, item_key, _item_type, attempt_current) = match pending {
2043                                Some(p) => p,
2044                                None => break,
2045                            };
2046                            let item_started = Instant::now();
2047                            let current_index = w_completed + w_failed + w_skipped;
2048
2049                            // provider_binary validated upfront (Some for every
2050                            // LLM-backed op; None only for ReEmbed, which ignores
2051                            // it). unwrap_or_default yields "" solely in that
2052                            // unread case, so a broken invariant surfaces as a
2053                            // recoverable per-item error instead of a panic.
2054                            let provider_bin = provider_binary.unwrap_or_else(|| std::path::Path::new(""));
2055                            let call_result = match operation {
2056                                EnrichOperation::MemoryBindings | EnrichOperation::AugmentBindings => call_memory_bindings(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2057                                EnrichOperation::EntityDescriptions => call_entity_description(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2058                                EnrichOperation::BodyEnrich => call_body_enrich(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode, min_oc, max_oc, prompt_tpl, args.preserve_threshold, paths, llm_backend, embedding_backend),
2059                                EnrichOperation::ReEmbed => call_reembed(&w_conn, namespace, &item_key, paths, llm_backend, embedding_backend),
2060                                EnrichOperation::WeightCalibrate => call_weight_calibrate(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2061                                EnrichOperation::RelationReclassify => call_relation_reclassify(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2062                                EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => call_entity_connect(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2063                                EnrichOperation::EntityTypeValidate => call_entity_type_validate(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2064                                EnrichOperation::DescriptionEnrich => call_description_enrich(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2065                                EnrichOperation::DomainClassify => call_domain_classify(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2066                                EnrichOperation::GraphAudit => call_graph_audit(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2067                                EnrichOperation::DeepResearchSynth => call_deep_research_synth(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2068                                EnrichOperation::BodyExtract => call_body_extract(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode, args.body_extract_graph_only),
2069                            };
2070                            // GAP-SG-72/73: drain UNCONDITIONALLY right after
2071                            // every call_result (success or failure) so a
2072                            // diagnostic never survives past the item that
2073                            // produced it — see the doc comment on
2074                            // OpenRouterFailureDiagnostics.
2075                            let openrouter_diag = take_last_openrouter_failure();
2076
2077                            match call_result {
2078                                Ok(EnrichItemResult::Done { cost, is_oauth, memory_id, entity_id, entities, rels, chars_before, chars_after }) => {
2079                                    if is_oauth { w_oauth = true; }
2080                                    w_backoff = DEFAULT_RATE_LIMIT_WAIT;
2081                                    let _ = w_queue.execute(
2082                                        "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2083                                        rusqlite::params![memory_id, entity_id, entities as i64, rels as i64, cost, item_started.elapsed().as_millis() as i64, queue_id],
2084                                    );
2085                                    w_completed += 1;
2086                                    if !is_oauth { w_cost += cost; }
2087                                    // G28-D: count success; resets breaker.
2088                                    let _ = w_breaker
2089                                        .record(crate::retry::AttemptOutcome::Success);
2090                                    let _guard = stdout_mu.lock();
2091                                    emit_json(&ItemEvent { item: &item_key, status: "done", memory_id, entity_id, entities: Some(entities), rels: Some(rels), chars_before, chars_after, cost_usd: if is_oauth { None } else { Some(cost) }, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
2092                                }
2093                                Ok(EnrichItemResult::Skipped { reason }) => {
2094                                    w_skipped += 1;
2095                                    let _ = w_queue.execute("UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![reason, queue_id]);
2096                                    let _guard = stdout_mu.lock();
2097                                    emit_json(&ItemEvent { item: &item_key, status: "skipped", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
2098                                }
2099                                Ok(EnrichItemResult::PreservationFailed { score, threshold, chars_before, chars_after }) => {
2100                                    // G29 Passo 4: worker mirror of the
2101                                    // serial path. Counted as a soft
2102                                    // skip so the queue surface shows
2103                                    // a quality issue rather than a
2104                                    // transport failure.
2105                                    w_skipped += 1;
2106                                    let reason = format!(
2107                                        "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2108                                    );
2109                                    let _ = w_queue.execute(
2110                                        "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2111                                        rusqlite::params![reason, queue_id],
2112                                    );
2113                                    let _guard = stdout_mu.lock();
2114                                    emit_json(&ItemEvent {
2115                                        item: &item_key,
2116                                        status: "preservation_failed",
2117                                        memory_id: None,
2118                                        entity_id: None,
2119                                        entities: None,
2120                                        rels: None,
2121                                        chars_before: Some(chars_before),
2122                                        chars_after: Some(chars_after),
2123                                        cost_usd: None,
2124                                        elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2125                                        error: Some(reason),
2126                                        index: current_index,
2127                                        total,
2128                                    });
2129                                }
2130                                Err(e) => {
2131                                    let err_str = format!("{e}");
2132                                    if matches!(e, AppError::RateLimited { .. }) {
2133                                        if crate::retry::is_kill_switch_active() {
2134                                            tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2135                                        } else if std::time::Instant::now() >= w_deadline {
2136                                            tracing::error!(target: "enrich", "rate-limit retry deadline (1h) exhausted in worker");
2137                                        } else {
2138                                            let half = w_backoff / 2;
2139                                            let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2140                                            let actual_wait = half + jitter;
2141                                            tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited in worker, backing off");
2142                                            let _ = w_queue.execute("UPDATE queue SET status='pending' WHERE id=?1", rusqlite::params![queue_id]);
2143                                            std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2144                                            w_backoff = (w_backoff * 2).min(900);
2145                                            continue;
2146                                        }
2147                                    }
2148                                    w_failed += 1;
2149                                    // GAP-SG-73: prefer the origin-typed verdict
2150                                    // (ChatError::retry_class, computed at the
2151                                    // exact HTTP status / provider code in
2152                                    // chat_api.rs) over the untyped fallback
2153                                    // classifier whenever this item's failure
2154                                    // came from an OpenRouter chat call.
2155                                    let outcome = match openrouter_diag {
2156                                        Some(diag) => record_item_failure_typed(
2157                                            &w_queue,
2158                                            queue_id,
2159                                            attempt_current,
2160                                            args.max_attempts,
2161                                            diag.retry_class,
2162                                            &err_str,
2163                                            diag.finish_reason.as_deref(),
2164                                            diag.prompt_tokens,
2165                                            diag.completion_tokens,
2166                                        ),
2167                                        None => record_item_failure(&w_queue, queue_id, attempt_current, args.max_attempts, &e),
2168                                    };
2169                                    let _guard = stdout_mu.lock();
2170                                    emit_json(&ItemEvent { item: &item_key, status: "failed", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: Some(err_str), index: current_index, total });
2171                                    // G28-D: feed the classified outcome to the breaker (transient
2172                                    // failures do not count toward opening it).
2173                                    let breaker_opened = w_breaker.record(outcome);
2174                                    if breaker_opened {
2175                                        tracing::error!(target: "enrich",
2176                                            consecutive_failures = w_breaker.consecutive_failures(),
2177                                            "circuit breaker opened — aborting worker"
2178                                        );
2179                                        break;
2180                                    }
2181                                }
2182                            }
2183                        }
2184                        WorkerResult { completed: w_completed, failed: w_failed, skipped: w_skipped, cost: w_cost, oauth: w_oauth, db_busy: w_db_busy }
2185                    })
2186                })
2187                .collect();
2188                handles
2189                    .into_iter()
2190                    .map(|h| {
2191                        h.join().unwrap_or(WorkerResult {
2192                            completed: 0,
2193                            failed: 0,
2194                            skipped: 0,
2195                            cost: 0.0,
2196                            oauth: false,
2197                            db_busy: false,
2198                        })
2199                    })
2200                    .collect()
2201            });
2202
2203            // GAP-SG-76 fix: a worker that aborted due to exhausted
2204            // SQLITE_BUSY retries must fail the whole `enrich` invocation
2205            // loudly (exit code 15 via AppError::DbBusy) rather than being
2206            // folded silently into the completed/failed/skipped counters,
2207            // which would understate a genuinely unfinished drain.
2208            if results.iter().any(|r| r.db_busy) {
2209                return Err(AppError::DbBusy(
2210                    "SQLITE_BUSY exhausted bounded retries while dequeuing (parallel worker)"
2211                        .into(),
2212                ));
2213            }
2214
2215            for r in &results {
2216                completed += r.completed;
2217                failed += r.failed;
2218                skipped += r.skipped;
2219                cost_total += r.cost;
2220                if r.oauth && !oauth_detected {
2221                    oauth_detected = true;
2222                }
2223            }
2224        } else {
2225            // Serial path (parallelism == 1) — original loop
2226            loop {
2227                if crate::shutdown_requested() {
2228                    tracing::info!(target: "enrich", "shutdown requested, stopping enrichment");
2229                    break;
2230                }
2231
2232                // Budget check
2233                if let Some(budget) = args.max_cost_usd {
2234                    if !oauth_detected && cost_total >= budget {
2235                        tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
2236                        break;
2237                    }
2238                }
2239
2240                // Dequeue next pending item (GAP-SG-16: --ignore-backoff drops
2241                // the next_retry_at cooldown filter).
2242                // GAP-SG-76: distinguish a genuinely empty backlog
2243                // (QueryReturnedNoRows) from SQLITE_BUSY lock contention with
2244                // a concurrent writer — a busy claim retries briefly instead
2245                // of breaking the drain loop early.
2246                // GAP-SG-76/v1.1.00 fix: bounded busy-retry via the shared
2247                // with_busy_retry helper (5 attempts, exponential half-jitter
2248                // backoff, kill-switch aware) instead of an unbounded
2249                // `loop { ... continue; }` on SQLITE_BUSY. When retries are
2250                // exhausted, with_busy_retry converts to AppError::DbBusy,
2251                // which we propagate immediately (fail loud, exit code 15)
2252                // instead of silently treating sustained contention as an
2253                // empty backlog — that would end the drain early and under-
2254                // report queue_pending as converged.
2255                let pending = match crate::storage::utils::with_busy_retry(|| {
2256                    dequeue_next_pending(&queue_conn, backoff_clause)
2257                }) {
2258                    Ok(DequeueOutcome::Claimed(p)) => Some(p),
2259                    Ok(DequeueOutcome::Empty) => None,
2260                    Err(e @ AppError::DbBusy(_)) => {
2261                        tracing::error!(target: "enrich", error = %e, "SQLITE_BUSY exhausted bounded retries, aborting drain loop");
2262                        return Err(e);
2263                    }
2264                    Err(e) => {
2265                        tracing::error!(target: "enrich", error = %e, "dequeue failed");
2266                        None
2267                    }
2268                };
2269
2270                let (queue_id, item_key, item_type, attempt_current) = match pending {
2271                    Some(p) => p,
2272                    None => break,
2273                };
2274
2275                let item_started = Instant::now();
2276                let current_index = completed + failed + skipped;
2277
2278                // See worker note: provider_binary is Some for every LLM-backed
2279                // op; "" here only for ReEmbed, which never reads it.
2280                let provider_bin = provider_binary
2281                    .as_deref()
2282                    .unwrap_or_else(|| std::path::Path::new(""));
2283                let call_result = match args.operation() {
2284                    EnrichOperation::MemoryBindings | EnrichOperation::AugmentBindings => {
2285                        call_memory_bindings(
2286                            &conn,
2287                            &namespace,
2288                            &item_key,
2289                            provider_bin,
2290                            provider_model,
2291                            provider_timeout,
2292                            &args.mode(),
2293                        )
2294                    }
2295                    EnrichOperation::EntityDescriptions => call_entity_description(
2296                        &conn,
2297                        &namespace,
2298                        &item_key,
2299                        provider_bin,
2300                        provider_model,
2301                        provider_timeout,
2302                        &args.mode(),
2303                    ),
2304                    EnrichOperation::BodyEnrich => call_body_enrich(
2305                        &conn,
2306                        &namespace,
2307                        &item_key,
2308                        provider_bin,
2309                        provider_model,
2310                        provider_timeout,
2311                        &args.mode(),
2312                        args.min_output_chars,
2313                        args.max_output_chars,
2314                        args.prompt_template.as_deref(),
2315                        args.preserve_threshold,
2316                        &paths,
2317                        llm_backend,
2318                        embedding_backend,
2319                    ),
2320                    EnrichOperation::ReEmbed => call_reembed(
2321                        &conn,
2322                        &namespace,
2323                        &item_key,
2324                        &paths,
2325                        llm_backend,
2326                        embedding_backend,
2327                    ),
2328                    EnrichOperation::WeightCalibrate => call_weight_calibrate(
2329                        &conn,
2330                        &namespace,
2331                        &item_key,
2332                        provider_bin,
2333                        provider_model,
2334                        provider_timeout,
2335                        &args.mode(),
2336                    ),
2337                    EnrichOperation::RelationReclassify => call_relation_reclassify(
2338                        &conn,
2339                        &namespace,
2340                        &item_key,
2341                        provider_bin,
2342                        provider_model,
2343                        provider_timeout,
2344                        &args.mode(),
2345                    ),
2346                    EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2347                        call_entity_connect(
2348                            &conn,
2349                            &namespace,
2350                            &item_key,
2351                            provider_bin,
2352                            provider_model,
2353                            provider_timeout,
2354                            &args.mode(),
2355                        )
2356                    }
2357                    EnrichOperation::EntityTypeValidate => call_entity_type_validate(
2358                        &conn,
2359                        &namespace,
2360                        &item_key,
2361                        provider_bin,
2362                        provider_model,
2363                        provider_timeout,
2364                        &args.mode(),
2365                    ),
2366                    EnrichOperation::DescriptionEnrich => call_description_enrich(
2367                        &conn,
2368                        &namespace,
2369                        &item_key,
2370                        provider_bin,
2371                        provider_model,
2372                        provider_timeout,
2373                        &args.mode(),
2374                    ),
2375                    EnrichOperation::DomainClassify => call_domain_classify(
2376                        &conn,
2377                        &namespace,
2378                        &item_key,
2379                        provider_bin,
2380                        provider_model,
2381                        provider_timeout,
2382                        &args.mode(),
2383                    ),
2384                    EnrichOperation::GraphAudit => call_graph_audit(
2385                        &conn,
2386                        &namespace,
2387                        &item_key,
2388                        provider_bin,
2389                        provider_model,
2390                        provider_timeout,
2391                        &args.mode(),
2392                    ),
2393                    EnrichOperation::DeepResearchSynth => call_deep_research_synth(
2394                        &conn,
2395                        &namespace,
2396                        &item_key,
2397                        provider_bin,
2398                        provider_model,
2399                        provider_timeout,
2400                        &args.mode(),
2401                    ),
2402                    EnrichOperation::BodyExtract => call_body_extract(
2403                        &conn,
2404                        &namespace,
2405                        &item_key,
2406                        provider_bin,
2407                        provider_model,
2408                        provider_timeout,
2409                        &args.mode(),
2410                        args.body_extract_graph_only,
2411                    ),
2412                };
2413                // GAP-SG-72/73: drain UNCONDITIONALLY right after every
2414                // call_result (mirrors the worker loop above).
2415                let openrouter_diag = take_last_openrouter_failure();
2416
2417                match call_result {
2418                    Ok(EnrichItemResult::Done {
2419                        memory_id,
2420                        entity_id,
2421                        entities,
2422                        rels,
2423                        chars_before,
2424                        chars_after,
2425                        cost,
2426                        is_oauth,
2427                    }) => {
2428                        if is_oauth && !oauth_detected {
2429                            oauth_detected = true;
2430                            tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
2431                        }
2432                        backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
2433
2434                        // Persist depends on the operation
2435                        let persist_err: Option<String> = match args.operation() {
2436                            EnrichOperation::MemoryBindings => {
2437                                // Bindings already persisted inside call_memory_bindings
2438                                None
2439                            }
2440                            EnrichOperation::EntityDescriptions => {
2441                                // Description already persisted inside call_entity_description
2442                                None
2443                            }
2444                            EnrichOperation::BodyEnrich => {
2445                                // Body already persisted inside call_body_enrich
2446                                None
2447                            }
2448                            _ => {
2449                                // All G27 operations persist inside their call_* function
2450                                None
2451                            }
2452                        };
2453
2454                        if let Err(e) = queue_conn.execute(
2455                    "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2456                    rusqlite::params![
2457                        memory_id,
2458                        entity_id,
2459                        entities as i64,
2460                        rels as i64,
2461                        cost,
2462                        item_started.elapsed().as_millis() as i64,
2463                        queue_id
2464                    ],
2465                ) {
2466                        tracing::warn!(target: "enrich", error = %e, "queue done update failed");
2467                    }
2468
2469                        if persist_err.is_none() {
2470                            completed += 1;
2471                            if !is_oauth {
2472                                cost_total += cost;
2473                            }
2474                            emit_json(&ItemEvent {
2475                                item: &item_key,
2476                                status: "done",
2477                                memory_id,
2478                                entity_id,
2479                                entities: Some(entities),
2480                                rels: Some(rels),
2481                                chars_before,
2482                                chars_after,
2483                                cost_usd: if is_oauth { None } else { Some(cost) },
2484                                elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2485                                error: None,
2486                                index: current_index,
2487                                total,
2488                            });
2489                        } else {
2490                            failed += 1;
2491                            emit_json(&ItemEvent {
2492                                item: &item_key,
2493                                status: "failed",
2494                                memory_id: None,
2495                                entity_id: None,
2496                                entities: None,
2497                                rels: None,
2498                                chars_before: None,
2499                                chars_after: None,
2500                                cost_usd: None,
2501                                elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2502                                error: persist_err,
2503                                index: current_index,
2504                                total,
2505                            });
2506                        }
2507                    }
2508                    Ok(EnrichItemResult::Skipped { reason }) => {
2509                        skipped += 1;
2510                        if let Err(e) = queue_conn.execute(
2511                    "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2512                    rusqlite::params![reason, queue_id],
2513                ) {
2514                        tracing::warn!(target: "enrich", error = %e, "queue skipped update failed");
2515                    }
2516                        emit_json(&ItemEvent {
2517                            item: &item_key,
2518                            status: "skipped",
2519                            memory_id: None,
2520                            entity_id: None,
2521                            entities: None,
2522                            rels: None,
2523                            chars_before: None,
2524                            chars_after: None,
2525                            cost_usd: None,
2526                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2527                            error: None,
2528                            index: current_index,
2529                            total,
2530                        });
2531                    }
2532                    Ok(EnrichItemResult::PreservationFailed {
2533                        score,
2534                        threshold,
2535                        chars_before,
2536                        chars_after,
2537                    }) => {
2538                        // G29 Passo 4: the LLM rewrite diverged too far from
2539                        // the original body. Count as a soft failure (not
2540                        // `failed`) so the queue surfaces it as a quality
2541                        // issue, not a transport error. The reason is
2542                        // structured so the operator can audit why a body
2543                        // was rejected.
2544                        skipped += 1;
2545                        let reason = format!(
2546                        "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2547                    );
2548                        if let Err(qe) = queue_conn.execute(
2549                        "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2550                        rusqlite::params![reason, queue_id],
2551                    ) {
2552                        tracing::warn!(target: "enrich", error = %qe, "queue preservation_failed update failed");
2553                    }
2554                        emit_json(&ItemEvent {
2555                            item: &item_key,
2556                            status: "preservation_failed",
2557                            memory_id: None,
2558                            entity_id: None,
2559                            entities: None,
2560                            rels: None,
2561                            chars_before: Some(chars_before),
2562                            chars_after: Some(chars_after),
2563                            cost_usd: None,
2564                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2565                            error: Some(reason),
2566                            index: current_index,
2567                            total,
2568                        });
2569                    }
2570                    Err(e) => {
2571                        let err_str = format!("{e}");
2572                        if matches!(e, AppError::RateLimited { .. }) {
2573                            if crate::retry::is_kill_switch_active() {
2574                                tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2575                            } else if std::time::Instant::now() >= rate_limit_deadline {
2576                                tracing::error!(target: "enrich", total_elapsed_secs = enrich_started.elapsed().as_secs(), "rate-limit retry deadline (1h) exhausted");
2577                            } else {
2578                                let half = backoff_secs / 2;
2579                                let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2580                                let actual_wait = half + jitter;
2581                                tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
2582                                if let Err(qe) = queue_conn.execute(
2583                                    "UPDATE queue SET status='pending' WHERE id=?1",
2584                                    rusqlite::params![queue_id],
2585                                ) {
2586                                    tracing::warn!(target: "enrich", error = %qe, "queue pending update failed");
2587                                }
2588                                std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2589                                backoff_secs = (backoff_secs * 2).min(900);
2590                                continue;
2591                            }
2592                        }
2593
2594                        failed += 1;
2595                        // GAP-SG-73: prefer the origin-typed verdict
2596                        // (ChatError::retry_class) over the untyped fallback
2597                        // classifier whenever this item's failure came from
2598                        // an OpenRouter chat call.
2599                        let _outcome = match openrouter_diag {
2600                            Some(diag) => record_item_failure_typed(
2601                                &queue_conn,
2602                                queue_id,
2603                                attempt_current,
2604                                args.max_attempts,
2605                                diag.retry_class,
2606                                &err_str,
2607                                diag.finish_reason.as_deref(),
2608                                diag.prompt_tokens,
2609                                diag.completion_tokens,
2610                            ),
2611                            None => record_item_failure(
2612                                &queue_conn,
2613                                queue_id,
2614                                attempt_current,
2615                                args.max_attempts,
2616                                &e,
2617                            ),
2618                        };
2619                        emit_json(&ItemEvent {
2620                            item: &item_key,
2621                            status: "failed",
2622                            memory_id: None,
2623                            entity_id: None,
2624                            entities: None,
2625                            rels: None,
2626                            chars_before: None,
2627                            chars_after: None,
2628                            cost_usd: None,
2629                            elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2630                            error: Some(err_str),
2631                            index: current_index,
2632                            total,
2633                        });
2634                    }
2635                }
2636
2637                let _ = item_type; // used via queue schema only
2638            }
2639        } // end else (serial path)
2640
2641        if !args.until_empty {
2642            break;
2643        }
2644        let eligible_remaining: i64 = queue_conn
2645            .query_row(
2646                &format!("SELECT COUNT(*) FROM queue WHERE status='pending' {backoff_clause}"),
2647                [],
2648                |r| r.get(0),
2649            )
2650            .unwrap_or(0);
2651        let progressed = completed > completed_before;
2652        if std::time::Instant::now() >= until_deadline {
2653            tracing::info!(target: "enrich", "until-empty: max-runtime reached, stopping");
2654            break;
2655        }
2656        if !progressed && eligible_remaining == 0 {
2657            tracing::info!(target: "enrich", "until-empty: converged (no eligible items remain)");
2658            break;
2659        }
2660        if eligible_remaining == 0 {
2661            // Remaining pending items are waiting on backoff; nap and re-check.
2662            std::thread::sleep(std::time::Duration::from_secs(1));
2663        }
2664    } // end until-empty loop
2665
2666    let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2667    let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2668
2669    // GAP-SG-15: report items still in cooldown (waiting) and dead-lettered
2670    // alongside completed, so `--until-empty` makes the convergence state
2671    // explicit (cooldown vs. dead vs. truly empty) instead of just "done".
2672    let waiting_final: i64 = queue_conn
2673        .query_row(
2674            "SELECT COUNT(*) FROM queue WHERE status='pending' \
2675             AND (operation = ?1 OR operation IS NULL) \
2676             AND next_retry_at IS NOT NULL AND next_retry_at > datetime('now')",
2677            rusqlite::params![op_label],
2678            |r| r.get(0),
2679        )
2680        .unwrap_or(0);
2681    let dead_final: i64 = queue_conn
2682        .query_row(
2683            "SELECT COUNT(*) FROM queue WHERE status='dead' \
2684             AND (operation = ?1 OR operation IS NULL)",
2685            rusqlite::params![op_label],
2686            |r| r.get(0),
2687        )
2688        .unwrap_or(0);
2689
2690    emit_json(&EnrichSummary {
2691        summary: true,
2692        operation: format!("{:?}", args.operation()),
2693        items_total: total,
2694        completed,
2695        failed,
2696        skipped,
2697        cost_usd: cost_total,
2698        elapsed_ms: started.elapsed().as_millis() as u64,
2699        backend_invoked: take_enrich_backend(),
2700        waiting: waiting_final,
2701        dead: dead_final,
2702    });
2703
2704    if failed == 0 {
2705        // GAP-ENRICH-BACKLOG-CONVERGE: keep the queue file when dead-letter rows
2706        // exist so `enrich --status` can still report them on the next run.
2707        let dead: i64 = queue_conn
2708            .query_row("SELECT COUNT(*) FROM queue WHERE status='dead'", [], |r| {
2709                r.get(0)
2710            })
2711            .unwrap_or(0);
2712        // GAP-SG-69: keep the sidecar queue while it still holds `skipped`
2713        // verdicts. Those rows tell the next scan which short bodies are
2714        // non-expandable; removing the file would lose the veto and the
2715        // body-enrich backlog would never converge. cleanup_queue_entry clears
2716        // a row when its memory is edited/forgotten, so the veto is not permanent.
2717        let skipped_remaining: i64 = queue_conn
2718            .query_row(
2719                "SELECT COUNT(*) FROM queue WHERE status='skipped'",
2720                [],
2721                |r| r.get(0),
2722            )
2723            .unwrap_or(0);
2724        if dead == 0 && skipped_remaining == 0 {
2725            let _ = std::fs::remove_file(&queue_path);
2726        }
2727    }
2728
2729    Ok(())
2730}
2731
2732// EnrichItemResult + call_* functions moved to extraction.rs
2733
2734// ---------------------------------------------------------------------------
2735// Tests
2736// ---------------------------------------------------------------------------
2737
2738#[cfg(test)]
2739mod tests {
2740    use super::*;
2741
2742    #[test]
2743    fn bindings_schema_is_valid_json() {
2744        let _: serde_json::Value =
2745            serde_json::from_str(BINDINGS_SCHEMA).expect("BINDINGS_SCHEMA must be valid JSON");
2746    }
2747
2748    #[test]
2749    fn entity_description_schema_is_valid_json() {
2750        let _: serde_json::Value = serde_json::from_str(ENTITY_DESCRIPTION_SCHEMA)
2751            .expect("ENTITY_DESCRIPTION_SCHEMA must be valid JSON");
2752    }
2753
2754    #[test]
2755    fn body_enrich_schema_is_valid_json() {
2756        let _: serde_json::Value = serde_json::from_str(BODY_ENRICH_SCHEMA)
2757            .expect("BODY_ENRICH_SCHEMA must be valid JSON");
2758    }
2759}