1mod extraction;
26mod postprocess;
27mod queue;
28mod scan;
29use extraction::{
30 call_body_enrich, call_body_extract, call_deep_research_synth, call_description_enrich,
31 call_domain_classify, call_entity_connect, call_entity_description, call_entity_type_validate,
32 call_graph_audit, call_memory_bindings, call_reembed, call_relation_reclassify,
33 call_weight_calibrate, find_codex_binary, take_last_openrouter_failure, EnrichItemResult,
34};
35use postprocess::{
36 persist_enriched_body, persist_entity_description, persist_memory_bindings,
37 reembed_memory_vector, take_enrich_backend,
38};
39pub use queue::{cleanup_queue_entry, DeadItem, DeadSummary, EnrichStatus, WaitingItem};
40use queue::{
41 dequeue_next_pending, enqueue_candidate, item_type_for, item_type_for_key, open_queue_db,
42 prune_dead_orphans, record_item_failure, record_item_failure_typed, skipped_item_keys,
43 DequeueOutcome,
44};
45use scan::{
46 count_operation_backlog, scan_isolated_entity_pairs, scan_operation, scan_unbound_memories,
47};
48
49use crate::commands::ingest_claude::find_claude_binary;
50use crate::constants::MAX_MEMORY_BODY_LEN;
51use crate::entity_type::EntityType;
52use crate::errors::AppError;
53use crate::paths::AppPaths;
54use crate::storage::connection::{ensure_db_ready, open_rw};
55use crate::storage::entities::{self, NewEntity, NewRelationship};
56use crate::storage::memories;
57
58use rusqlite::Connection;
59use serde::{Deserialize, Serialize};
60use std::io::Write;
61use std::path::{Path, PathBuf};
62use std::time::Instant;
63
// NOTE(review): presumably a wait, in seconds, applied after a rate-limit response; the
// consumer of this constant is outside the code visible here — confirm before changing it.
const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
/// Default value of `--min-output-chars` (`EnrichArgs::min_output_chars`).
const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;
/// Default value of `--max-output-chars` (`EnrichArgs::max_output_chars`).
const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2000;
71
/// JSON Schema describing an entity/relationship extraction result.
///
/// The document is an object with exactly two required arrays:
/// - `entities`: objects with a `name` and an `entity_type` restricted to ten allowed values;
/// - `relationships`: objects with `source`, `target`, a `relation` restricted to eleven
///   allowed values, and a `strength` between 0 and 1.
///
/// `additionalProperties` is `false` at every level, so unknown keys are rejected.
// NOTE(review): presumably handed to the LLM backend as the structured-output schema that
// accompanies `BINDINGS_PROMPT`; the call site is not visible here.
const BINDINGS_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["entities","relationships"],
  "additionalProperties": false
}"#;
115
/// JSON Schema for an entity-description result: an object whose only (required) key is the
/// string `description`.
const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" }
  },
  "required": ["description"],
  "additionalProperties": false
}"#;

/// JSON Schema for a body-enrichment result: an object whose only (required) key is the
/// string `enriched_body`.
const BODY_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "enriched_body": { "type": "string" }
  },
  "required": ["enriched_body"],
  "additionalProperties": false
}"#;
133
/// Instruction text asking the model to judge whether a relationship weight is well
/// calibrated, using a four-step scale (0.9 / 0.7 / 0.5 / 0.3).
///
/// Each source line ends in `\`, which joins it with the next line and drops that line's
/// leading whitespace; the only line breaks in the final text are the explicit `\n` escapes.
const WEIGHT_CALIBRATE_PROMPT: &str = "You are a knowledge graph quality auditor. Evaluate whether this relationship weight is correctly calibrated.\n\n\
Scale:\n\
- 0.9 = vital hard dependency (A cannot function without B)\n\
- 0.7 = important design relationship (A strongly supports/enables B)\n\
- 0.5 = useful contextual link (A and B share relevant context)\n\
- 0.3 = weak reference (A mentions B without strong coupling)\n\n\
Respond with the calibrated weight and brief reasoning.";

/// JSON Schema for the weight-calibration answer: a required `calibrated_weight` in
/// `[0.0, 1.0]` and a required `reasoning` string; no other keys are allowed.
const WEIGHT_CALIBRATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "calibrated_weight": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["calibrated_weight", "reasoning"],
  "additionalProperties": false
}"#;
152
/// Instruction text asking the model to replace a generic relationship type with one of
/// eleven canonical relations, each of which is defined inline in the prompt.
const RELATION_RECLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. The relationship between these entities uses a generic type. Determine the REAL semantic relationship.\n\n\
Valid canonical relations (pick exactly one):\n\
- depends-on: A cannot function without B\n\
- uses: A utilizes B but could substitute it\n\
- supports: A reinforces or enables B\n\
- causes: A triggers or produces B\n\
- fixes: A resolves a problem in B\n\
- contradicts: A conflicts with or invalidates B\n\
- applies-to: A is relevant to or scoped within B\n\
- follows: A comes after B in sequence\n\
- replaces: A substitutes B\n\
- tracked-in: A is monitored in B\n\
- related: A and B share context (use sparingly)\n\n\
Respond with the correct relation, strength, and reasoning.";

/// JSON Schema for the reclassification answer: required `relation` (string), `strength`
/// in `[0.0, 1.0]` and `reasoning` (string).
///
/// Note that `relation` is an unconstrained string here: the canonical list appears only in
/// the prompt text, not as an `enum` in the schema.
const RELATION_RECLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": { "type": "string" },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
179
/// Instruction text asking the model whether two currently unlinked entities should be
/// connected, and with which canonical relation. The model is told to answer with the
/// literal relation `"none"` when no meaningful relationship exists.
const ENTITY_CONNECT_PROMPT: &str = "You are a knowledge graph quality auditor. Two entities exist in the same graph but have no relationship between them. Determine if a meaningful relationship exists.\n\n\
Valid canonical relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, replaces, tracked-in, related.\n\n\
If NO meaningful relationship exists, set relation to \"none\".\n\
Respond with the relation (or \"none\"), strength, and reasoning.";

/// JSON Schema for the entity-connect answer: required `relation` (free-form string, so that
/// `"none"` is representable), `strength` in `[0.0, 1.0]` and `reasoning` (string).
const ENTITY_CONNECT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": { "type": "string" },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
196
/// Instruction text asking the model to confirm or correct an entity's type, choosing from
/// the ten entity types listed in the prompt.
const ENTITY_TYPE_VALIDATE_PROMPT: &str = "You are a knowledge graph quality auditor. Verify whether this entity's type is correct.\n\n\
Valid entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\n\
If the current type is correct, keep it. If wrong, suggest the correct type.\n\
Respond with the validated type and reasoning.";

/// JSON Schema for the type-validation answer: required `validated_type` (string),
/// `was_correct` (boolean) and `reasoning` (string).
///
/// `validated_type` is not restricted to an `enum`; the valid types are named only in the
/// prompt text.
const ENTITY_TYPE_VALIDATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "validated_type": { "type": "string" },
    "was_correct": { "type": "boolean" },
    "reasoning": { "type": "string" }
  },
  "required": ["validated_type", "was_correct", "reasoning"],
  "additionalProperties": false
}"#;
213
/// Instruction text asking the model to replace a generic or auto-generated memory
/// description with a concise 10-20 word semantic one; includes one bad and one good example.
const DESCRIPTION_ENRICH_PROMPT: &str = "You are a knowledge graph quality auditor. This memory has a generic or auto-generated description. Write a concise, semantic description (10-20 words) that captures WHAT this memory is about and WHY it matters.\n\n\
BAD: 'ingested from docs/auth.md'\n\
GOOD: 'JWT token rotation strategy with 15-min expiry and refresh flow'\n\n\
Respond with the improved description and reasoning.";

/// JSON Schema for the description-enrichment answer: required `description` and `reasoning`
/// strings; no other keys are allowed.
const DESCRIPTION_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" },
    "reasoning": { "type": "string" }
  },
  "required": ["description", "reasoning"],
  "additionalProperties": false
}"#;
229
/// Instruction text asking the model to assign a memory to its primary domain, expressed as
/// a kebab-case name of 2-4 words.
const DOMAIN_CLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. Classify this memory into its primary domain category.\n\n\
Respond with the domain name (kebab-case, 2-4 words) and reasoning.";

/// JSON Schema for the domain-classification answer: required `domain` (string),
/// `confidence` in `[0.0, 1.0]` and `reasoning` (string).
///
/// The schema asks for `confidence` even though the prompt text only mentions the domain
/// name and the reasoning.
const DOMAIN_CLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "domain": { "type": "string" },
    "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["domain", "confidence", "reasoning"],
  "additionalProperties": false
}"#;
244
/// Instruction text asking the model to audit a memory and its entity bindings for a fixed
/// list of quality problems and to return the issues found plus an overall score.
const GRAPH_AUDIT_PROMPT: &str = "You are a knowledge graph quality auditor. Analyze this memory and its entity bindings for quality issues.\n\n\
Check for: missing entities, wrong entity types, redundant relationships, orphaned entities, generic descriptions, low-signal relationships.\n\n\
Respond with a list of issues found (or empty if none) and an overall quality score.";

/// JSON Schema for the audit answer: required `quality_score` in `[0.0, 1.0]`, `issues`
/// (array of objects with required `kind` and `detail` strings) and `reasoning` (string).
///
/// Only the top-level object sets `additionalProperties: false`; the per-issue objects do
/// not, so extra keys inside an issue are accepted by this schema.
const GRAPH_AUDIT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "quality_score": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "issues": { "type": "array", "items": { "type": "object", "properties": { "kind": { "type": "string" }, "detail": { "type": "string" } }, "required": ["kind", "detail"] } },
    "reasoning": { "type": "string" }
  },
  "required": ["quality_score", "issues", "reasoning"],
  "additionalProperties": false
}"#;
260
/// Instruction text asking the model to distil a memory body into entities, relationships
/// and a synthesis summary, with naming rules and the list of allowed relations.
const DEEP_RESEARCH_SYNTH_PROMPT: &str = "You are a knowledge graph synthesizer. Given this memory body, extract key findings and synthesize them into structured entities and relationships.\n\n\
Entity names: lowercase kebab-case, domain-specific.\n\
Relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, related, replaces, tracked-in.\n\n\
Respond with extracted entities, relationships, and a synthesis summary.";

/// JSON Schema for the synthesis answer: required `entities`, `relationships` and `summary`.
///
/// Compared with `BINDINGS_SCHEMA` this schema is looser: `entity_type` and `relation` are
/// plain strings (no `enum`), `strength` has no bounds, and the nested item objects do not
/// forbid additional properties.
const DEEP_RESEARCH_SYNTH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": { "type": "array", "items": { "type": "object", "properties": { "name": { "type": "string" }, "entity_type": { "type": "string" } }, "required": ["name", "entity_type"] } },
    "relationships": { "type": "array", "items": { "type": "object", "properties": { "source": { "type": "string" }, "target": { "type": "string" }, "relation": { "type": "string" }, "strength": { "type": "number" } }, "required": ["source", "target", "relation", "strength"] } },
    "summary": { "type": "string" }
  },
  "required": ["entities", "relationships", "summary"],
  "additionalProperties": false
}"#;
277
/// Instruction text asking the model to restructure an unstructured memory body into clean
/// markdown while preserving all factual content.
const BODY_EXTRACT_PROMPT: &str = "You are a structured data extractor. Given this memory body (which may be unstructured text, raw notes, or a transcript), extract and restructure the content into a clean, well-organized markdown body.\n\n\
Preserve all factual content. Remove noise, fix formatting, add section headers where appropriate.\n\
Respond with the restructured body and a brief summary of changes.";

/// JSON Schema for the body-extraction answer: required `restructured_body` and
/// `changes_summary` strings; no other keys are allowed.
const BODY_EXTRACT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "restructured_body": { "type": "string" },
    "changes_summary": { "type": "string" }
  },
  "required": ["restructured_body", "changes_summary"],
  "additionalProperties": false
}"#;
292
/// Instruction text for entity/relationship extraction from a memory body.
///
/// The rules it states (kebab-case names, the eleven allowed relationship types, the
/// 0.9/0.7/0.5/0.3 strength ladder, the ban on `mentions`) mirror the constraints encoded in
/// `BINDINGS_SCHEMA`.
const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. Given a memory body, extract:\n\
1. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
2. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones";

/// Leading part of the entity-description prompt. It ends with `"Entity name: "`, so the
/// caller is expected to append the entity name (and whatever follows) after it.
const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. Given an entity name and type, write a concise one-sentence description (10-20 words) that explains what this entity IS and WHY it matters in the context of software/system design.\n\nEntity name: ";

/// Leading part of the body-enrichment prompt. It ends with `"Memory body to enrich:\n\n"`,
/// so the caller is expected to append the memory body after it.
const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. Given a short or sparse memory body, expand it into a richer, more complete and useful description. Preserve all existing facts. Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\nConstraints:\n- Output only the enriched body text (no metadata, no headers)\n- Preserve the original meaning exactly\n- Target length is provided in the system context\n\nMemory body to enrich:\n\n";
311
// The enrichment operation selected with `--operation` / `-o`.
//
// Variants serialize in kebab-case (`memory-bindings`, `re-embed`, ...) through serde, and the
// CLI help examples use the same spellings for the command-line values.
//
// These notes are plain `//` comments on purpose: clap's `ValueEnum` derive turns `///` doc
// comments on variants into help text for the possible values, which would alter CLI output.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum EnrichOperation {
    // Per the command help: links each memory to the entities extracted from its body and
    // scans only unbound memories. Also the fallback of `EnrichArgs::operation()`.
    MemoryBindings,
    // Per the command help: re-runs extraction over already-bound memories and merges new
    // entities/relationships additively; used together with `--names` / `--names-file`.
    AugmentBindings,
    // Per the command help: fills entity descriptions.
    EntityDescriptions,
    // Per the command help: expands short memory bodies.
    BodyEnrich,
    // Per the command help: rebuilds missing memory embeddings without rewriting bodies.
    ReEmbed,
    // The remaining variants are not described in the visible help text. Several share a name
    // with a `*_PROMPT` / `*_SCHEMA` constant in this file (e.g. `WEIGHT_CALIBRATE_PROMPT`);
    // NOTE(review): the wiring between them lives outside the code visible here.
    WeightCalibrate,
    RelationReclassify,
    EntityConnect,
    EntityTypeValidate,
    DescriptionEnrich,
    CrossDomainBridges,
    DomainClassify,
    GraphAudit,
    DeepResearchSynth,
    BodyExtract,
}
357
// Value of the `--target` flag (`EnrichArgs::target`, default `Memories`).
// NOTE(review): presumably selects which stored items the `re-embed` operation processes;
// the consumer is not visible here.
//
// Plain `//` comments are used because clap's `ValueEnum` derive would turn `///` doc
// comments on variants into CLI help text.
#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum ReEmbedTarget {
    Memories,
    Entities,
    Chunks,
    All,
}
376
// The LLM backend selected with `--mode` (and, optionally, `--fallback-mode`).
//
// `Opencode` and `OpenRouter` carry explicit `#[value(name = ...)]` overrides so that their
// command-line spellings are `opencode` and `openrouter`; the `Display` impl for this type
// prints the same spellings, plus `claude-code` and `codex` for the other two variants.
//
// Plain `//` comments are used because clap's `ValueEnum` derive would turn `///` doc
// comments on variants into CLI help text.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
pub enum EnrichMode {
    ClaudeCode,
    Codex,
    #[value(name = "opencode")]
    Opencode,
    #[value(name = "openrouter")]
    OpenRouter,
}
391
392impl std::fmt::Display for EnrichMode {
393 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
394 match self {
395 EnrichMode::ClaudeCode => write!(f, "claude-code"),
396 EnrichMode::Codex => write!(f, "codex"),
397 EnrichMode::Opencode => write!(f, "opencode"),
398 EnrichMode::OpenRouter => write!(f, "openrouter"),
399 }
400 }
401}
402
403#[derive(clap::Args)]
405#[command(
406 about = "Enrich graph memories and entities using an LLM provider",
407 after_long_help = "EXAMPLES:\n \
408 # Add missing entity bindings to all unbound memories\n \
409 sqlite-graphrag enrich --operation memory-bindings --mode codex --codex-model gpt-5.4-mini\n\n \
410 # Fill entity descriptions (dry-run preview, no tokens spent)\n \
411 sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n \
412 # Expand short memory bodies (GAP-18)\n \
413 sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n \
414 # Rebuild only missing memory embeddings without rewriting bodies\n \
415 sqlite-graphrag enrich --operation re-embed --limit 100\n\n \
416 # Resume an interrupted body-enrich run\n \
417 sqlite-graphrag enrich --operation body-enrich --resume --json\n\n \
418 # Retry only failed items from a previous run\n \
419 sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n \
420 # Converge the whole backlog (internal scan+drain loop, no bash wrapper)\n \
421 sqlite-graphrag enrich --operation memory-bindings --mode openrouter \\\n \
422 --openrouter-model deepseek/deepseek-v4-flash:nitro --until-empty --max-runtime 600\n\n \
423 # Inspect / resurrect dead-letter items\n \
424 sqlite-graphrag enrich --operation memory-bindings --list-dead\n \
425 sqlite-graphrag enrich --operation memory-bindings --requeue-dead\n\n \
426 # Read-only status (no LLM, no singleton)\n \
427 sqlite-graphrag enrich --operation memory-bindings --status\n\n\
428 OPERATIONS NOTE:\n \
429 memory-bindings LINKS each memory to the EXISTING entities extracted from its\n \
430 body — it does not invent a new graph, it connects what is missing. It scans\n \
431 only UNBOUND memories. To re-run extraction over ALREADY-bound memories and\n \
432 MERGE newly-found entities/relationships additively (without removing links),\n \
433 use --operation augment-bindings with --names/--names-file.\n\n\
434 DEAD-LETTER SIDECAR (.enrich-queue.sqlite):\n \
435 A SQLite sidecar tracks each work item across runs. Schema (table `queue`):\n \
436 item_key (UNIQUE name/id), item_type (memory|entity), operation, memory_id,\n \
437 status (pending|processing|done|skipped|dead), attempt, error, error_class,\n \
438 next_retry_at (backoff cooldown). --until-empty loops scan→drain internally\n \
439 until eligible items are exhausted; transient failures (incl. malformed/non-\n \
440 JSON LLM output, GAP-SG-09) reschedule with backoff until --max-attempts, then\n \
441 land in status='dead'. Use --status to see the queue, --list-dead to inspect\n \
442 the sink, --requeue-dead to retry it, and --ignore-backoff to skip cooldowns.\n \
443 --names/--names-file also remedy a cooldown by targeting a specific subset.\n\n\
444 EXIT CODES:\n \
445 0 success\n \
446 1 validation error (bad args, binary not found)\n \
447 14 I/O error"
448)]
449pub struct EnrichArgs {
450 #[arg(
455 long,
456 short = 'o',
457 value_enum,
458 value_name = "OPERATION",
459 required_unless_present_any = ["status", "list_dead", "requeue_dead", "prune_dead_orphans"]
460 )]
461 pub operation: Option<EnrichOperation>,
462
463 #[arg(
467 long,
468 value_enum,
469 required_unless_present_any = ["status", "list_dead", "requeue_dead", "prune_dead_orphans"]
470 )]
471 pub mode: Option<EnrichMode>,
472
473 #[arg(long, value_name = "N")]
475 pub limit: Option<usize>,
476
477 #[arg(long, value_enum, value_name = "TARGET", default_value_t = ReEmbedTarget::Memories)]
485 pub target: ReEmbedTarget,
486
487 #[arg(long)]
489 pub dry_run: bool,
490
491 #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
493 pub namespace: Option<String>,
494
495 #[arg(long, value_name = "PATH")]
498 pub claude_binary: Option<PathBuf>,
499
500 #[arg(long, value_name = "MODEL")]
502 pub claude_model: Option<String>,
503
504 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
506 pub claude_timeout: u64,
507
508 #[arg(long, value_name = "PATH")]
511 pub codex_binary: Option<PathBuf>,
512
513 #[arg(long, value_name = "MODEL")]
515 pub codex_model: Option<String>,
516
517 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
519 pub codex_timeout: u64,
520
521 #[arg(long, value_name = "PATH", env = "SQLITE_GRAPHRAG_OPENCODE_BINARY")]
524 pub opencode_binary: Option<PathBuf>,
525
526 #[arg(long, value_name = "MODEL", env = "SQLITE_GRAPHRAG_OPENCODE_MODEL")]
528 pub opencode_model: Option<String>,
529
530 #[arg(
532 long,
533 value_name = "SECONDS",
534 env = "SQLITE_GRAPHRAG_OPENCODE_TIMEOUT",
535 default_value_t = 300
536 )]
537 pub opencode_timeout: u64,
538
539 #[arg(long, value_name = "MODEL")]
542 pub openrouter_model: Option<String>,
543
544 #[arg(long, value_name = "KEY", env = "OPENROUTER_API_KEY")]
546 pub openrouter_api_key: Option<String>,
547
548 #[arg(long, value_name = "SECONDS", default_value_t = 600)]
555 pub openrouter_timeout: u64,
556
557 #[arg(long, value_name = "URL")]
559 pub openrouter_base_url: Option<String>,
560
561 #[arg(long, value_name = "USD")]
564 pub max_cost_usd: Option<f64>,
565
566 #[arg(long)]
569 pub resume: bool,
570
571 #[arg(long)]
573 pub retry_failed: bool,
574
575 #[arg(long)]
579 pub until_empty: bool,
580
581 #[arg(long, value_name = "SECONDS")]
584 pub max_runtime: Option<u64>,
585
586 #[arg(long, value_name = "N", default_value_t = 8, value_parser = clap::value_parser!(u32).range(1..=20))]
598 pub max_attempts: u32,
599
600 #[arg(long)]
603 pub status: bool,
604
605 #[arg(long)]
610 pub list_dead: bool,
611
612 #[arg(long)]
619 pub requeue_dead: bool,
620
621 #[arg(long)]
629 pub prune_dead_orphans: bool,
630
631 #[arg(long)]
637 pub ignore_backoff: bool,
638
639 #[arg(long)]
646 pub body_extract_graph_only: bool,
647
648 #[arg(long, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=16))]
651 pub rest_concurrency: Option<u32>,
652
653 #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
656 pub min_output_chars: usize,
657
658 #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
660 pub max_output_chars: usize,
661
662 #[arg(long, default_value_t = true)]
664 pub preserve_check: bool,
665
666 #[arg(long, value_name = "PATH")]
668 pub prompt_template: Option<PathBuf>,
669
670 #[arg(long, default_value_t = 1, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=32))]
674 pub llm_parallelism: u32,
675
676 #[arg(long)]
679 pub json: bool,
680
681 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
683 pub db: Option<String>,
684
685 #[arg(long, value_name = "SECONDS")]
688 pub wait_job_singleton: Option<u64>,
689
690 #[arg(long, default_value_t = false)]
694 pub force_job_singleton: bool,
695
696 #[arg(long, value_name = "NAMES", value_delimiter = ',')]
706 pub names: Vec<String>,
707
708 #[arg(long, value_name = "PATH")]
712 pub names_file: Option<PathBuf>,
713
714 #[arg(long, default_value_t = false)]
718 pub preflight_check: bool,
719
720 #[arg(long, value_enum)]
724 pub fallback_mode: Option<EnrichMode>,
725
726 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
729 pub rate_limit_buffer: u64,
730
731 #[arg(long, default_value_t = true)]
735 pub max_load_check: bool,
736
737 #[arg(long, value_name = "N", default_value_t = 5)]
740 pub circuit_breaker_threshold: u32,
741
742 #[arg(long, value_name = "FLOAT", default_value_t = 0.7)]
749 pub preserve_threshold: f64,
750
751 #[arg(long, default_value_t = true)]
756 pub codex_model_validate: bool,
757
758 #[arg(long, value_name = "MODEL")]
763 pub codex_model_fallback: Option<String>,
764}
765
766impl EnrichArgs {
767 fn operation(&self) -> EnrichOperation {
775 self.operation
776 .clone()
777 .unwrap_or(EnrichOperation::MemoryBindings)
778 }
779
780 fn mode(&self) -> EnrichMode {
785 self.mode.clone().unwrap_or(EnrichMode::OpenRouter)
786 }
787}
788
/// Serializable record describing a processing phase.
///
/// Only `phase` is always written; every `Option` field is left out of the serialized
/// output when it is `None`.
// NOTE(review): presumably emitted as one JSON line via `emit_json`; call sites are not
// visible here.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Name of the phase being reported.
    phase: &'a str,
    #[serde(skip_serializing_if = "Option::is_none")]
    binary_path: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    items_total: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    items_pending: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    llm_parallelism: Option<u32>,
}
812
/// Serializable record reporting the parallelism used for the scan and drain stages.
/// All three fields are always written.
#[derive(Debug, Serialize)]
struct ConcurrencyEvent {
    /// Phase label; a `'static` string, i.e. fixed at the call site.
    phase: &'static str,
    scan_parallelism: u32,
    drain_parallelism: u32,
}
823
/// Serializable per-item progress record.
///
/// `item`, `status`, `index` and `total` are always written; every `Option` field is left
/// out of the serialized output when it is `None`, so the shape of a record varies with the
/// operation and its outcome.
#[derive(Debug, Serialize)]
struct ItemEvent<'a> {
    /// Identifier of the work item.
    item: &'a str,
    /// Outcome label for the item.
    status: &'a str,
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    entity_id: Option<i64>,
    /// Count of entities — NOTE(review): presumably those produced for this item.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Count of relationships — NOTE(review): presumably those produced for this item.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_before: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_after: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Position of this item within the run — NOTE(review): base (0 or 1) not visible here.
    index: usize,
    /// Number of items in the run.
    total: usize,
}
850
/// Serializable end-of-run summary.
///
/// Every field except `backend_invoked` is always written; `backend_invoked` is omitted when
/// it is `None`.
#[derive(Debug, Serialize)]
struct EnrichSummary {
    // NOTE(review): presumably always `true`, acting as a marker that distinguishes the
    // summary record from per-item records — confirm at the construction site.
    summary: bool,
    operation: String,
    items_total: usize,
    completed: usize,
    failed: usize,
    skipped: usize,
    cost_usd: f64,
    elapsed_ms: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    backend_invoked: Option<&'static str>,
    // NOTE(review): `waiting` and `dead` presumably mirror queue item counts (items in
    // backoff and dead-letter items respectively) — confirm at the construction site.
    waiting: i64,
    dead: i64,
}
875
876use crate::output::emit_json_line as emit_json;
877
/// Result of the backend preflight probe (`run_preflight_probe`).
enum PreflightOutcome {
    /// The probe succeeded (for OpenRouter: an API key could be resolved).
    Healthy,
    /// The probe process failed and its stderr contained a rate-limit marker.
    RateLimited {
        /// Trimmed stderr of the probe process.
        reason: String,
        /// Fixed, human-readable hint on how to proceed.
        suggestion: &'static str,
    },
    /// The probe could not be run, timed out, or failed for a non-rate-limit reason.
    Error(AppError),
}
904
905fn run_preflight_probe(args: &EnrichArgs) -> PreflightOutcome {
913 let timeout = std::time::Duration::from_secs(args.rate_limit_buffer.max(60));
914
915 match args.mode() {
916 EnrichMode::ClaudeCode => {
917 let bin = match find_claude_binary(args.claude_binary.as_deref()) {
918 Ok(b) => b,
919 Err(e) => return PreflightOutcome::Error(e),
920 };
921 let mcp_config_path = match crate::spawn::preflight::write_empty_mcp_config_tempfile() {
926 Ok(p) => p,
927 Err(e) => {
928 return PreflightOutcome::Error(AppError::Io(e));
929 }
930 };
931 let mut cmd = std::process::Command::new(&bin);
932 crate::spawn::env_whitelist::apply_env_whitelist(
933 &mut cmd,
934 crate::spawn::env_whitelist::is_strict_env_clear(),
935 );
936 if let Err(e) = crate::spawn::apply_cwd_isolation(&mut cmd) {
937 return PreflightOutcome::Error(e);
938 }
939 cmd.arg("-p")
940 .arg("ping")
941 .arg("--max-turns")
942 .arg("1")
943 .arg("--strict-mcp-config")
944 .arg("--mcp-config")
945 .arg(mcp_config_path.as_os_str())
946 .arg("--dangerously-skip-permissions")
947 .arg("--settings")
948 .arg("{\"hooks\":{}}")
949 .arg("--output-format")
950 .arg("json")
951 .stdin(std::process::Stdio::null())
952 .stdout(std::process::Stdio::piped())
953 .stderr(std::process::Stdio::piped());
954
955 let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
956 Ok(c) => c,
957 Err(e) => {
958 return PreflightOutcome::Error(AppError::Io(e));
959 }
960 };
961 let output = match wait_with_timeout(child, timeout) {
962 Ok(out) => out,
963 Err(e) => return PreflightOutcome::Error(e),
964 };
965 if !output.status.success() {
966 let stderr = String::from_utf8_lossy(&output.stderr);
967 if stderr.contains("hit your session limit")
968 || stderr.contains("rate_limit")
969 || stderr.contains("429")
970 {
971 return PreflightOutcome::RateLimited {
972 reason: stderr.trim().to_string(),
973 suggestion:
974 "wait for the OAuth window to reset or use --fallback-mode codex",
975 };
976 }
977 return PreflightOutcome::Error(AppError::Validation(format!(
978 "preflight probe failed: {stderr}",
979 stderr = stderr.trim()
980 )));
981 }
982 PreflightOutcome::Healthy
983 }
984 EnrichMode::Codex => {
985 let bin = match find_codex_binary(args.codex_binary.as_deref()) {
986 Ok(b) => b,
987 Err(e) => return PreflightOutcome::Error(e),
988 };
989 super::codex_spawn::validate_codex_model(args.codex_model.as_deref())
990 .map_err(PreflightOutcome::Error)
991 .ok();
992 let schema = "{}";
993 let schema_path = match super::codex_spawn::trusted_schema_path() {
994 Ok(p) => p,
995 Err(e) => return PreflightOutcome::Error(e),
996 };
997 let spawn_args = super::codex_spawn::CodexSpawnArgs {
998 binary: &bin,
999 prompt: "ping",
1000 json_schema: schema,
1001 input_text: "",
1002 model: args.codex_model.as_deref(),
1003 timeout_secs: args.rate_limit_buffer.max(60),
1004 schema_path: schema_path.clone(),
1005 };
1006 let mut cmd = match super::codex_spawn::build_codex_command(&spawn_args) {
1007 Ok(c) => c,
1008 Err(e) => return PreflightOutcome::Error(e),
1009 };
1010 let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
1011 Ok(c) => c,
1012 Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
1013 };
1014 let output = match wait_with_timeout(child, timeout) {
1015 Ok(out) => out,
1016 Err(e) => return PreflightOutcome::Error(e),
1017 };
1018 let _ = std::fs::remove_file(&schema_path);
1019 if !output.status.success() {
1020 let stderr = String::from_utf8_lossy(&output.stderr);
1021 if stderr.contains("rate_limit")
1022 || stderr.contains("429")
1023 || stderr.contains("Too Many Requests")
1024 {
1025 return PreflightOutcome::RateLimited {
1026 reason: stderr.trim().to_string(),
1027 suggestion: "wait for the rate-limit window to reset",
1028 };
1029 }
1030 return PreflightOutcome::Error(AppError::Validation(format!(
1031 "preflight probe failed: {stderr}",
1032 stderr = stderr.trim()
1033 )));
1034 }
1035 PreflightOutcome::Healthy
1036 }
1037 EnrichMode::Opencode => {
1038 let bin = match super::opencode_runner::find_opencode_binary_with_override(
1039 args.opencode_binary.as_deref(),
1040 ) {
1041 Ok(b) => b,
1042 Err(e) => return PreflightOutcome::Error(e),
1043 };
1044 let model =
1045 super::opencode_runner::resolve_opencode_model(args.opencode_model.as_deref());
1046 let mut cmd =
1047 match super::opencode_runner::build_opencode_command_sync(&bin, &model, "ping", "")
1048 {
1049 Ok(c) => c,
1050 Err(e) => return PreflightOutcome::Error(e),
1051 };
1052 let child = match super::opencode_runner::spawn_opencode(&mut cmd) {
1053 Ok(c) => c,
1054 Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
1055 };
1056 let output = match wait_with_timeout(child, timeout) {
1057 Ok(out) => out,
1058 Err(e) => return PreflightOutcome::Error(e),
1059 };
1060 if !output.status.success() {
1061 let stderr = String::from_utf8_lossy(&output.stderr);
1062 if stderr.contains("rate_limit")
1063 || stderr.contains("429")
1064 || stderr.contains("Too Many Requests")
1065 {
1066 return PreflightOutcome::RateLimited {
1067 reason: stderr.trim().to_string(),
1068 suggestion: "wait for the rate-limit window to reset",
1069 };
1070 }
1071 return PreflightOutcome::Error(AppError::Validation(format!(
1072 "preflight probe failed: {stderr}",
1073 stderr = stderr.trim()
1074 )));
1075 }
1076 PreflightOutcome::Healthy
1077 }
1078 EnrichMode::OpenRouter => {
1079 match crate::config::resolve_api_key("openrouter", args.openrouter_api_key.as_deref()) {
1083 Some(_) => PreflightOutcome::Healthy,
1084 None => PreflightOutcome::Error(AppError::Validation(
1085 "OPENROUTER_API_KEY not found for --mode openrouter preflight".into(),
1086 )),
1087 }
1088 }
1089 }
1090}
1091
1092fn wait_with_timeout(
1094 mut child: std::process::Child,
1095 timeout: std::time::Duration,
1096) -> Result<std::process::Output, AppError> {
1097 use wait_timeout::ChildExt;
1098 let start = std::time::Instant::now();
1099 let Some(exit) = child.wait_timeout(timeout).map_err(AppError::Io)? else {
1100 let _ = child.kill();
1101 let _ = child.wait();
1102 return Err(AppError::Validation(format!(
1103 "preflight probe timed out after {}s",
1104 start.elapsed().as_secs()
1105 )));
1106 };
1107 let mut stdout = Vec::new();
1108 if let Some(mut out) = child.stdout.take() {
1109 std::io::Read::read_to_end(&mut out, &mut stdout).map_err(AppError::Io)?;
1110 }
1111 let mut stderr = Vec::new();
1112 if let Some(mut err) = child.stderr.take() {
1113 std::io::Read::read_to_end(&mut err, &mut stderr).map_err(AppError::Io)?;
1114 }
1115 Ok(std::process::Output {
1116 status: exit,
1117 stdout,
1118 stderr,
1119 })
1120}
1121
/// Returns `true` when `value` compares equal to `default`.
///
/// Used by the flag validation in this file to tell whether a timeout flag was left at its
/// default value.
fn is_at_default<T: PartialEq>(value: T, default: T) -> bool {
    PartialEq::eq(&value, &default)
}
1140
1141fn validate_mode_conditional_flags_enrich(args: &EnrichArgs) -> Result<(), AppError> {
1156 const DEFAULT_TIMEOUT: u64 = 300;
1157
1158 let mut conflicts: Vec<String> = Vec::new();
1159
1160 match args.mode() {
1161 EnrichMode::ClaudeCode => {
1162 if args.codex_binary.is_some() {
1163 conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1164 }
1165 if args.codex_model.is_some() {
1166 conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1167 }
1168 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1169 conflicts.push(format!(
1170 "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1171 args.codex_timeout
1172 ));
1173 }
1174 }
1175 EnrichMode::Codex => {
1176 if args.claude_binary.is_some() {
1177 conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1178 }
1179 if args.claude_model.is_some() {
1180 conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1181 }
1182 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1183 conflicts.push(format!(
1184 "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1185 args.claude_timeout
1186 ));
1187 }
1188 if args.max_cost_usd.is_some() {
1189 conflicts.push(
1190 "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription, not the call)"
1191 .to_string(),
1192 );
1193 }
1194 }
1195 EnrichMode::Opencode => {
1196 if args.claude_binary.is_some() {
1197 conflicts.push("--claude-binary is ignored when --mode=opencode".to_string());
1198 }
1199 if args.claude_model.is_some() {
1200 conflicts.push("--claude-model is ignored when --mode=opencode".to_string());
1201 }
1202 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1203 conflicts.push(format!(
1204 "--claude-timeout={} is ignored when --mode=opencode (remove the flag to use the default 300s)",
1205 args.claude_timeout
1206 ));
1207 }
1208 if args.max_cost_usd.is_some() {
1209 conflicts.push(
1210 "--max-cost-usd is ignored when --mode=opencode (OAuth-first; cost is metered by your subscription, not the call)"
1211 .to_string(),
1212 );
1213 }
1214 }
1215 EnrichMode::OpenRouter => {
1216 if args.claude_binary.is_some() {
1217 conflicts.push("--claude-binary is ignored when --mode=openrouter".to_string());
1218 }
1219 if args.claude_model.is_some() {
1220 conflicts.push("--claude-model is ignored when --mode=openrouter".to_string());
1221 }
1222 if args.codex_binary.is_some() {
1223 conflicts.push("--codex-binary is ignored when --mode=openrouter".to_string());
1224 }
1225 if args.codex_model.is_some() {
1226 conflicts.push("--codex-model is ignored when --mode=openrouter".to_string());
1227 }
1228 if args.opencode_binary.is_some() {
1229 conflicts.push("--opencode-binary is ignored when --mode=openrouter".to_string());
1230 }
1231 if args.opencode_model.is_some() {
1232 conflicts.push("--opencode-model is ignored when --mode=openrouter".to_string());
1233 }
1234 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1235 conflicts.push(format!(
1236 "--claude-timeout={} is ignored when --mode=openrouter (remove the flag to use the default 300s)",
1237 args.claude_timeout
1238 ));
1239 }
1240 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1241 conflicts.push(format!(
1242 "--codex-timeout={} is ignored when --mode=openrouter (remove the flag to use the default 300s)",
1243 args.codex_timeout
1244 ));
1245 }
1246 if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
1247 conflicts.push(format!(
1248 "--opencode-timeout={} is ignored when --mode=openrouter (remove the flag to use the default 300s)",
1249 args.opencode_timeout
1250 ));
1251 }
1252 }
1253 }
1254
1255 if !conflicts.is_empty() {
1256 return Err(AppError::Validation(format!(
1257 "G20: mode-conditional flag conflicts detected for --mode={}:\n - {}",
1258 args.mode(),
1259 conflicts.join("\n - ")
1260 )));
1261 }
1262
1263 Ok(())
1264}
1265
1266pub fn run(
1270 args: &EnrichArgs,
1271 llm_backend: crate::cli::LlmBackendChoice,
1272 embedding_backend: crate::cli::EmbeddingBackendChoice,
1273) -> Result<(), AppError> {
1274 validate_mode_conditional_flags_enrich(args)?;
1277
1278 if args.target != ReEmbedTarget::Memories
1281 && !matches!(args.operation(), EnrichOperation::ReEmbed)
1282 {
1283 let target_label = match args.target {
1284 ReEmbedTarget::Memories => "memories",
1285 ReEmbedTarget::Entities => "entities",
1286 ReEmbedTarget::Chunks => "chunks",
1287 ReEmbedTarget::All => "all",
1288 };
1289 return Err(AppError::Validation(format!(
1290 "--target {target_label} only applies to --operation re-embed"
1291 )));
1292 }
1293
1294 if args.list_dead || args.requeue_dead || args.prune_dead_orphans {
1303 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1304 let op_label = format!("{:?}", args.operation());
1305 let paths = AppPaths::resolve(args.db.as_deref())?;
1306 let queue_path = crate::paths::sidecar_path(&paths.db, ".enrich-queue.sqlite");
1307 let queue_conn = open_queue_db(&queue_path)?;
1308 if args.prune_dead_orphans {
1311 ensure_db_ready(&paths)?;
1312 let main_conn = open_rw(&paths.db)?;
1313 let pruned = prune_dead_orphans(&queue_conn, &main_conn, &op_label, &namespace)?;
1314 let dead_total: i64 = queue_conn
1315 .query_row(
1316 "SELECT COUNT(*) FROM queue WHERE status='dead' \
1317 AND (operation = ?1 OR operation IS NULL)",
1318 rusqlite::params![op_label],
1319 |r| r.get(0),
1320 )
1321 .unwrap_or(0);
1322 emit_json(&DeadSummary {
1323 summary: true,
1324 operation: op_label,
1325 namespace,
1326 action: "prune-dead-orphans",
1327 dead_total,
1328 requeued: 0,
1329 pruned,
1330 });
1331 return Ok(());
1332 }
1333 if args.list_dead {
1334 let mut stmt = queue_conn.prepare(
1335 "SELECT item_key, item_type, attempt, error_class, error, \
1336 finish_reason, input_tokens, output_tokens FROM queue \
1337 WHERE status='dead' AND (operation = ?1 OR operation IS NULL) ORDER BY id",
1338 )?;
1339 let rows = stmt
1340 .query_map(rusqlite::params![op_label], |r| {
1341 Ok(DeadItem {
1342 dead_item: true,
1343 item_key: r.get(0)?,
1344 item_type: r.get(1)?,
1345 attempt: r.get(2)?,
1346 error_class: r.get(3)?,
1347 error: r.get(4)?,
1348 finish_reason: r.get(5)?,
1349 input_tokens: r.get(6)?,
1350 output_tokens: r.get(7)?,
1351 })
1352 })?
1353 .collect::<Result<Vec<_>, _>>()?;
1354 let dead_total = rows.len() as i64;
1355 for item in &rows {
1356 emit_json(item);
1357 }
1358 emit_json(&DeadSummary {
1359 summary: true,
1360 operation: op_label,
1361 namespace,
1362 action: "list-dead",
1363 dead_total,
1364 requeued: 0,
1365 pruned: 0,
1366 });
1367 return Ok(());
1368 }
1369 let dead_total: i64 = queue_conn
1371 .query_row(
1372 "SELECT COUNT(*) FROM queue WHERE status='dead' \
1373 AND (operation = ?1 OR operation IS NULL)",
1374 rusqlite::params![op_label],
1375 |r| r.get(0),
1376 )
1377 .unwrap_or(0);
1378 let requeued = queue_conn
1379 .execute(
1380 "UPDATE queue SET status='pending', attempt=0, next_retry_at=NULL, \
1381 error=NULL, error_class=NULL \
1382 WHERE status='dead' AND (operation = ?1 OR operation IS NULL)",
1383 rusqlite::params![op_label],
1384 )
1385 .map_err(|e| AppError::Validation(format!("requeue-dead failed: {e}")))?
1386 as i64;
1387 let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1388 emit_json(&DeadSummary {
1389 summary: true,
1390 operation: op_label,
1391 namespace,
1392 action: "requeue-dead",
1393 dead_total,
1394 requeued,
1395 pruned: 0,
1396 });
1397 return Ok(());
1398 }
1399
1400 if args.status {
1401 let paths = AppPaths::resolve(args.db.as_deref())?;
1402 ensure_db_ready(&paths)?;
1403 let conn = open_rw(&paths.db)?;
1404 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1405 let unbound_backlog = scan_unbound_memories(&conn, &namespace, None, &[])?.len();
1406 let scan_backlog =
1409 count_operation_backlog(&conn, &args.operation(), &namespace, args.target)?;
1410 let queue_path = crate::paths::sidecar_path(&paths.db, ".enrich-queue.sqlite");
1411 let queue_conn = open_queue_db(&queue_path)?;
1412 let op_label = format!("{:?}", args.operation());
1413 let count_status = |st: &str, op: &str| -> i64 {
1417 queue_conn
1418 .query_row(
1419 "SELECT COUNT(*) FROM queue WHERE status=?1 \
1420 AND (operation = ?2 OR operation IS NULL)",
1421 rusqlite::params![st, op],
1422 |r| r.get(0),
1423 )
1424 .unwrap_or(0)
1425 };
1426 let eligible_now: i64 = queue_conn
1427 .query_row(
1428 "SELECT COUNT(*) FROM queue WHERE status='pending' \
1429 AND (operation = ?1 OR operation IS NULL) \
1430 AND (next_retry_at IS NULL OR next_retry_at <= datetime('now'))",
1431 rusqlite::params![op_label],
1432 |r| r.get(0),
1433 )
1434 .unwrap_or(0);
1435 let waiting: i64 = queue_conn
1436 .query_row(
1437 "SELECT COUNT(*) FROM queue WHERE status='pending' \
1438 AND (operation = ?1 OR operation IS NULL) \
1439 AND next_retry_at IS NOT NULL AND next_retry_at > datetime('now')",
1440 rusqlite::params![op_label],
1441 |r| r.get(0),
1442 )
1443 .unwrap_or(0);
1444 let waiting_items = {
1446 let mut stmt = queue_conn.prepare(
1447 "SELECT item_key, attempt, next_retry_at, error_class FROM queue \
1448 WHERE status='pending' AND (operation = ?1 OR operation IS NULL) \
1449 AND next_retry_at IS NOT NULL AND next_retry_at > datetime('now') \
1450 ORDER BY next_retry_at",
1451 )?;
1452 let items: Vec<WaitingItem> = stmt
1453 .query_map(rusqlite::params![op_label], |r| {
1454 Ok(WaitingItem {
1455 item_key: r.get(0)?,
1456 attempt: r.get(1)?,
1457 next_retry_at: r.get(2)?,
1458 error_class: r.get(3)?,
1459 })
1460 })?
1461 .collect::<Result<Vec<_>, _>>()?;
1462 items
1463 };
1464 let queue_pending = count_status("pending", &op_label);
1465 let queue_processing = count_status("processing", &op_label);
1466 let queue_done = count_status("done", &op_label);
1467 let queue_failed = count_status("failed", &op_label);
1468 let queue_skipped = count_status("skipped", &op_label);
1469 let queue_dead = count_status("dead", &op_label);
1470 let state = if eligible_now > 0 {
1472 "draining"
1473 } else if waiting > 0 {
1474 "cooldown"
1475 } else if queue_pending == 0 && scan_backlog > 0 {
1476 "pending-scan"
1477 } else {
1478 "empty"
1479 };
1480 emit_json(&EnrichStatus {
1481 status_report: true,
1482 operation: op_label,
1483 namespace,
1484 unbound_backlog,
1485 scan_backlog,
1486 queue_pending,
1487 queue_processing,
1488 queue_done,
1489 queue_failed,
1490 queue_skipped,
1491 queue_dead,
1492 eligible_now,
1493 waiting,
1494 state,
1495 waiting_items,
1496 });
1497 return Ok(());
1498 }
1499
1500 if args.mode() == EnrichMode::OpenRouter {
1505 let model = args.openrouter_model.as_deref().ok_or_else(|| {
1506 AppError::Validation(
1507 "--mode openrouter requires --openrouter-model (no default model is allowed)"
1508 .into(),
1509 )
1510 })?;
1511 let resolved =
1512 crate::config::resolve_api_key("openrouter", args.openrouter_api_key.as_deref())
1513 .ok_or_else(|| {
1514 AppError::Validation(
1515 "OPENROUTER_API_KEY not found; set the env var, store it via \
1516 `config add-key --provider openrouter`, or pass --openrouter-api-key"
1517 .into(),
1518 )
1519 })?;
1520 crate::embedder::get_openrouter_chat_client(
1521 resolved.value,
1522 model,
1523 args.openrouter_timeout,
1524 )?;
1525 }
1526
1527 let started = Instant::now();
1528
1529 let paths = AppPaths::resolve(args.db.as_deref())?;
1530 ensure_db_ready(&paths)?;
1531 let conn = open_rw(&paths.db)?;
1532 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1533
1534 let wait_secs = args.wait_job_singleton;
1540 let force_flag = args.force_job_singleton;
1541 let _singleton = crate::lock::acquire_job_singleton(
1542 crate::lock::JobType::Enrich,
1543 &namespace,
1544 &paths.db,
1545 wait_secs,
1546 force_flag,
1547 )?;
1548
1549 let provider_binary = if matches!(args.operation(), EnrichOperation::ReEmbed) {
1551 None
1552 } else {
1553 Some(match args.mode() {
1554 EnrichMode::ClaudeCode => {
1555 let bin = find_claude_binary(args.claude_binary.as_deref())?;
1556 let version = super::claude_runner::validate_claude_version(&bin)?;
1557 tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
1558 emit_json(&PhaseEvent {
1559 phase: "validate",
1560 binary_path: bin.to_str(),
1561 version: Some(&version),
1562 items_total: None,
1563 items_pending: None,
1564 llm_parallelism: None,
1565 });
1566 bin
1567 }
1568 EnrichMode::Codex => {
1569 let bin = find_codex_binary(args.codex_binary.as_deref())?;
1570 emit_json(&PhaseEvent {
1571 phase: "validate",
1572 binary_path: bin.to_str(),
1573 version: None,
1574 items_total: None,
1575 items_pending: None,
1576 llm_parallelism: None,
1577 });
1578 bin
1579 }
1580 EnrichMode::Opencode => {
1581 let bin = super::opencode_runner::find_opencode_binary_with_override(
1582 args.opencode_binary.as_deref(),
1583 )?;
1584 emit_json(&PhaseEvent {
1585 phase: "validate",
1586 binary_path: bin.to_str(),
1587 version: None,
1588 items_total: None,
1589 items_pending: None,
1590 llm_parallelism: None,
1591 });
1592 bin
1593 }
1594 EnrichMode::OpenRouter => {
1595 emit_json(&PhaseEvent {
1600 phase: "validate",
1601 binary_path: None,
1602 version: None,
1603 items_total: None,
1604 items_pending: None,
1605 llm_parallelism: None,
1606 });
1607 PathBuf::new()
1608 }
1609 })
1610 };
1611
1612 if args.max_load_check && !args.dry_run && crate::system_load::is_system_saturated() {
1616 let load = crate::system_load::load_average_one();
1617 let n = crate::system_load::ncpus();
1618 return Err(AppError::Validation(format!(
1619 "system load average {load:.2} exceeds 2x ncpus ({n}); \
1620 pass --no-max-load-check to override (not recommended)"
1621 )));
1622 }
1623
1624 if args.preflight_check
1631 && !args.dry_run
1632 && !matches!(args.operation(), EnrichOperation::ReEmbed)
1633 {
1634 let preflight_result = run_preflight_probe(args);
1635 match preflight_result {
1636 PreflightOutcome::Healthy => {
1637 tracing::info!(target: "enrich", mode = ?args.mode(), "preflight probe healthy");
1638 }
1639 PreflightOutcome::RateLimited { reason, suggestion } => {
1640 if let Some(fallback) = args.fallback_mode.clone() {
1641 if fallback != args.mode() {
1642 return Err(AppError::Validation(format!(
1652 "preflight detected rate limit on {mode:?}: {reason}; \
1653 re-invoke with `--mode {fallback:?}` to use the fallback provider",
1654 mode = args.mode()
1655 )));
1656 }
1657 return Err(AppError::Validation(format!(
1658 "preflight detected rate limit on {mode:?}: {reason}; \
1659 --fallback-mode matches --mode, no recovery possible",
1660 mode = args.mode()
1661 )));
1662 }
1663 return Err(AppError::Validation(format!(
1664 "preflight detected rate limit on {mode:?}: {reason}; \
1665 {suggestion}; pass --fallback-mode codex to recover",
1666 mode = args.mode()
1667 )));
1668 }
1669 PreflightOutcome::Error(e) => {
1670 return Err(e);
1671 }
1672 }
1673 }
1674
1675 let mut scan_result = scan_operation(&conn, &namespace, args)?;
1677 if matches!(args.operation(), EnrichOperation::BodyEnrich) {
1686 let q_path = crate::paths::sidecar_path(&paths.db, ".enrich-queue.sqlite");
1687 if let Ok(q) = open_queue_db(&q_path) {
1688 if let Ok(vetoed) = skipped_item_keys(&q, &format!("{:?}", args.operation())) {
1689 scan_result.retain(|k| !vetoed.contains(k));
1690 }
1691 }
1692 }
1693 let total = scan_result.len();
1694
1695 emit_json(&PhaseEvent {
1696 phase: "scan",
1697 binary_path: None,
1698 version: None,
1699 items_total: Some(total),
1700 items_pending: Some(total),
1701 llm_parallelism: Some(args.llm_parallelism),
1702 });
1703
1704 if args.dry_run {
1706 for (idx, key) in scan_result.iter().enumerate() {
1707 emit_json(&ItemEvent {
1708 item: key,
1709 status: "preview",
1710 memory_id: None,
1711 entity_id: None,
1712 entities: None,
1713 rels: None,
1714 chars_before: None,
1715 chars_after: None,
1716 cost_usd: None,
1717 elapsed_ms: None,
1718 error: None,
1719 index: idx,
1720 total,
1721 });
1722 }
1723 emit_json(&EnrichSummary {
1724 summary: true,
1725 operation: format!("{:?}", args.operation()),
1726 items_total: total,
1727 completed: 0,
1728 failed: 0,
1729 skipped: 0,
1730 cost_usd: 0.0,
1731 elapsed_ms: started.elapsed().as_millis() as u64,
1732 backend_invoked: take_enrich_backend(),
1733 waiting: 0,
1734 dead: 0,
1735 });
1736 return Ok(());
1737 }
1738
1739 let queue_path = crate::paths::sidecar_path(&paths.db, ".enrich-queue.sqlite");
1743 let queue_conn = open_queue_db(&queue_path)?;
1744
1745 if args.resume {
1746 let reset = queue_conn
1747 .execute(
1748 "UPDATE queue SET status='pending' WHERE status='processing'",
1749 [],
1750 )
1751 .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
1752 if reset > 0 {
1753 tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
1754 }
1755 }
1756
1757 if args.retry_failed {
1758 let count = queue_conn
1759 .execute(
1760 "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
1761 [],
1762 )
1763 .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
1764 tracing::info!(target: "enrich", count, "retrying failed items");
1765 }
1766
1767 if !args.resume && !args.retry_failed && !args.until_empty {
1768 queue_conn
1769 .execute("DELETE FROM queue", [])
1770 .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
1771 }
1772
1773 let op_label = format!("{:?}", args.operation());
1775 let item_type = item_type_for(&args.operation());
1776 for key in scan_result.iter() {
1777 let it = item_type_for_key(key, item_type);
1781 enqueue_candidate(&queue_conn, &conn, &namespace, key, it, &op_label);
1782 }
1783
1784 let parallelism = if args.mode() == EnrichMode::OpenRouter {
1787 let rest = args.rest_concurrency.unwrap_or(8).clamp(1, 16) as usize;
1788 tracing::info!(
1789 target: "enrich",
1790 concurrency = rest,
1791 source = "rest_concurrency",
1792 "OpenRouter REST concurrency (clamp 1..=16)"
1793 );
1794 rest
1795 } else {
1796 let p = args.llm_parallelism.clamp(1, 32) as usize;
1797 tracing::info!(
1798 target: "enrich",
1799 concurrency = p,
1800 source = "llm_parallelism",
1801 "LLM subprocess parallelism (clamp 1..=32)"
1802 );
1803 p
1804 };
1805 if parallelism > 1 {
1806 tracing::info!(
1807 target: "enrich",
1808 llm_parallelism = parallelism,
1809 "parallel LLM processing with bounded thread pool"
1810 );
1811 }
1812 if parallelism > 4 {
1816 match args.mode() {
1817 EnrichMode::ClaudeCode => {
1818 tracing::warn!(
1819 target: "enrich",
1820 llm_parallelism = parallelism,
1821 recommended_max = 4,
1822 mode = "claude-code",
1823 "llm_parallelism above 4 multiplies Claude Code subprocess fan-out; \
1824 consider combining with SQLITE_GRAPHRAG_CLAUDE_EMPTY_CONFIG_DIR \
1825 to cut MCP children (G28-A)"
1826 );
1827 }
1828 EnrichMode::Codex if parallelism > 16 => {
1829 tracing::warn!(
1830 target: "enrich",
1831 llm_parallelism = parallelism,
1832 recommended_max = 16,
1833 mode = "codex",
1834 "llm_parallelism above 16 risks OAuth rate-limit on Codex; \
1835 consider --llm-parallelism 8 for safer concurrency"
1836 );
1837 }
1838 EnrichMode::Codex => {
1839 }
1843 EnrichMode::Opencode if parallelism > 16 => {
1844 tracing::warn!(
1845 target: "enrich",
1846 llm_parallelism = parallelism,
1847 recommended_max = 16,
1848 mode = "opencode",
1849 "llm_parallelism above 16 risks OAuth rate-limit on OpenCode; \
1850 consider --llm-parallelism 8 for safer concurrency"
1851 );
1852 }
1853 EnrichMode::Opencode => {
1854 }
1856 EnrichMode::OpenRouter => {
1857 }
1860 }
1861 }
1862
1863 let mut completed = 0usize;
1864 let mut failed = 0usize;
1865 let mut skipped = 0usize;
1866 let mut cost_total = 0.0f64;
1867 let mut oauth_detected = false;
1868 let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1869 let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1870 let enrich_started = std::time::Instant::now();
1871
1872 let provider_timeout = match args.mode() {
1873 EnrichMode::ClaudeCode => args.claude_timeout,
1874 EnrichMode::Codex => args.codex_timeout,
1875 EnrichMode::Opencode => args.opencode_timeout,
1876 EnrichMode::OpenRouter => args.openrouter_timeout,
1877 };
1878
1879 let provider_model: Option<&str> = match args.mode() {
1880 EnrichMode::ClaudeCode => args.claude_model.as_deref(),
1881 EnrichMode::Codex => args.codex_model.as_deref(),
1882 EnrichMode::Opencode => args.opencode_model.as_deref(),
1883 EnrichMode::OpenRouter => args.openrouter_model.as_deref(),
1884 };
1885
1886 let backoff_clause: &str = if args.ignore_backoff {
1890 ""
1891 } else {
1892 "AND (next_retry_at IS NULL OR next_retry_at <= datetime('now'))"
1893 };
1894
1895 emit_json(&ConcurrencyEvent {
1898 phase: "concurrency",
1899 scan_parallelism: 1,
1900 drain_parallelism: parallelism as u32,
1901 });
1902
1903 let until_deadline = std::time::Instant::now()
1907 + std::time::Duration::from_secs(args.max_runtime.unwrap_or(3600));
1908 loop {
1909 if args.until_empty {
1910 let mut rescan = scan_operation(&conn, &namespace, args)?;
1914 if matches!(args.operation(), EnrichOperation::BodyEnrich) {
1919 if let Ok(vetoed) = skipped_item_keys(&queue_conn, &op_label) {
1920 rescan.retain(|k| !vetoed.contains(k));
1921 }
1922 }
1923 for key in &rescan {
1924 let it = item_type_for_key(key, item_type);
1925 enqueue_candidate(&queue_conn, &conn, &namespace, key, it, &op_label);
1926 }
1927 }
1928 let completed_before = completed;
1929
1930 if parallelism > 1 {
1934 let stdout_mu = parking_lot::Mutex::new(());
1935 let budget = args.max_cost_usd;
1936 let operation = args.operation().clone();
1937 let mode = args.mode().clone();
1938 let min_oc = args.min_output_chars;
1939 let max_oc = args.max_output_chars;
1940 let prompt_tpl = args.prompt_template.as_deref().map(|p| p.to_path_buf());
1941
1942 struct WorkerResult {
1943 completed: usize,
1944 failed: usize,
1945 skipped: usize,
1946 cost: f64,
1947 oauth: bool,
1948 db_busy: bool,
1953 }
1954
1955 let results: Vec<WorkerResult> = std::thread::scope(|s| {
1956 let handles: Vec<_> = (0..parallelism)
1957 .map(|worker_id| {
1958 let stdout_mu = &stdout_mu;
1959 let paths = &paths;
1960 let queue_path = &queue_path;
1961 let namespace = &namespace;
1962 let provider_binary = provider_binary.as_deref();
1963 let operation = &operation;
1964 let mode = &mode;
1965 let prompt_tpl = prompt_tpl.as_deref();
1966 s.spawn(move || {
1967 let w_conn = match open_rw(&paths.db) {
1968 Ok(c) => c,
1969 Err(e) => {
1970 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open DB");
1971 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false, db_busy: false };
1972 }
1973 };
1974 let w_queue = match open_queue_db(queue_path) {
1975 Ok(c) => c,
1976 Err(e) => {
1977 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open queue DB");
1978 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false, db_busy: false };
1979 }
1980 };
1981 let mut w_completed = 0usize;
1982 let mut w_failed = 0usize;
1983 let mut w_skipped = 0usize;
1984 let mut w_cost = 0.0f64;
1985 let mut w_oauth = false;
1986 let mut w_db_busy = false;
1987 let mut w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1988 let w_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1989 let mut w_breaker = crate::retry::CircuitBreaker::new(
1995 args.circuit_breaker_threshold.max(1),
1996 std::time::Duration::from_secs(60),
1997 );
1998
1999 loop {
2000 if crate::shutdown_requested() {
2001 tracing::info!(target: "enrich", "shutdown requested, worker stopping");
2002 break;
2003 }
2004 if let Some(b) = budget {
2005 if !w_oauth && w_cost >= b {
2006 break;
2007 }
2008 }
2009 let pending = match crate::storage::utils::with_busy_retry(|| {
2028 dequeue_next_pending(&w_queue, backoff_clause)
2029 }) {
2030 Ok(DequeueOutcome::Claimed(p)) => Some(p),
2031 Ok(DequeueOutcome::Empty) => None,
2032 Err(AppError::DbBusy(msg)) => {
2033 tracing::error!(target: "enrich", worker = worker_id, error = %msg, "SQLITE_BUSY exhausted bounded retries, worker aborting");
2034 w_db_busy = true;
2035 None
2036 }
2037 Err(e) => {
2038 tracing::error!(target: "enrich", worker = worker_id, error = %e, "dequeue failed");
2039 None
2040 }
2041 };
2042 let (queue_id, item_key, _item_type, attempt_current) = match pending {
2043 Some(p) => p,
2044 None => break,
2045 };
2046 let item_started = Instant::now();
2047 let current_index = w_completed + w_failed + w_skipped;
2048
2049 let provider_bin = provider_binary.unwrap_or_else(|| std::path::Path::new(""));
2055 let call_result = match operation {
2056 EnrichOperation::MemoryBindings | EnrichOperation::AugmentBindings => call_memory_bindings(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2057 EnrichOperation::EntityDescriptions => call_entity_description(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2058 EnrichOperation::BodyEnrich => call_body_enrich(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode, min_oc, max_oc, prompt_tpl, args.preserve_threshold, paths, llm_backend, embedding_backend),
2059 EnrichOperation::ReEmbed => call_reembed(&w_conn, namespace, &item_key, paths, llm_backend, embedding_backend),
2060 EnrichOperation::WeightCalibrate => call_weight_calibrate(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2061 EnrichOperation::RelationReclassify => call_relation_reclassify(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2062 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => call_entity_connect(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2063 EnrichOperation::EntityTypeValidate => call_entity_type_validate(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2064 EnrichOperation::DescriptionEnrich => call_description_enrich(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2065 EnrichOperation::DomainClassify => call_domain_classify(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2066 EnrichOperation::GraphAudit => call_graph_audit(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2067 EnrichOperation::DeepResearchSynth => call_deep_research_synth(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2068 EnrichOperation::BodyExtract => call_body_extract(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode, args.body_extract_graph_only),
2069 };
2070 let openrouter_diag = take_last_openrouter_failure();
2076
2077 match call_result {
2078 Ok(EnrichItemResult::Done { cost, is_oauth, memory_id, entity_id, entities, rels, chars_before, chars_after }) => {
2079 if is_oauth { w_oauth = true; }
2080 w_backoff = DEFAULT_RATE_LIMIT_WAIT;
2081 let _ = w_queue.execute(
2082 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2083 rusqlite::params![memory_id, entity_id, entities as i64, rels as i64, cost, item_started.elapsed().as_millis() as i64, queue_id],
2084 );
2085 w_completed += 1;
2086 if !is_oauth { w_cost += cost; }
2087 let _ = w_breaker
2089 .record(crate::retry::AttemptOutcome::Success);
2090 let _guard = stdout_mu.lock();
2091 emit_json(&ItemEvent { item: &item_key, status: "done", memory_id, entity_id, entities: Some(entities), rels: Some(rels), chars_before, chars_after, cost_usd: if is_oauth { None } else { Some(cost) }, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
2092 }
2093 Ok(EnrichItemResult::Skipped { reason }) => {
2094 w_skipped += 1;
2095 let _ = w_queue.execute("UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![reason, queue_id]);
2096 let _guard = stdout_mu.lock();
2097 emit_json(&ItemEvent { item: &item_key, status: "skipped", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
2098 }
2099 Ok(EnrichItemResult::PreservationFailed { score, threshold, chars_before, chars_after }) => {
2100 w_skipped += 1;
2106 let reason = format!(
2107 "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2108 );
2109 let _ = w_queue.execute(
2110 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2111 rusqlite::params![reason, queue_id],
2112 );
2113 let _guard = stdout_mu.lock();
2114 emit_json(&ItemEvent {
2115 item: &item_key,
2116 status: "preservation_failed",
2117 memory_id: None,
2118 entity_id: None,
2119 entities: None,
2120 rels: None,
2121 chars_before: Some(chars_before),
2122 chars_after: Some(chars_after),
2123 cost_usd: None,
2124 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2125 error: Some(reason),
2126 index: current_index,
2127 total,
2128 });
2129 }
2130 Err(e) => {
2131 let err_str = format!("{e}");
2132 if matches!(e, AppError::RateLimited { .. }) {
2133 if crate::retry::is_kill_switch_active() {
2134 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2135 } else if std::time::Instant::now() >= w_deadline {
2136 tracing::error!(target: "enrich", "rate-limit retry deadline (1h) exhausted in worker");
2137 } else {
2138 let half = w_backoff / 2;
2139 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2140 let actual_wait = half + jitter;
2141 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited in worker, backing off");
2142 let _ = w_queue.execute("UPDATE queue SET status='pending' WHERE id=?1", rusqlite::params![queue_id]);
2143 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2144 w_backoff = (w_backoff * 2).min(900);
2145 continue;
2146 }
2147 }
2148 w_failed += 1;
2149 let outcome = match openrouter_diag {
2156 Some(diag) => record_item_failure_typed(
2157 &w_queue,
2158 queue_id,
2159 attempt_current,
2160 args.max_attempts,
2161 diag.retry_class,
2162 &err_str,
2163 diag.finish_reason.as_deref(),
2164 diag.prompt_tokens,
2165 diag.completion_tokens,
2166 ),
2167 None => record_item_failure(&w_queue, queue_id, attempt_current, args.max_attempts, &e),
2168 };
2169 let _guard = stdout_mu.lock();
2170 emit_json(&ItemEvent { item: &item_key, status: "failed", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: Some(err_str), index: current_index, total });
2171 let breaker_opened = w_breaker.record(outcome);
2174 if breaker_opened {
2175 tracing::error!(target: "enrich",
2176 consecutive_failures = w_breaker.consecutive_failures(),
2177 "circuit breaker opened — aborting worker"
2178 );
2179 break;
2180 }
2181 }
2182 }
2183 }
2184 WorkerResult { completed: w_completed, failed: w_failed, skipped: w_skipped, cost: w_cost, oauth: w_oauth, db_busy: w_db_busy }
2185 })
2186 })
2187 .collect();
2188 handles
2189 .into_iter()
2190 .map(|h| {
2191 h.join().unwrap_or(WorkerResult {
2192 completed: 0,
2193 failed: 0,
2194 skipped: 0,
2195 cost: 0.0,
2196 oauth: false,
2197 db_busy: false,
2198 })
2199 })
2200 .collect()
2201 });
2202
2203 if results.iter().any(|r| r.db_busy) {
2209 return Err(AppError::DbBusy(
2210 "SQLITE_BUSY exhausted bounded retries while dequeuing (parallel worker)"
2211 .into(),
2212 ));
2213 }
2214
2215 for r in &results {
2216 completed += r.completed;
2217 failed += r.failed;
2218 skipped += r.skipped;
2219 cost_total += r.cost;
2220 if r.oauth && !oauth_detected {
2221 oauth_detected = true;
2222 }
2223 }
2224 } else {
2225 loop {
2227 if crate::shutdown_requested() {
2228 tracing::info!(target: "enrich", "shutdown requested, stopping enrichment");
2229 break;
2230 }
2231
2232 if let Some(budget) = args.max_cost_usd {
2234 if !oauth_detected && cost_total >= budget {
2235 tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
2236 break;
2237 }
2238 }
2239
2240 let pending = match crate::storage::utils::with_busy_retry(|| {
2256 dequeue_next_pending(&queue_conn, backoff_clause)
2257 }) {
2258 Ok(DequeueOutcome::Claimed(p)) => Some(p),
2259 Ok(DequeueOutcome::Empty) => None,
2260 Err(e @ AppError::DbBusy(_)) => {
2261 tracing::error!(target: "enrich", error = %e, "SQLITE_BUSY exhausted bounded retries, aborting drain loop");
2262 return Err(e);
2263 }
2264 Err(e) => {
2265 tracing::error!(target: "enrich", error = %e, "dequeue failed");
2266 None
2267 }
2268 };
2269
2270 let (queue_id, item_key, item_type, attempt_current) = match pending {
2271 Some(p) => p,
2272 None => break,
2273 };
2274
2275 let item_started = Instant::now();
2276 let current_index = completed + failed + skipped;
2277
2278 let provider_bin = provider_binary
2281 .as_deref()
2282 .unwrap_or_else(|| std::path::Path::new(""));
2283 let call_result = match args.operation() {
2284 EnrichOperation::MemoryBindings | EnrichOperation::AugmentBindings => {
2285 call_memory_bindings(
2286 &conn,
2287 &namespace,
2288 &item_key,
2289 provider_bin,
2290 provider_model,
2291 provider_timeout,
2292 &args.mode(),
2293 )
2294 }
2295 EnrichOperation::EntityDescriptions => call_entity_description(
2296 &conn,
2297 &namespace,
2298 &item_key,
2299 provider_bin,
2300 provider_model,
2301 provider_timeout,
2302 &args.mode(),
2303 ),
2304 EnrichOperation::BodyEnrich => call_body_enrich(
2305 &conn,
2306 &namespace,
2307 &item_key,
2308 provider_bin,
2309 provider_model,
2310 provider_timeout,
2311 &args.mode(),
2312 args.min_output_chars,
2313 args.max_output_chars,
2314 args.prompt_template.as_deref(),
2315 args.preserve_threshold,
2316 &paths,
2317 llm_backend,
2318 embedding_backend,
2319 ),
2320 EnrichOperation::ReEmbed => call_reembed(
2321 &conn,
2322 &namespace,
2323 &item_key,
2324 &paths,
2325 llm_backend,
2326 embedding_backend,
2327 ),
2328 EnrichOperation::WeightCalibrate => call_weight_calibrate(
2329 &conn,
2330 &namespace,
2331 &item_key,
2332 provider_bin,
2333 provider_model,
2334 provider_timeout,
2335 &args.mode(),
2336 ),
2337 EnrichOperation::RelationReclassify => call_relation_reclassify(
2338 &conn,
2339 &namespace,
2340 &item_key,
2341 provider_bin,
2342 provider_model,
2343 provider_timeout,
2344 &args.mode(),
2345 ),
2346 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2347 call_entity_connect(
2348 &conn,
2349 &namespace,
2350 &item_key,
2351 provider_bin,
2352 provider_model,
2353 provider_timeout,
2354 &args.mode(),
2355 )
2356 }
2357 EnrichOperation::EntityTypeValidate => call_entity_type_validate(
2358 &conn,
2359 &namespace,
2360 &item_key,
2361 provider_bin,
2362 provider_model,
2363 provider_timeout,
2364 &args.mode(),
2365 ),
2366 EnrichOperation::DescriptionEnrich => call_description_enrich(
2367 &conn,
2368 &namespace,
2369 &item_key,
2370 provider_bin,
2371 provider_model,
2372 provider_timeout,
2373 &args.mode(),
2374 ),
2375 EnrichOperation::DomainClassify => call_domain_classify(
2376 &conn,
2377 &namespace,
2378 &item_key,
2379 provider_bin,
2380 provider_model,
2381 provider_timeout,
2382 &args.mode(),
2383 ),
2384 EnrichOperation::GraphAudit => call_graph_audit(
2385 &conn,
2386 &namespace,
2387 &item_key,
2388 provider_bin,
2389 provider_model,
2390 provider_timeout,
2391 &args.mode(),
2392 ),
2393 EnrichOperation::DeepResearchSynth => call_deep_research_synth(
2394 &conn,
2395 &namespace,
2396 &item_key,
2397 provider_bin,
2398 provider_model,
2399 provider_timeout,
2400 &args.mode(),
2401 ),
2402 EnrichOperation::BodyExtract => call_body_extract(
2403 &conn,
2404 &namespace,
2405 &item_key,
2406 provider_bin,
2407 provider_model,
2408 provider_timeout,
2409 &args.mode(),
2410 args.body_extract_graph_only,
2411 ),
2412 };
2413 let openrouter_diag = take_last_openrouter_failure();
2416
2417 match call_result {
2418 Ok(EnrichItemResult::Done {
2419 memory_id,
2420 entity_id,
2421 entities,
2422 rels,
2423 chars_before,
2424 chars_after,
2425 cost,
2426 is_oauth,
2427 }) => {
2428 if is_oauth && !oauth_detected {
2429 oauth_detected = true;
2430 tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
2431 }
2432 backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
2433
2434 let persist_err: Option<String> = match args.operation() {
2436 EnrichOperation::MemoryBindings => {
2437 None
2439 }
2440 EnrichOperation::EntityDescriptions => {
2441 None
2443 }
2444 EnrichOperation::BodyEnrich => {
2445 None
2447 }
2448 _ => {
2449 None
2451 }
2452 };
2453
2454 if let Err(e) = queue_conn.execute(
2455 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2456 rusqlite::params![
2457 memory_id,
2458 entity_id,
2459 entities as i64,
2460 rels as i64,
2461 cost,
2462 item_started.elapsed().as_millis() as i64,
2463 queue_id
2464 ],
2465 ) {
2466 tracing::warn!(target: "enrich", error = %e, "queue done update failed");
2467 }
2468
2469 if persist_err.is_none() {
2470 completed += 1;
2471 if !is_oauth {
2472 cost_total += cost;
2473 }
2474 emit_json(&ItemEvent {
2475 item: &item_key,
2476 status: "done",
2477 memory_id,
2478 entity_id,
2479 entities: Some(entities),
2480 rels: Some(rels),
2481 chars_before,
2482 chars_after,
2483 cost_usd: if is_oauth { None } else { Some(cost) },
2484 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2485 error: None,
2486 index: current_index,
2487 total,
2488 });
2489 } else {
2490 failed += 1;
2491 emit_json(&ItemEvent {
2492 item: &item_key,
2493 status: "failed",
2494 memory_id: None,
2495 entity_id: None,
2496 entities: None,
2497 rels: None,
2498 chars_before: None,
2499 chars_after: None,
2500 cost_usd: None,
2501 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2502 error: persist_err,
2503 index: current_index,
2504 total,
2505 });
2506 }
2507 }
2508 Ok(EnrichItemResult::Skipped { reason }) => {
2509 skipped += 1;
2510 if let Err(e) = queue_conn.execute(
2511 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2512 rusqlite::params![reason, queue_id],
2513 ) {
2514 tracing::warn!(target: "enrich", error = %e, "queue skipped update failed");
2515 }
2516 emit_json(&ItemEvent {
2517 item: &item_key,
2518 status: "skipped",
2519 memory_id: None,
2520 entity_id: None,
2521 entities: None,
2522 rels: None,
2523 chars_before: None,
2524 chars_after: None,
2525 cost_usd: None,
2526 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2527 error: None,
2528 index: current_index,
2529 total,
2530 });
2531 }
2532 Ok(EnrichItemResult::PreservationFailed {
2533 score,
2534 threshold,
2535 chars_before,
2536 chars_after,
2537 }) => {
2538 skipped += 1;
2545 let reason = format!(
2546 "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2547 );
2548 if let Err(qe) = queue_conn.execute(
2549 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2550 rusqlite::params![reason, queue_id],
2551 ) {
2552 tracing::warn!(target: "enrich", error = %qe, "queue preservation_failed update failed");
2553 }
2554 emit_json(&ItemEvent {
2555 item: &item_key,
2556 status: "preservation_failed",
2557 memory_id: None,
2558 entity_id: None,
2559 entities: None,
2560 rels: None,
2561 chars_before: Some(chars_before),
2562 chars_after: Some(chars_after),
2563 cost_usd: None,
2564 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2565 error: Some(reason),
2566 index: current_index,
2567 total,
2568 });
2569 }
2570 Err(e) => {
2571 let err_str = format!("{e}");
2572 if matches!(e, AppError::RateLimited { .. }) {
2573 if crate::retry::is_kill_switch_active() {
2574 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2575 } else if std::time::Instant::now() >= rate_limit_deadline {
2576 tracing::error!(target: "enrich", total_elapsed_secs = enrich_started.elapsed().as_secs(), "rate-limit retry deadline (1h) exhausted");
2577 } else {
2578 let half = backoff_secs / 2;
2579 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2580 let actual_wait = half + jitter;
2581 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
2582 if let Err(qe) = queue_conn.execute(
2583 "UPDATE queue SET status='pending' WHERE id=?1",
2584 rusqlite::params![queue_id],
2585 ) {
2586 tracing::warn!(target: "enrich", error = %qe, "queue pending update failed");
2587 }
2588 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2589 backoff_secs = (backoff_secs * 2).min(900);
2590 continue;
2591 }
2592 }
2593
2594 failed += 1;
2595 let _outcome = match openrouter_diag {
2600 Some(diag) => record_item_failure_typed(
2601 &queue_conn,
2602 queue_id,
2603 attempt_current,
2604 args.max_attempts,
2605 diag.retry_class,
2606 &err_str,
2607 diag.finish_reason.as_deref(),
2608 diag.prompt_tokens,
2609 diag.completion_tokens,
2610 ),
2611 None => record_item_failure(
2612 &queue_conn,
2613 queue_id,
2614 attempt_current,
2615 args.max_attempts,
2616 &e,
2617 ),
2618 };
2619 emit_json(&ItemEvent {
2620 item: &item_key,
2621 status: "failed",
2622 memory_id: None,
2623 entity_id: None,
2624 entities: None,
2625 rels: None,
2626 chars_before: None,
2627 chars_after: None,
2628 cost_usd: None,
2629 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2630 error: Some(err_str),
2631 index: current_index,
2632 total,
2633 });
2634 }
2635 }
2636
2637 let _ = item_type; }
2639 } if !args.until_empty {
2642 break;
2643 }
2644 let eligible_remaining: i64 = queue_conn
2645 .query_row(
2646 &format!("SELECT COUNT(*) FROM queue WHERE status='pending' {backoff_clause}"),
2647 [],
2648 |r| r.get(0),
2649 )
2650 .unwrap_or(0);
2651 let progressed = completed > completed_before;
2652 if std::time::Instant::now() >= until_deadline {
2653 tracing::info!(target: "enrich", "until-empty: max-runtime reached, stopping");
2654 break;
2655 }
2656 if !progressed && eligible_remaining == 0 {
2657 tracing::info!(target: "enrich", "until-empty: converged (no eligible items remain)");
2658 break;
2659 }
2660 if eligible_remaining == 0 {
2661 std::thread::sleep(std::time::Duration::from_secs(1));
2663 }
2664 } let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2667 let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2668
2669 let waiting_final: i64 = queue_conn
2673 .query_row(
2674 "SELECT COUNT(*) FROM queue WHERE status='pending' \
2675 AND (operation = ?1 OR operation IS NULL) \
2676 AND next_retry_at IS NOT NULL AND next_retry_at > datetime('now')",
2677 rusqlite::params![op_label],
2678 |r| r.get(0),
2679 )
2680 .unwrap_or(0);
2681 let dead_final: i64 = queue_conn
2682 .query_row(
2683 "SELECT COUNT(*) FROM queue WHERE status='dead' \
2684 AND (operation = ?1 OR operation IS NULL)",
2685 rusqlite::params![op_label],
2686 |r| r.get(0),
2687 )
2688 .unwrap_or(0);
2689
2690 emit_json(&EnrichSummary {
2691 summary: true,
2692 operation: format!("{:?}", args.operation()),
2693 items_total: total,
2694 completed,
2695 failed,
2696 skipped,
2697 cost_usd: cost_total,
2698 elapsed_ms: started.elapsed().as_millis() as u64,
2699 backend_invoked: take_enrich_backend(),
2700 waiting: waiting_final,
2701 dead: dead_final,
2702 });
2703
2704 if failed == 0 {
2705 let dead: i64 = queue_conn
2708 .query_row("SELECT COUNT(*) FROM queue WHERE status='dead'", [], |r| {
2709 r.get(0)
2710 })
2711 .unwrap_or(0);
2712 let skipped_remaining: i64 = queue_conn
2718 .query_row(
2719 "SELECT COUNT(*) FROM queue WHERE status='skipped'",
2720 [],
2721 |r| r.get(0),
2722 )
2723 .unwrap_or(0);
2724 if dead == 0 && skipped_remaining == 0 {
2725 let _ = std::fs::remove_file(&queue_path);
2726 }
2727 }
2728
2729 Ok(())
2730}
2731
#[cfg(test)]
mod tests {
    //! Sanity checks that the JSON constants declared in this file parse.
    use super::*;

    /// `BINDINGS_SCHEMA` must deserialize into a `serde_json::Value`.
    #[test]
    fn bindings_schema_is_valid_json() {
        serde_json::from_str::<serde_json::Value>(BINDINGS_SCHEMA)
            .expect("BINDINGS_SCHEMA must be valid JSON");
    }

    /// `ENTITY_DESCRIPTION_SCHEMA` must deserialize into a `serde_json::Value`.
    #[test]
    fn entity_description_schema_is_valid_json() {
        serde_json::from_str::<serde_json::Value>(ENTITY_DESCRIPTION_SCHEMA)
            .expect("ENTITY_DESCRIPTION_SCHEMA must be valid JSON");
    }

    /// `BODY_ENRICH_SCHEMA` must deserialize into a `serde_json::Value`.
    #[test]
    fn body_enrich_schema_is_valid_json() {
        serde_json::from_str::<serde_json::Value>(BODY_ENRICH_SCHEMA)
            .expect("BODY_ENRICH_SCHEMA must be valid JSON");
    }
}