1mod extraction;
26mod postprocess;
27mod queue;
28mod scan;
29use extraction::{
30 call_body_enrich, call_body_extract, call_deep_research_synth, call_description_enrich,
31 call_domain_classify, call_entity_connect, call_entity_description, call_entity_type_validate,
32 call_graph_audit, call_memory_bindings, call_reembed, call_relation_reclassify,
33 call_weight_calibrate, find_codex_binary, take_last_openrouter_failure, EnrichItemResult,
34};
35use postprocess::{
36 persist_enriched_body, persist_entity_description, persist_memory_bindings,
37 reembed_memory_vector, take_enrich_backend,
38};
39pub use queue::{cleanup_queue_entry, DeadItem, DeadSummary, EnrichStatus, WaitingItem};
40use queue::{
41 dequeue_next_pending, enqueue_candidate, item_type_for, open_queue_db, prune_dead_orphans,
42 record_item_failure, record_item_failure_typed, skipped_item_keys, DequeueOutcome,
43};
44use scan::{
45 count_operation_backlog, scan_isolated_entity_pairs, scan_operation, scan_unbound_memories,
46};
47
48use crate::commands::ingest_claude::find_claude_binary;
49use crate::constants::MAX_MEMORY_BODY_LEN;
50use crate::entity_type::EntityType;
51use crate::errors::AppError;
52use crate::paths::AppPaths;
53use crate::storage::connection::{ensure_db_ready, open_rw};
54use crate::storage::entities::{self, NewEntity, NewRelationship};
55use crate::storage::memories;
56
57use rusqlite::Connection;
58use serde::{Deserialize, Serialize};
59use std::io::Write;
60use std::path::{Path, PathBuf};
61use std::time::Instant;
62
/// Default rate-limit wait; presumably expressed in seconds.
/// NOTE(review): not referenced in the visible part of this file — confirm unit at the use site.
const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
/// Default value of `--min-output-chars` (see `EnrichArgs::min_output_chars`).
const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;
/// Default value of `--max-output-chars` (see `EnrichArgs::max_output_chars`).
const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2000;
70
/// JSON Schema for the entity/relationship extraction response.
///
/// The response is an object with two required arrays:
/// - `entities`: objects with a `name` and an `entity_type` restricted to a fixed enum;
/// - `relationships`: objects with `source`, `target`, a `relation` restricted to a fixed
///   enum, and a `strength` in `[0, 1]`.
///
/// Every object level sets `additionalProperties: false`.
/// Presumably paired with `BINDINGS_PROMPT` — confirm at the call site.
const BINDINGS_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "entities": {
 "type": "array",
 "items": {
 "type": "object",
 "properties": {
 "name": { "type": "string" },
 "entity_type": {
 "type": "string",
 "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
 }
 },
 "required": ["name", "entity_type"],
 "additionalProperties": false
 }
 },
 "relationships": {
 "type": "array",
 "items": {
 "type": "object",
 "properties": {
 "source": { "type": "string" },
 "target": { "type": "string" },
 "relation": {
 "type": "string",
 "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
 },
 "strength": { "type": "number", "minimum": 0, "maximum": 1 }
 },
 "required": ["source","target","relation","strength"],
 "additionalProperties": false
 }
 }
 },
 "required": ["entities","relationships"],
 "additionalProperties": false
}"#;
114
/// JSON Schema for an entity-description response: a closed object with a single
/// required string field, `description`.
const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "description": { "type": "string" }
 },
 "required": ["description"],
 "additionalProperties": false
}"#;

/// JSON Schema for a body-enrichment response: a closed object with a single
/// required string field, `enriched_body`.
const BODY_ENRICH_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "enriched_body": { "type": "string" }
 },
 "required": ["enriched_body"],
 "additionalProperties": false
}"#;
132
/// Instruction text asking the model to judge whether a relationship weight is well
/// calibrated, using the 0.9 / 0.7 / 0.5 / 0.3 scale spelled out in the prompt.
/// The trailing `\` on each line continues the literal without embedding the line break.
const WEIGHT_CALIBRATE_PROMPT: &str = "You are a knowledge graph quality auditor. Evaluate whether this relationship weight is correctly calibrated.\n\n\
Scale:\n\
- 0.9 = vital hard dependency (A cannot function without B)\n\
- 0.7 = important design relationship (A strongly supports/enables B)\n\
- 0.5 = useful contextual link (A and B share relevant context)\n\
- 0.3 = weak reference (A mentions B without strong coupling)\n\n\
Respond with the calibrated weight and brief reasoning.";

/// JSON Schema for the weight-calibration response: `calibrated_weight` in `[0.0, 1.0]`
/// plus a free-text `reasoning`; both are required and no other keys are allowed.
const WEIGHT_CALIBRATE_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "calibrated_weight": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
 "reasoning": { "type": "string" }
 },
 "required": ["calibrated_weight", "reasoning"],
 "additionalProperties": false
}"#;
151
/// Instruction text asking the model to replace a generic relationship type with exactly
/// one of the canonical relations listed in the prompt.
const RELATION_RECLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. The relationship between these entities uses a generic type. Determine the REAL semantic relationship.\n\n\
Valid canonical relations (pick exactly one):\n\
- depends-on: A cannot function without B\n\
- uses: A utilizes B but could substitute it\n\
- supports: A reinforces or enables B\n\
- causes: A triggers or produces B\n\
- fixes: A resolves a problem in B\n\
- contradicts: A conflicts with or invalidates B\n\
- applies-to: A is relevant to or scoped within B\n\
- follows: A comes after B in sequence\n\
- replaces: A substitutes B\n\
- tracked-in: A is monitored in B\n\
- related: A and B share context (use sparingly)\n\n\
Respond with the correct relation, strength, and reasoning.";

/// JSON Schema for the reclassification response: `relation` (string), `strength` in
/// `[0.0, 1.0]` and `reasoning` (string), all required.
///
/// Unlike `BINDINGS_SCHEMA`, `relation` is an unconstrained string here: the canonical
/// list is enforced only by the prompt wording, not by the schema.
const RELATION_RECLASSIFY_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "relation": { "type": "string" },
 "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
 "reasoning": { "type": "string" }
 },
 "required": ["relation", "strength", "reasoning"],
 "additionalProperties": false
}"#;
178
/// Instruction text asking the model whether two currently unlinked entities should be
/// connected. The model is told to answer with the literal relation `"none"` when no
/// meaningful relationship exists.
const ENTITY_CONNECT_PROMPT: &str = "You are a knowledge graph quality auditor. Two entities exist in the same graph but have no relationship between them. Determine if a meaningful relationship exists.\n\n\
Valid canonical relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, replaces, tracked-in, related.\n\n\
If NO meaningful relationship exists, set relation to \"none\".\n\
Respond with the relation (or \"none\"), strength, and reasoning.";

/// JSON Schema for the entity-connect response: `relation` (free string, so that the
/// `"none"` sentinel from the prompt is representable), `strength` in `[0.0, 1.0]` and
/// `reasoning`, all required.
const ENTITY_CONNECT_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "relation": { "type": "string" },
 "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
 "reasoning": { "type": "string" }
 },
 "required": ["relation", "strength", "reasoning"],
 "additionalProperties": false
}"#;
195
/// Instruction text asking the model to confirm or correct an entity's type, choosing
/// from the entity types enumerated in the prompt.
const ENTITY_TYPE_VALIDATE_PROMPT: &str = "You are a knowledge graph quality auditor. Verify whether this entity's type is correct.\n\n\
Valid entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\n\
If the current type is correct, keep it. If wrong, suggest the correct type.\n\
Respond with the validated type and reasoning.";

/// JSON Schema for the type-validation response: `validated_type` (string),
/// `was_correct` (boolean) and `reasoning` (string), all required.
/// `validated_type` is not restricted to an enum by the schema itself.
const ENTITY_TYPE_VALIDATE_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "validated_type": { "type": "string" },
 "was_correct": { "type": "boolean" },
 "reasoning": { "type": "string" }
 },
 "required": ["validated_type", "was_correct", "reasoning"],
 "additionalProperties": false
}"#;
212
/// Instruction text asking the model to replace a generic or auto-generated memory
/// description with a concise 10-20 word semantic one; includes a BAD/GOOD example pair.
const DESCRIPTION_ENRICH_PROMPT: &str = "You are a knowledge graph quality auditor. This memory has a generic or auto-generated description. Write a concise, semantic description (10-20 words) that captures WHAT this memory is about and WHY it matters.\n\n\
BAD: 'ingested from docs/auth.md'\n\
GOOD: 'JWT token rotation strategy with 15-min expiry and refresh flow'\n\n\
Respond with the improved description and reasoning.";

/// JSON Schema for the description-enrichment response: required string fields
/// `description` and `reasoning`, no additional keys.
const DESCRIPTION_ENRICH_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "description": { "type": "string" },
 "reasoning": { "type": "string" }
 },
 "required": ["description", "reasoning"],
 "additionalProperties": false
}"#;
228
/// Instruction text asking the model to assign a memory to a primary domain, named in
/// kebab-case with 2-4 words.
const DOMAIN_CLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. Classify this memory into its primary domain category.\n\n\
Respond with the domain name (kebab-case, 2-4 words) and reasoning.";

/// JSON Schema for the domain-classification response: `domain` (string), `confidence`
/// in `[0.0, 1.0]` and `reasoning` (string), all required.
const DOMAIN_CLASSIFY_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "domain": { "type": "string" },
 "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
 "reasoning": { "type": "string" }
 },
 "required": ["domain", "confidence", "reasoning"],
 "additionalProperties": false
}"#;
243
/// Instruction text asking the model to audit one memory and its entity bindings and to
/// report a list of issues plus an overall quality score.
const GRAPH_AUDIT_PROMPT: &str = "You are a knowledge graph quality auditor. Analyze this memory and its entity bindings for quality issues.\n\n\
Check for: missing entities, wrong entity types, redundant relationships, orphaned entities, generic descriptions, low-signal relationships.\n\n\
Respond with a list of issues found (or empty if none) and an overall quality score.";

/// JSON Schema for the graph-audit response: `quality_score` in `[0.0, 1.0]`, an
/// `issues` array of `{ kind, detail }` objects, and `reasoning`; all required.
///
/// Both the top-level object and the nested `issues` item object are closed with
/// `additionalProperties: false`, matching `BINDINGS_SCHEMA`. Strict structured-output
/// backends reject schemas in which any object level leaves that keyword out.
const GRAPH_AUDIT_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "quality_score": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
 "issues": { "type": "array", "items": { "type": "object", "properties": { "kind": { "type": "string" }, "detail": { "type": "string" } }, "required": ["kind", "detail"], "additionalProperties": false } },
 "reasoning": { "type": "string" }
 },
 "required": ["quality_score", "issues", "reasoning"],
 "additionalProperties": false
}"#;
259
/// Instruction text asking the model to synthesize a memory body into entities,
/// relationships and a summary, using kebab-case entity names and the relation list
/// given in the prompt.
const DEEP_RESEARCH_SYNTH_PROMPT: &str = "You are a knowledge graph synthesizer. Given this memory body, extract key findings and synthesize them into structured entities and relationships.\n\n\
Entity names: lowercase kebab-case, domain-specific.\n\
Relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, related, replaces, tracked-in.\n\n\
Respond with extracted entities, relationships, and a synthesis summary.";

/// JSON Schema for the deep-research synthesis response: `entities`
/// (`{ name, entity_type }`), `relationships` (`{ source, target, relation, strength }`)
/// and `summary`; all required.
///
/// The top-level object and both nested item objects are closed with
/// `additionalProperties: false`, matching `BINDINGS_SCHEMA`. Strict structured-output
/// backends reject schemas in which any object level leaves that keyword out.
const DEEP_RESEARCH_SYNTH_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "entities": { "type": "array", "items": { "type": "object", "properties": { "name": { "type": "string" }, "entity_type": { "type": "string" } }, "required": ["name", "entity_type"], "additionalProperties": false } },
 "relationships": { "type": "array", "items": { "type": "object", "properties": { "source": { "type": "string" }, "target": { "type": "string" }, "relation": { "type": "string" }, "strength": { "type": "number" } }, "required": ["source", "target", "relation", "strength"], "additionalProperties": false } },
 "summary": { "type": "string" }
 },
 "required": ["entities", "relationships", "summary"],
 "additionalProperties": false
}"#;
276
/// Instruction text asking the model to restructure an unstructured memory body into
/// clean markdown while preserving all factual content.
const BODY_EXTRACT_PROMPT: &str = "You are a structured data extractor. Given this memory body (which may be unstructured text, raw notes, or a transcript), extract and restructure the content into a clean, well-organized markdown body.\n\n\
Preserve all factual content. Remove noise, fix formatting, add section headers where appropriate.\n\
Respond with the restructured body and a brief summary of changes.";

/// JSON Schema for the body-extract response: required string fields
/// `restructured_body` and `changes_summary`, no additional keys.
const BODY_EXTRACT_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "restructured_body": { "type": "string" },
 "changes_summary": { "type": "string" }
 },
 "required": ["restructured_body", "changes_summary"],
 "additionalProperties": false
}"#;
291
/// Instruction text for entity/relationship extraction from a memory body.
/// The relation list and the strength scale mirror the enum and range declared in
/// `BINDINGS_SCHEMA`; the prompt additionally forbids the `mentions` relation.
const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. Given a memory body, extract:\n\
1. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
2. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones";

/// Leading part of the entity-description prompt. It ends with `Entity name: `, so the
/// caller is expected to append the entity name (and presumably its type) — confirm at
/// the call site.
const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. Given an entity name and type, write a concise one-sentence description (10-20 words) that explains what this entity IS and WHY it matters in the context of software/system design.\n\nEntity name: ";

/// Leading part of the body-enrichment prompt. It ends with
/// `Memory body to enrich:` followed by a blank line, so the caller is expected to append
/// the memory body itself.
const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. Given a short or sparse memory body, expand it into a richer, more complete and useful description. Preserve all existing facts. Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\nConstraints:\n- Output only the enriched body text (no metadata, no headers)\n- Preserve the original meaning exactly\n- Target length is provided in the system context\n\nMemory body to enrich:\n\n";
310
/// Enrichment operation selected with `--operation`.
///
/// serde (de)serializes the variants in kebab-case (`memory-bindings`, `re-embed`, ...),
/// which matches the spellings used in the CLI examples of `EnrichArgs`.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum EnrichOperation {
    // Variant notes below are plain `//` comments on purpose: clap's `ValueEnum` derive
    // turns `///` comments on variants into possible-value help text.

    // Links unbound memories to the entities extracted from their body (per the CLI help).
    MemoryBindings,
    // Re-runs extraction over already-bound memories and merges additively (per the CLI help).
    AugmentBindings,
    // Fills entity descriptions (see `ENTITY_DESCRIPTION_PROMPT_PREFIX`).
    EntityDescriptions,
    // Expands short memory bodies (see `BODY_ENRICH_PROMPT_PREFIX`).
    BodyEnrich,
    // Rebuilds missing memory embeddings without rewriting bodies (per the CLI help).
    ReEmbed,
    // Audits relationship weights (see `WEIGHT_CALIBRATE_PROMPT`).
    WeightCalibrate,
    // Replaces generic relation types with canonical ones (see `RELATION_RECLASSIFY_PROMPT`).
    RelationReclassify,
    // Proposes relationships between unlinked entities (see `ENTITY_CONNECT_PROMPT`).
    EntityConnect,
    // Confirms or corrects entity types (see `ENTITY_TYPE_VALIDATE_PROMPT`).
    EntityTypeValidate,
    // Rewrites generic memory descriptions (see `DESCRIPTION_ENRICH_PROMPT`).
    DescriptionEnrich,
    // NOTE(review): no prompt or schema for this operation is visible in this part of the
    // file; semantics to be confirmed against the dispatch code.
    CrossDomainBridges,
    // Classifies a memory into a domain (see `DOMAIN_CLASSIFY_PROMPT`).
    DomainClassify,
    // Audits a memory and its bindings for quality issues (see `GRAPH_AUDIT_PROMPT`).
    GraphAudit,
    // Synthesizes entities/relationships from a memory body (see `DEEP_RESEARCH_SYNTH_PROMPT`).
    DeepResearchSynth,
    // Restructures an unstructured body into clean markdown (see `BODY_EXTRACT_PROMPT`).
    BodyExtract,
}
356
/// LLM backend selected with `--mode` (also accepted by `--fallback-mode`).
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
pub enum EnrichMode {
    // Variant notes are plain `//` comments on purpose: clap's `ValueEnum` derive turns
    // `///` comments on variants into possible-value help text.

    // Probed in `run_preflight_probe` through the binary resolved by `find_claude_binary`.
    ClaudeCode,
    // Probed in `run_preflight_probe` through the binary resolved by `find_codex_binary`.
    Codex,
    // CLI spelling pinned explicitly to `opencode`.
    #[value(name = "opencode")]
    Opencode,
    // CLI spelling pinned explicitly to `openrouter` (no hyphen); the preflight for this
    // mode only checks that an API key can be resolved.
    #[value(name = "openrouter")]
    OpenRouter,
}
371
372impl std::fmt::Display for EnrichMode {
373 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
374 match self {
375 EnrichMode::ClaudeCode => write!(f, "claude-code"),
376 EnrichMode::Codex => write!(f, "codex"),
377 EnrichMode::Opencode => write!(f, "opencode"),
378 EnrichMode::OpenRouter => write!(f, "openrouter"),
379 }
380 }
381}
382
// Command-line arguments of the `enrich` subcommand.
//
// All notes in this block are plain `//` comments on purpose: clap's derive turns `///`
// doc comments into help text, so doc comments here would change the `--help` output.
#[derive(clap::Args)]
#[command(
    about = "Enrich graph memories and entities using an LLM provider",
    after_long_help = "EXAMPLES:\n \
        # Add missing entity bindings to all unbound memories\n \
        sqlite-graphrag enrich --operation memory-bindings --mode codex --codex-model gpt-5.4-mini\n\n \
        # Fill entity descriptions (dry-run preview, no tokens spent)\n \
        sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n \
        # Expand short memory bodies (GAP-18)\n \
        sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n \
        # Rebuild only missing memory embeddings without rewriting bodies\n \
        sqlite-graphrag enrich --operation re-embed --limit 100\n\n \
        # Resume an interrupted body-enrich run\n \
        sqlite-graphrag enrich --operation body-enrich --resume --json\n\n \
        # Retry only failed items from a previous run\n \
        sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n \
        # Converge the whole backlog (internal scan+drain loop, no bash wrapper)\n \
        sqlite-graphrag enrich --operation memory-bindings --mode openrouter \\\n \
        --openrouter-model deepseek/deepseek-v4-flash:nitro --until-empty --max-runtime 600\n\n \
        # Inspect / resurrect dead-letter items\n \
        sqlite-graphrag enrich --operation memory-bindings --list-dead\n \
        sqlite-graphrag enrich --operation memory-bindings --requeue-dead\n\n \
        # Read-only status (no LLM, no singleton)\n \
        sqlite-graphrag enrich --operation memory-bindings --status\n\n\
        OPERATIONS NOTE:\n \
        memory-bindings LINKS each memory to the EXISTING entities extracted from its\n \
        body — it does not invent a new graph, it connects what is missing. It scans\n \
        only UNBOUND memories. To re-run extraction over ALREADY-bound memories and\n \
        MERGE newly-found entities/relationships additively (without removing links),\n \
        use --operation augment-bindings with --names/--names-file.\n\n\
        DEAD-LETTER SIDECAR (.enrich-queue.sqlite):\n \
        A SQLite sidecar tracks each work item across runs. Schema (table `queue`):\n \
        item_key (UNIQUE name/id), item_type (memory|entity), operation, memory_id,\n \
        status (pending|processing|done|skipped|dead), attempt, error, error_class,\n \
        next_retry_at (backoff cooldown). --until-empty loops scan→drain internally\n \
        until eligible items are exhausted; transient failures (incl. malformed/non-\n \
        JSON LLM output, GAP-SG-09) reschedule with backoff until --max-attempts, then\n \
        land in status='dead'. Use --status to see the queue, --list-dead to inspect\n \
        the sink, --requeue-dead to retry it, and --ignore-backoff to skip cooldowns.\n \
        --names/--names-file also remedy a cooldown by targeting a specific subset.\n\n\
        EXIT CODES:\n \
        0 success\n \
        1 validation error (bad args, binary not found)\n \
        14 I/O error"
)]
pub struct EnrichArgs {
    // Operation to run. Optional only because the queue-maintenance flags
    // (--status, --list-dead, --requeue-dead, --prune-dead-orphans) do not need one;
    // `EnrichArgs::operation()` supplies the fallback when it is absent.
    #[arg(
        long,
        short = 'o',
        value_enum,
        value_name = "OPERATION",
        required_unless_present_any = ["status", "list_dead", "requeue_dead", "prune_dead_orphans"]
    )]
    pub operation: Option<EnrichOperation>,

    // LLM backend. Optional under the same conditions as `operation`;
    // `EnrichArgs::mode()` supplies the fallback when it is absent.
    #[arg(
        long,
        value_enum,
        required_unless_present_any = ["status", "list_dead", "requeue_dead", "prune_dead_orphans"]
    )]
    pub mode: Option<EnrichMode>,

    // Presumably caps the number of items processed in one run (see `--limit 100` in the
    // examples) — confirm against the scan code.
    #[arg(long, value_name = "N")]
    pub limit: Option<usize>,

    // Preview mode; the examples describe it as spending no tokens.
    #[arg(long)]
    pub dry_run: bool,

    // Can also be supplied through SQLITE_GRAPHRAG_NAMESPACE.
    #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
    pub namespace: Option<String>,

    // --- Claude Code backend -------------------------------------------------------
    // Explicit binary location; handed to `find_claude_binary` by the preflight probe.
    #[arg(long, value_name = "PATH")]
    pub claude_binary: Option<PathBuf>,

    #[arg(long, value_name = "MODEL")]
    pub claude_model: Option<String>,

    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
    pub claude_timeout: u64,

    // --- Codex backend -------------------------------------------------------------
    // Explicit binary location; handed to `find_codex_binary` by the preflight probe.
    #[arg(long, value_name = "PATH")]
    pub codex_binary: Option<PathBuf>,

    #[arg(long, value_name = "MODEL")]
    pub codex_model: Option<String>,

    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
    pub codex_timeout: u64,

    // --- opencode backend (each flag also has a SQLITE_GRAPHRAG_OPENCODE_* env var) ---
    #[arg(long, value_name = "PATH", env = "SQLITE_GRAPHRAG_OPENCODE_BINARY")]
    pub opencode_binary: Option<PathBuf>,

    #[arg(long, value_name = "MODEL", env = "SQLITE_GRAPHRAG_OPENCODE_MODEL")]
    pub opencode_model: Option<String>,

    #[arg(
        long,
        value_name = "SECONDS",
        env = "SQLITE_GRAPHRAG_OPENCODE_TIMEOUT",
        default_value_t = 300
    )]
    pub opencode_timeout: u64,

    // --- OpenRouter backend --------------------------------------------------------
    #[arg(long, value_name = "MODEL")]
    pub openrouter_model: Option<String>,

    // Can also be supplied through OPENROUTER_API_KEY; the preflight probe resolves it
    // via `crate::config::resolve_api_key`.
    #[arg(long, value_name = "KEY", env = "OPENROUTER_API_KEY")]
    pub openrouter_api_key: Option<String>,

    // Note the default differs from the other backends (600 instead of 300).
    #[arg(long, value_name = "SECONDS", default_value_t = 600)]
    pub openrouter_timeout: u64,

    #[arg(long, value_name = "URL")]
    pub openrouter_base_url: Option<String>,

    // Rejected as a conflicting flag for --mode=codex and --mode=opencode by
    // `validate_mode_conditional_flags_enrich`.
    #[arg(long, value_name = "USD")]
    pub max_cost_usd: Option<f64>,

    // --- Queue control (see the DEAD-LETTER SIDECAR section of the help text) --------
    #[arg(long)]
    pub resume: bool,

    #[arg(long)]
    pub retry_failed: bool,

    #[arg(long)]
    pub until_empty: bool,

    #[arg(long, value_name = "SECONDS")]
    pub max_runtime: Option<u64>,

    // Accepted range is 1..=20; per the help text, items land in status='dead' once
    // this many attempts have been used.
    #[arg(long, value_name = "N", default_value_t = 8, value_parser = clap::value_parser!(u32).range(1..=20))]
    pub max_attempts: u32,

    #[arg(long)]
    pub status: bool,

    #[arg(long)]
    pub list_dead: bool,

    #[arg(long)]
    pub requeue_dead: bool,

    #[arg(long)]
    pub prune_dead_orphans: bool,

    #[arg(long)]
    pub ignore_backoff: bool,

    #[arg(long)]
    pub body_extract_graph_only: bool,

    // Accepted range is 1..=16.
    #[arg(long, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=16))]
    pub rest_concurrency: Option<u32>,

    // --- Body enrichment -----------------------------------------------------------
    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
    pub min_output_chars: usize,

    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
    pub max_output_chars: usize,

    // NOTE(review): a `bool` flag with `default_value_t = true` and no explicit action
    // looks impossible to switch off from the command line — confirm whether that is
    // intended. The same applies to `max_load_check` and `codex_model_validate`.
    #[arg(long, default_value_t = true)]
    pub preserve_check: bool,

    #[arg(long, value_name = "PATH")]
    pub prompt_template: Option<PathBuf>,

    // Accepted range is 1..=32; defaults to 1.
    #[arg(long, default_value_t = 1, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=32))]
    pub llm_parallelism: u32,

    #[arg(long)]
    pub json: bool,

    // Can also be supplied through SQLITE_GRAPHRAG_DB_PATH.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,

    #[arg(long, value_name = "SECONDS")]
    pub wait_job_singleton: Option<u64>,

    #[arg(long, default_value_t = false)]
    pub force_job_singleton: bool,

    // Comma-separated list; per the help text it targets a specific subset of items.
    #[arg(long, value_name = "NAMES", value_delimiter = ',')]
    pub names: Vec<String>,

    #[arg(long, value_name = "PATH")]
    pub names_file: Option<PathBuf>,

    // --- Resilience ----------------------------------------------------------------
    #[arg(long, default_value_t = false)]
    pub preflight_check: bool,

    #[arg(long, value_enum)]
    pub fallback_mode: Option<EnrichMode>,

    // Also used by `run_preflight_probe` as the probe deadline (with a 60 s floor).
    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
    pub rate_limit_buffer: u64,

    #[arg(long, default_value_t = true)]
    pub max_load_check: bool,

    #[arg(long, value_name = "N", default_value_t = 5)]
    pub circuit_breaker_threshold: u32,

    #[arg(long, value_name = "FLOAT", default_value_t = 0.7)]
    pub preserve_threshold: f64,

    #[arg(long, default_value_t = true)]
    pub codex_model_validate: bool,

    #[arg(long, value_name = "MODEL")]
    pub codex_model_fallback: Option<String>,
}
735
736impl EnrichArgs {
737 fn operation(&self) -> EnrichOperation {
745 self.operation
746 .clone()
747 .unwrap_or(EnrichOperation::MemoryBindings)
748 }
749
750 fn mode(&self) -> EnrichMode {
755 self.mode.clone().unwrap_or(EnrichMode::OpenRouter)
756 }
757}
758
/// Serializable progress record describing a phase of the run.
///
/// Only `phase` is always present; every `Option` field is left out of the serialized
/// output when it is `None`.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Name of the phase being reported.
    phase: &'a str,
    #[serde(skip_serializing_if = "Option::is_none")]
    binary_path: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    items_total: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    items_pending: Option<usize>,
    /// Presumably mirrors `--llm-parallelism` — confirm where the event is built.
    #[serde(skip_serializing_if = "Option::is_none")]
    llm_parallelism: Option<u32>,
}
782
/// Serializable record reporting the parallelism used for the scan and drain steps.
/// All three fields are always serialized.
#[derive(Debug, Serialize)]
struct ConcurrencyEvent {
    phase: &'static str,
    scan_parallelism: u32,
    drain_parallelism: u32,
}
793
/// Serializable per-item progress record.
///
/// `item`, `status`, `index` and `total` are always serialized; every `Option` field is
/// left out of the output when it is `None`, so each operation only reports the metrics
/// it actually produced.
#[derive(Debug, Serialize)]
struct ItemEvent<'a> {
    /// Identifier of the processed item (presumably the memory or entity name).
    item: &'a str,
    status: &'a str,
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    entity_id: Option<i64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_before: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_after: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Position of this item within the run and the run's item count.
    /// NOTE(review): whether `index` is 0- or 1-based is not visible here.
    index: usize,
    total: usize,
}
820
/// Serializable end-of-run summary.
#[derive(Debug, Serialize)]
struct EnrichSummary {
    /// Marker field; presumably always `true` so consumers can tell the summary apart
    /// from per-item records — confirm where the summary is built.
    summary: bool,
    operation: String,
    items_total: usize,
    completed: usize,
    failed: usize,
    skipped: usize,
    cost_usd: f64,
    elapsed_ms: u64,
    /// Left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    backend_invoked: Option<&'static str>,
    /// Presumably the number of queue items still waiting on a retry cooldown — confirm.
    waiting: i64,
    /// Presumably the number of queue items in the dead-letter state — confirm.
    dead: i64,
}
845
846use crate::output::emit_json_line as emit_json;
847
/// Result of the backend health probe performed by `run_preflight_probe`.
enum PreflightOutcome {
    /// The probe process exited successfully (or, for OpenRouter, an API key was found).
    Healthy,
    /// The probe failed and its stderr contained a rate-limit marker.
    RateLimited {
        /// Trimmed stderr of the probe process.
        reason: String,
        /// Fixed, human-readable hint on how to recover.
        suggestion: &'static str,
    },
    /// The probe could not be run, timed out, or failed for a non-rate-limit reason.
    Error(AppError),
}
874
875fn run_preflight_probe(args: &EnrichArgs) -> PreflightOutcome {
883 let timeout = std::time::Duration::from_secs(args.rate_limit_buffer.max(60));
884
885 match args.mode() {
886 EnrichMode::ClaudeCode => {
887 let bin = match find_claude_binary(args.claude_binary.as_deref()) {
888 Ok(b) => b,
889 Err(e) => return PreflightOutcome::Error(e),
890 };
891 let mcp_config_path = match crate::spawn::preflight::write_empty_mcp_config_tempfile() {
896 Ok(p) => p,
897 Err(e) => {
898 return PreflightOutcome::Error(AppError::Io(e));
899 }
900 };
901 let mut cmd = std::process::Command::new(&bin);
902 crate::spawn::env_whitelist::apply_env_whitelist(
903 &mut cmd,
904 crate::spawn::env_whitelist::is_strict_env_clear(),
905 );
906 if let Err(e) = crate::spawn::apply_cwd_isolation(&mut cmd) {
907 return PreflightOutcome::Error(e);
908 }
909 cmd.arg("-p")
910 .arg("ping")
911 .arg("--max-turns")
912 .arg("1")
913 .arg("--strict-mcp-config")
914 .arg("--mcp-config")
915 .arg(mcp_config_path.as_os_str())
916 .arg("--dangerously-skip-permissions")
917 .arg("--settings")
918 .arg("{\"hooks\":{}}")
919 .arg("--output-format")
920 .arg("json")
921 .stdin(std::process::Stdio::null())
922 .stdout(std::process::Stdio::piped())
923 .stderr(std::process::Stdio::piped());
924
925 let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
926 Ok(c) => c,
927 Err(e) => {
928 return PreflightOutcome::Error(AppError::Io(e));
929 }
930 };
931 let output = match wait_with_timeout(child, timeout) {
932 Ok(out) => out,
933 Err(e) => return PreflightOutcome::Error(e),
934 };
935 if !output.status.success() {
936 let stderr = String::from_utf8_lossy(&output.stderr);
937 if stderr.contains("hit your session limit")
938 || stderr.contains("rate_limit")
939 || stderr.contains("429")
940 {
941 return PreflightOutcome::RateLimited {
942 reason: stderr.trim().to_string(),
943 suggestion:
944 "wait for the OAuth window to reset or use --fallback-mode codex",
945 };
946 }
947 return PreflightOutcome::Error(AppError::Validation(format!(
948 "preflight probe failed: {stderr}",
949 stderr = stderr.trim()
950 )));
951 }
952 PreflightOutcome::Healthy
953 }
954 EnrichMode::Codex => {
955 let bin = match find_codex_binary(args.codex_binary.as_deref()) {
956 Ok(b) => b,
957 Err(e) => return PreflightOutcome::Error(e),
958 };
959 super::codex_spawn::validate_codex_model(args.codex_model.as_deref())
960 .map_err(PreflightOutcome::Error)
961 .ok();
962 let schema = "{}";
963 let schema_path = match super::codex_spawn::trusted_schema_path() {
964 Ok(p) => p,
965 Err(e) => return PreflightOutcome::Error(e),
966 };
967 let spawn_args = super::codex_spawn::CodexSpawnArgs {
968 binary: &bin,
969 prompt: "ping",
970 json_schema: schema,
971 input_text: "",
972 model: args.codex_model.as_deref(),
973 timeout_secs: args.rate_limit_buffer.max(60),
974 schema_path: schema_path.clone(),
975 };
976 let mut cmd = match super::codex_spawn::build_codex_command(&spawn_args) {
977 Ok(c) => c,
978 Err(e) => return PreflightOutcome::Error(e),
979 };
980 let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
981 Ok(c) => c,
982 Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
983 };
984 let output = match wait_with_timeout(child, timeout) {
985 Ok(out) => out,
986 Err(e) => return PreflightOutcome::Error(e),
987 };
988 let _ = std::fs::remove_file(&schema_path);
989 if !output.status.success() {
990 let stderr = String::from_utf8_lossy(&output.stderr);
991 if stderr.contains("rate_limit")
992 || stderr.contains("429")
993 || stderr.contains("Too Many Requests")
994 {
995 return PreflightOutcome::RateLimited {
996 reason: stderr.trim().to_string(),
997 suggestion: "wait for the rate-limit window to reset",
998 };
999 }
1000 return PreflightOutcome::Error(AppError::Validation(format!(
1001 "preflight probe failed: {stderr}",
1002 stderr = stderr.trim()
1003 )));
1004 }
1005 PreflightOutcome::Healthy
1006 }
1007 EnrichMode::Opencode => {
1008 let bin = match super::opencode_runner::find_opencode_binary_with_override(
1009 args.opencode_binary.as_deref(),
1010 ) {
1011 Ok(b) => b,
1012 Err(e) => return PreflightOutcome::Error(e),
1013 };
1014 let model =
1015 super::opencode_runner::resolve_opencode_model(args.opencode_model.as_deref());
1016 let mut cmd =
1017 match super::opencode_runner::build_opencode_command_sync(&bin, &model, "ping", "")
1018 {
1019 Ok(c) => c,
1020 Err(e) => return PreflightOutcome::Error(e),
1021 };
1022 let child = match super::opencode_runner::spawn_opencode(&mut cmd) {
1023 Ok(c) => c,
1024 Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
1025 };
1026 let output = match wait_with_timeout(child, timeout) {
1027 Ok(out) => out,
1028 Err(e) => return PreflightOutcome::Error(e),
1029 };
1030 if !output.status.success() {
1031 let stderr = String::from_utf8_lossy(&output.stderr);
1032 if stderr.contains("rate_limit")
1033 || stderr.contains("429")
1034 || stderr.contains("Too Many Requests")
1035 {
1036 return PreflightOutcome::RateLimited {
1037 reason: stderr.trim().to_string(),
1038 suggestion: "wait for the rate-limit window to reset",
1039 };
1040 }
1041 return PreflightOutcome::Error(AppError::Validation(format!(
1042 "preflight probe failed: {stderr}",
1043 stderr = stderr.trim()
1044 )));
1045 }
1046 PreflightOutcome::Healthy
1047 }
1048 EnrichMode::OpenRouter => {
1049 match crate::config::resolve_api_key("openrouter", args.openrouter_api_key.as_deref()) {
1053 Some(_) => PreflightOutcome::Healthy,
1054 None => PreflightOutcome::Error(AppError::Validation(
1055 "OPENROUTER_API_KEY not found for --mode openrouter preflight".into(),
1056 )),
1057 }
1058 }
1059 }
1060}
1061
1062fn wait_with_timeout(
1064 mut child: std::process::Child,
1065 timeout: std::time::Duration,
1066) -> Result<std::process::Output, AppError> {
1067 use wait_timeout::ChildExt;
1068 let start = std::time::Instant::now();
1069 let Some(exit) = child.wait_timeout(timeout).map_err(AppError::Io)? else {
1070 let _ = child.kill();
1071 let _ = child.wait();
1072 return Err(AppError::Validation(format!(
1073 "preflight probe timed out after {}s",
1074 start.elapsed().as_secs()
1075 )));
1076 };
1077 let mut stdout = Vec::new();
1078 if let Some(mut out) = child.stdout.take() {
1079 std::io::Read::read_to_end(&mut out, &mut stdout).map_err(AppError::Io)?;
1080 }
1081 let mut stderr = Vec::new();
1082 if let Some(mut err) = child.stderr.take() {
1083 std::io::Read::read_to_end(&mut err, &mut stderr).map_err(AppError::Io)?;
1084 }
1085 Ok(std::process::Output {
1086 status: exit,
1087 stdout,
1088 stderr,
1089 })
1090}
1091
/// Reports whether `value` is equal to `default`.
///
/// Used by the mode-conditional flag validation to tell whether a flag with a built-in
/// default was left untouched.
fn is_at_default<T: PartialEq>(value: T, default: T) -> bool {
    PartialEq::eq(&value, &default)
}
1110
1111fn validate_mode_conditional_flags_enrich(args: &EnrichArgs) -> Result<(), AppError> {
1126 const DEFAULT_TIMEOUT: u64 = 300;
1127
1128 let mut conflicts: Vec<String> = Vec::new();
1129
1130 match args.mode() {
1131 EnrichMode::ClaudeCode => {
1132 if args.codex_binary.is_some() {
1133 conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1134 }
1135 if args.codex_model.is_some() {
1136 conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1137 }
1138 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1139 conflicts.push(format!(
1140 "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1141 args.codex_timeout
1142 ));
1143 }
1144 }
1145 EnrichMode::Codex => {
1146 if args.claude_binary.is_some() {
1147 conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1148 }
1149 if args.claude_model.is_some() {
1150 conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1151 }
1152 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1153 conflicts.push(format!(
1154 "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1155 args.claude_timeout
1156 ));
1157 }
1158 if args.max_cost_usd.is_some() {
1159 conflicts.push(
1160 "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription, not the call)"
1161 .to_string(),
1162 );
1163 }
1164 }
1165 EnrichMode::Opencode => {
1166 if args.claude_binary.is_some() {
1167 conflicts.push("--claude-binary is ignored when --mode=opencode".to_string());
1168 }
1169 if args.claude_model.is_some() {
1170 conflicts.push("--claude-model is ignored when --mode=opencode".to_string());
1171 }
1172 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1173 conflicts.push(format!(
1174 "--claude-timeout={} is ignored when --mode=opencode (remove the flag to use the default 300s)",
1175 args.claude_timeout
1176 ));
1177 }
1178 if args.max_cost_usd.is_some() {
1179 conflicts.push(
1180 "--max-cost-usd is ignored when --mode=opencode (OAuth-first; cost is metered by your subscription, not the call)"
1181 .to_string(),
1182 );
1183 }
1184 }
1185 EnrichMode::OpenRouter => {
1186 if args.claude_binary.is_some() {
1187 conflicts.push("--claude-binary is ignored when --mode=openrouter".to_string());
1188 }
1189 if args.claude_model.is_some() {
1190 conflicts.push("--claude-model is ignored when --mode=openrouter".to_string());
1191 }
1192 if args.codex_binary.is_some() {
1193 conflicts.push("--codex-binary is ignored when --mode=openrouter".to_string());
1194 }
1195 if args.codex_model.is_some() {
1196 conflicts.push("--codex-model is ignored when --mode=openrouter".to_string());
1197 }
1198 if args.opencode_binary.is_some() {
1199 conflicts.push("--opencode-binary is ignored when --mode=openrouter".to_string());
1200 }
1201 if args.opencode_model.is_some() {
1202 conflicts.push("--opencode-model is ignored when --mode=openrouter".to_string());
1203 }
1204 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1205 conflicts.push(format!(
1206 "--claude-timeout={} is ignored when --mode=openrouter (remove the flag to use the default 300s)",
1207 args.claude_timeout
1208 ));
1209 }
1210 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1211 conflicts.push(format!(
1212 "--codex-timeout={} is ignored when --mode=openrouter (remove the flag to use the default 300s)",
1213 args.codex_timeout
1214 ));
1215 }
1216 if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
1217 conflicts.push(format!(
1218 "--opencode-timeout={} is ignored when --mode=openrouter (remove the flag to use the default 300s)",
1219 args.opencode_timeout
1220 ));
1221 }
1222 }
1223 }
1224
1225 if !conflicts.is_empty() {
1226 return Err(AppError::Validation(format!(
1227 "G20: mode-conditional flag conflicts detected for --mode={}:\n - {}",
1228 args.mode(),
1229 conflicts.join("\n - ")
1230 )));
1231 }
1232
1233 Ok(())
1234}
1235
1236pub fn run(
1240 args: &EnrichArgs,
1241 llm_backend: crate::cli::LlmBackendChoice,
1242 embedding_backend: crate::cli::EmbeddingBackendChoice,
1243) -> Result<(), AppError> {
1244 validate_mode_conditional_flags_enrich(args)?;
1247
1248 if args.list_dead || args.requeue_dead || args.prune_dead_orphans {
1257 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1258 let op_label = format!("{:?}", args.operation());
1259 let paths = AppPaths::resolve(args.db.as_deref())?;
1260 let queue_path = crate::paths::sidecar_path(&paths.db, ".enrich-queue.sqlite");
1261 let queue_conn = open_queue_db(&queue_path)?;
1262 if args.prune_dead_orphans {
1265 ensure_db_ready(&paths)?;
1266 let main_conn = open_rw(&paths.db)?;
1267 let pruned = prune_dead_orphans(&queue_conn, &main_conn, &op_label, &namespace)?;
1268 let dead_total: i64 = queue_conn
1269 .query_row(
1270 "SELECT COUNT(*) FROM queue WHERE status='dead' \
1271 AND (operation = ?1 OR operation IS NULL)",
1272 rusqlite::params![op_label],
1273 |r| r.get(0),
1274 )
1275 .unwrap_or(0);
1276 emit_json(&DeadSummary {
1277 summary: true,
1278 operation: op_label,
1279 namespace,
1280 action: "prune-dead-orphans",
1281 dead_total,
1282 requeued: 0,
1283 pruned,
1284 });
1285 return Ok(());
1286 }
1287 if args.list_dead {
1288 let mut stmt = queue_conn.prepare(
1289 "SELECT item_key, item_type, attempt, error_class, error, \
1290 finish_reason, input_tokens, output_tokens FROM queue \
1291 WHERE status='dead' AND (operation = ?1 OR operation IS NULL) ORDER BY id",
1292 )?;
1293 let rows = stmt
1294 .query_map(rusqlite::params![op_label], |r| {
1295 Ok(DeadItem {
1296 dead_item: true,
1297 item_key: r.get(0)?,
1298 item_type: r.get(1)?,
1299 attempt: r.get(2)?,
1300 error_class: r.get(3)?,
1301 error: r.get(4)?,
1302 finish_reason: r.get(5)?,
1303 input_tokens: r.get(6)?,
1304 output_tokens: r.get(7)?,
1305 })
1306 })?
1307 .collect::<Result<Vec<_>, _>>()?;
1308 let dead_total = rows.len() as i64;
1309 for item in &rows {
1310 emit_json(item);
1311 }
1312 emit_json(&DeadSummary {
1313 summary: true,
1314 operation: op_label,
1315 namespace,
1316 action: "list-dead",
1317 dead_total,
1318 requeued: 0,
1319 pruned: 0,
1320 });
1321 return Ok(());
1322 }
1323 let dead_total: i64 = queue_conn
1325 .query_row(
1326 "SELECT COUNT(*) FROM queue WHERE status='dead' \
1327 AND (operation = ?1 OR operation IS NULL)",
1328 rusqlite::params![op_label],
1329 |r| r.get(0),
1330 )
1331 .unwrap_or(0);
1332 let requeued = queue_conn
1333 .execute(
1334 "UPDATE queue SET status='pending', attempt=0, next_retry_at=NULL, \
1335 error=NULL, error_class=NULL \
1336 WHERE status='dead' AND (operation = ?1 OR operation IS NULL)",
1337 rusqlite::params![op_label],
1338 )
1339 .map_err(|e| AppError::Validation(format!("requeue-dead failed: {e}")))?
1340 as i64;
1341 let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1342 emit_json(&DeadSummary {
1343 summary: true,
1344 operation: op_label,
1345 namespace,
1346 action: "requeue-dead",
1347 dead_total,
1348 requeued,
1349 pruned: 0,
1350 });
1351 return Ok(());
1352 }
1353
1354 if args.status {
1355 let paths = AppPaths::resolve(args.db.as_deref())?;
1356 ensure_db_ready(&paths)?;
1357 let conn = open_rw(&paths.db)?;
1358 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1359 let unbound_backlog = scan_unbound_memories(&conn, &namespace, None, &[])?.len();
1360 let scan_backlog = count_operation_backlog(&conn, &args.operation(), &namespace)?;
1363 let queue_path = crate::paths::sidecar_path(&paths.db, ".enrich-queue.sqlite");
1364 let queue_conn = open_queue_db(&queue_path)?;
1365 let op_label = format!("{:?}", args.operation());
1366 let count_status = |st: &str, op: &str| -> i64 {
1370 queue_conn
1371 .query_row(
1372 "SELECT COUNT(*) FROM queue WHERE status=?1 \
1373 AND (operation = ?2 OR operation IS NULL)",
1374 rusqlite::params![st, op],
1375 |r| r.get(0),
1376 )
1377 .unwrap_or(0)
1378 };
1379 let eligible_now: i64 = queue_conn
1380 .query_row(
1381 "SELECT COUNT(*) FROM queue WHERE status='pending' \
1382 AND (operation = ?1 OR operation IS NULL) \
1383 AND (next_retry_at IS NULL OR next_retry_at <= datetime('now'))",
1384 rusqlite::params![op_label],
1385 |r| r.get(0),
1386 )
1387 .unwrap_or(0);
1388 let waiting: i64 = queue_conn
1389 .query_row(
1390 "SELECT COUNT(*) FROM queue WHERE status='pending' \
1391 AND (operation = ?1 OR operation IS NULL) \
1392 AND next_retry_at IS NOT NULL AND next_retry_at > datetime('now')",
1393 rusqlite::params![op_label],
1394 |r| r.get(0),
1395 )
1396 .unwrap_or(0);
1397 let waiting_items = {
1399 let mut stmt = queue_conn.prepare(
1400 "SELECT item_key, attempt, next_retry_at, error_class FROM queue \
1401 WHERE status='pending' AND (operation = ?1 OR operation IS NULL) \
1402 AND next_retry_at IS NOT NULL AND next_retry_at > datetime('now') \
1403 ORDER BY next_retry_at",
1404 )?;
1405 let items: Vec<WaitingItem> = stmt
1406 .query_map(rusqlite::params![op_label], |r| {
1407 Ok(WaitingItem {
1408 item_key: r.get(0)?,
1409 attempt: r.get(1)?,
1410 next_retry_at: r.get(2)?,
1411 error_class: r.get(3)?,
1412 })
1413 })?
1414 .collect::<Result<Vec<_>, _>>()?;
1415 items
1416 };
1417 let queue_pending = count_status("pending", &op_label);
1418 let queue_processing = count_status("processing", &op_label);
1419 let queue_done = count_status("done", &op_label);
1420 let queue_failed = count_status("failed", &op_label);
1421 let queue_skipped = count_status("skipped", &op_label);
1422 let queue_dead = count_status("dead", &op_label);
1423 let state = if eligible_now > 0 {
1425 "draining"
1426 } else if waiting > 0 {
1427 "cooldown"
1428 } else if queue_pending == 0 && scan_backlog > 0 {
1429 "pending-scan"
1430 } else {
1431 "empty"
1432 };
1433 emit_json(&EnrichStatus {
1434 status_report: true,
1435 operation: op_label,
1436 namespace,
1437 unbound_backlog,
1438 scan_backlog,
1439 queue_pending,
1440 queue_processing,
1441 queue_done,
1442 queue_failed,
1443 queue_skipped,
1444 queue_dead,
1445 eligible_now,
1446 waiting,
1447 state,
1448 waiting_items,
1449 });
1450 return Ok(());
1451 }
1452
1453 if args.mode() == EnrichMode::OpenRouter {
1458 let model = args.openrouter_model.as_deref().ok_or_else(|| {
1459 AppError::Validation(
1460 "--mode openrouter requires --openrouter-model (no default model is allowed)"
1461 .into(),
1462 )
1463 })?;
1464 let resolved =
1465 crate::config::resolve_api_key("openrouter", args.openrouter_api_key.as_deref())
1466 .ok_or_else(|| {
1467 AppError::Validation(
1468 "OPENROUTER_API_KEY not found; set the env var, store it via \
1469 `config add-key --provider openrouter`, or pass --openrouter-api-key"
1470 .into(),
1471 )
1472 })?;
1473 crate::embedder::get_openrouter_chat_client(
1474 resolved.value,
1475 model,
1476 args.openrouter_timeout,
1477 )?;
1478 }
1479
1480 let started = Instant::now();
1481
1482 let paths = AppPaths::resolve(args.db.as_deref())?;
1483 ensure_db_ready(&paths)?;
1484 let conn = open_rw(&paths.db)?;
1485 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1486
1487 let wait_secs = args.wait_job_singleton;
1493 let force_flag = args.force_job_singleton;
1494 let _singleton = crate::lock::acquire_job_singleton(
1495 crate::lock::JobType::Enrich,
1496 &namespace,
1497 &paths.db,
1498 wait_secs,
1499 force_flag,
1500 )?;
1501
1502 let provider_binary = if matches!(args.operation(), EnrichOperation::ReEmbed) {
1504 None
1505 } else {
1506 Some(match args.mode() {
1507 EnrichMode::ClaudeCode => {
1508 let bin = find_claude_binary(args.claude_binary.as_deref())?;
1509 let version = super::claude_runner::validate_claude_version(&bin)?;
1510 tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
1511 emit_json(&PhaseEvent {
1512 phase: "validate",
1513 binary_path: bin.to_str(),
1514 version: Some(&version),
1515 items_total: None,
1516 items_pending: None,
1517 llm_parallelism: None,
1518 });
1519 bin
1520 }
1521 EnrichMode::Codex => {
1522 let bin = find_codex_binary(args.codex_binary.as_deref())?;
1523 emit_json(&PhaseEvent {
1524 phase: "validate",
1525 binary_path: bin.to_str(),
1526 version: None,
1527 items_total: None,
1528 items_pending: None,
1529 llm_parallelism: None,
1530 });
1531 bin
1532 }
1533 EnrichMode::Opencode => {
1534 let bin = super::opencode_runner::find_opencode_binary_with_override(
1535 args.opencode_binary.as_deref(),
1536 )?;
1537 emit_json(&PhaseEvent {
1538 phase: "validate",
1539 binary_path: bin.to_str(),
1540 version: None,
1541 items_total: None,
1542 items_pending: None,
1543 llm_parallelism: None,
1544 });
1545 bin
1546 }
1547 EnrichMode::OpenRouter => {
1548 emit_json(&PhaseEvent {
1553 phase: "validate",
1554 binary_path: None,
1555 version: None,
1556 items_total: None,
1557 items_pending: None,
1558 llm_parallelism: None,
1559 });
1560 PathBuf::new()
1561 }
1562 })
1563 };
1564
1565 if args.max_load_check && !args.dry_run && crate::system_load::is_system_saturated() {
1569 let load = crate::system_load::load_average_one();
1570 let n = crate::system_load::ncpus();
1571 return Err(AppError::Validation(format!(
1572 "system load average {load:.2} exceeds 2x ncpus ({n}); \
1573 pass --no-max-load-check to override (not recommended)"
1574 )));
1575 }
1576
1577 if args.preflight_check
1584 && !args.dry_run
1585 && !matches!(args.operation(), EnrichOperation::ReEmbed)
1586 {
1587 let preflight_result = run_preflight_probe(args);
1588 match preflight_result {
1589 PreflightOutcome::Healthy => {
1590 tracing::info!(target: "enrich", mode = ?args.mode(), "preflight probe healthy");
1591 }
1592 PreflightOutcome::RateLimited { reason, suggestion } => {
1593 if let Some(fallback) = args.fallback_mode.clone() {
1594 if fallback != args.mode() {
1595 return Err(AppError::Validation(format!(
1605 "preflight detected rate limit on {mode:?}: {reason}; \
1606 re-invoke with `--mode {fallback:?}` to use the fallback provider",
1607 mode = args.mode()
1608 )));
1609 }
1610 return Err(AppError::Validation(format!(
1611 "preflight detected rate limit on {mode:?}: {reason}; \
1612 --fallback-mode matches --mode, no recovery possible",
1613 mode = args.mode()
1614 )));
1615 }
1616 return Err(AppError::Validation(format!(
1617 "preflight detected rate limit on {mode:?}: {reason}; \
1618 {suggestion}; pass --fallback-mode codex to recover",
1619 mode = args.mode()
1620 )));
1621 }
1622 PreflightOutcome::Error(e) => {
1623 return Err(e);
1624 }
1625 }
1626 }
1627
1628 let mut scan_result = scan_operation(&conn, &namespace, args)?;
1630 if matches!(args.operation(), EnrichOperation::BodyEnrich) {
1639 let q_path = crate::paths::sidecar_path(&paths.db, ".enrich-queue.sqlite");
1640 if let Ok(q) = open_queue_db(&q_path) {
1641 if let Ok(vetoed) = skipped_item_keys(&q, &format!("{:?}", args.operation())) {
1642 scan_result.retain(|k| !vetoed.contains(k));
1643 }
1644 }
1645 }
1646 let total = scan_result.len();
1647
1648 emit_json(&PhaseEvent {
1649 phase: "scan",
1650 binary_path: None,
1651 version: None,
1652 items_total: Some(total),
1653 items_pending: Some(total),
1654 llm_parallelism: Some(args.llm_parallelism),
1655 });
1656
1657 if args.dry_run {
1659 for (idx, key) in scan_result.iter().enumerate() {
1660 emit_json(&ItemEvent {
1661 item: key,
1662 status: "preview",
1663 memory_id: None,
1664 entity_id: None,
1665 entities: None,
1666 rels: None,
1667 chars_before: None,
1668 chars_after: None,
1669 cost_usd: None,
1670 elapsed_ms: None,
1671 error: None,
1672 index: idx,
1673 total,
1674 });
1675 }
1676 emit_json(&EnrichSummary {
1677 summary: true,
1678 operation: format!("{:?}", args.operation()),
1679 items_total: total,
1680 completed: 0,
1681 failed: 0,
1682 skipped: 0,
1683 cost_usd: 0.0,
1684 elapsed_ms: started.elapsed().as_millis() as u64,
1685 backend_invoked: take_enrich_backend(),
1686 waiting: 0,
1687 dead: 0,
1688 });
1689 return Ok(());
1690 }
1691
1692 let queue_path = crate::paths::sidecar_path(&paths.db, ".enrich-queue.sqlite");
1696 let queue_conn = open_queue_db(&queue_path)?;
1697
1698 if args.resume {
1699 let reset = queue_conn
1700 .execute(
1701 "UPDATE queue SET status='pending' WHERE status='processing'",
1702 [],
1703 )
1704 .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
1705 if reset > 0 {
1706 tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
1707 }
1708 }
1709
1710 if args.retry_failed {
1711 let count = queue_conn
1712 .execute(
1713 "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
1714 [],
1715 )
1716 .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
1717 tracing::info!(target: "enrich", count, "retrying failed items");
1718 }
1719
1720 if !args.resume && !args.retry_failed && !args.until_empty {
1721 queue_conn
1722 .execute("DELETE FROM queue", [])
1723 .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
1724 }
1725
1726 let op_label = format!("{:?}", args.operation());
1728 let item_type = item_type_for(&args.operation());
1729 for key in scan_result.iter() {
1730 enqueue_candidate(&queue_conn, &conn, &namespace, key, item_type, &op_label);
1731 }
1732
1733 let parallelism = if args.mode() == EnrichMode::OpenRouter {
1736 let rest = args.rest_concurrency.unwrap_or(8).clamp(1, 16) as usize;
1737 tracing::info!(
1738 target: "enrich",
1739 concurrency = rest,
1740 source = "rest_concurrency",
1741 "OpenRouter REST concurrency (clamp 1..=16)"
1742 );
1743 rest
1744 } else {
1745 let p = args.llm_parallelism.clamp(1, 32) as usize;
1746 tracing::info!(
1747 target: "enrich",
1748 concurrency = p,
1749 source = "llm_parallelism",
1750 "LLM subprocess parallelism (clamp 1..=32)"
1751 );
1752 p
1753 };
1754 if parallelism > 1 {
1755 tracing::info!(
1756 target: "enrich",
1757 llm_parallelism = parallelism,
1758 "parallel LLM processing with bounded thread pool"
1759 );
1760 }
1761 if parallelism > 4 {
1765 match args.mode() {
1766 EnrichMode::ClaudeCode => {
1767 tracing::warn!(
1768 target: "enrich",
1769 llm_parallelism = parallelism,
1770 recommended_max = 4,
1771 mode = "claude-code",
1772 "llm_parallelism above 4 multiplies Claude Code subprocess fan-out; \
1773 consider combining with SQLITE_GRAPHRAG_CLAUDE_EMPTY_CONFIG_DIR \
1774 to cut MCP children (G28-A)"
1775 );
1776 }
1777 EnrichMode::Codex if parallelism > 16 => {
1778 tracing::warn!(
1779 target: "enrich",
1780 llm_parallelism = parallelism,
1781 recommended_max = 16,
1782 mode = "codex",
1783 "llm_parallelism above 16 risks OAuth rate-limit on Codex; \
1784 consider --llm-parallelism 8 for safer concurrency"
1785 );
1786 }
1787 EnrichMode::Codex => {
1788 }
1792 EnrichMode::Opencode if parallelism > 16 => {
1793 tracing::warn!(
1794 target: "enrich",
1795 llm_parallelism = parallelism,
1796 recommended_max = 16,
1797 mode = "opencode",
1798 "llm_parallelism above 16 risks OAuth rate-limit on OpenCode; \
1799 consider --llm-parallelism 8 for safer concurrency"
1800 );
1801 }
1802 EnrichMode::Opencode => {
1803 }
1805 EnrichMode::OpenRouter => {
1806 }
1809 }
1810 }
1811
1812 let mut completed = 0usize;
1813 let mut failed = 0usize;
1814 let mut skipped = 0usize;
1815 let mut cost_total = 0.0f64;
1816 let mut oauth_detected = false;
1817 let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1818 let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1819 let enrich_started = std::time::Instant::now();
1820
1821 let provider_timeout = match args.mode() {
1822 EnrichMode::ClaudeCode => args.claude_timeout,
1823 EnrichMode::Codex => args.codex_timeout,
1824 EnrichMode::Opencode => args.opencode_timeout,
1825 EnrichMode::OpenRouter => args.openrouter_timeout,
1826 };
1827
1828 let provider_model: Option<&str> = match args.mode() {
1829 EnrichMode::ClaudeCode => args.claude_model.as_deref(),
1830 EnrichMode::Codex => args.codex_model.as_deref(),
1831 EnrichMode::Opencode => args.opencode_model.as_deref(),
1832 EnrichMode::OpenRouter => args.openrouter_model.as_deref(),
1833 };
1834
1835 let backoff_clause: &str = if args.ignore_backoff {
1839 ""
1840 } else {
1841 "AND (next_retry_at IS NULL OR next_retry_at <= datetime('now'))"
1842 };
1843
1844 emit_json(&ConcurrencyEvent {
1847 phase: "concurrency",
1848 scan_parallelism: 1,
1849 drain_parallelism: parallelism as u32,
1850 });
1851
1852 let until_deadline = std::time::Instant::now()
1856 + std::time::Duration::from_secs(args.max_runtime.unwrap_or(3600));
1857 loop {
1858 if args.until_empty {
1859 let mut rescan = scan_operation(&conn, &namespace, args)?;
1863 if matches!(args.operation(), EnrichOperation::BodyEnrich) {
1868 if let Ok(vetoed) = skipped_item_keys(&queue_conn, &op_label) {
1869 rescan.retain(|k| !vetoed.contains(k));
1870 }
1871 }
1872 for key in &rescan {
1873 enqueue_candidate(&queue_conn, &conn, &namespace, key, item_type, &op_label);
1874 }
1875 }
1876 let completed_before = completed;
1877
1878 if parallelism > 1 {
1882 let stdout_mu = parking_lot::Mutex::new(());
1883 let budget = args.max_cost_usd;
1884 let operation = args.operation().clone();
1885 let mode = args.mode().clone();
1886 let min_oc = args.min_output_chars;
1887 let max_oc = args.max_output_chars;
1888 let prompt_tpl = args.prompt_template.as_deref().map(|p| p.to_path_buf());
1889
1890 struct WorkerResult {
1891 completed: usize,
1892 failed: usize,
1893 skipped: usize,
1894 cost: f64,
1895 oauth: bool,
1896 db_busy: bool,
1901 }
1902
1903 let results: Vec<WorkerResult> = std::thread::scope(|s| {
1904 let handles: Vec<_> = (0..parallelism)
1905 .map(|worker_id| {
1906 let stdout_mu = &stdout_mu;
1907 let paths = &paths;
1908 let queue_path = &queue_path;
1909 let namespace = &namespace;
1910 let provider_binary = provider_binary.as_deref();
1911 let operation = &operation;
1912 let mode = &mode;
1913 let prompt_tpl = prompt_tpl.as_deref();
1914 s.spawn(move || {
1915 let w_conn = match open_rw(&paths.db) {
1916 Ok(c) => c,
1917 Err(e) => {
1918 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open DB");
1919 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false, db_busy: false };
1920 }
1921 };
1922 let w_queue = match open_queue_db(queue_path) {
1923 Ok(c) => c,
1924 Err(e) => {
1925 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open queue DB");
1926 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false, db_busy: false };
1927 }
1928 };
1929 let mut w_completed = 0usize;
1930 let mut w_failed = 0usize;
1931 let mut w_skipped = 0usize;
1932 let mut w_cost = 0.0f64;
1933 let mut w_oauth = false;
1934 let mut w_db_busy = false;
1935 let mut w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1936 let w_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1937 let mut w_breaker = crate::retry::CircuitBreaker::new(
1943 args.circuit_breaker_threshold.max(1),
1944 std::time::Duration::from_secs(60),
1945 );
1946
1947 loop {
1948 if crate::shutdown_requested() {
1949 tracing::info!(target: "enrich", "shutdown requested, worker stopping");
1950 break;
1951 }
1952 if let Some(b) = budget {
1953 if !w_oauth && w_cost >= b {
1954 break;
1955 }
1956 }
1957 let pending = match crate::storage::utils::with_busy_retry(|| {
1976 dequeue_next_pending(&w_queue, backoff_clause)
1977 }) {
1978 Ok(DequeueOutcome::Claimed(p)) => Some(p),
1979 Ok(DequeueOutcome::Empty) => None,
1980 Err(AppError::DbBusy(msg)) => {
1981 tracing::error!(target: "enrich", worker = worker_id, error = %msg, "SQLITE_BUSY exhausted bounded retries, worker aborting");
1982 w_db_busy = true;
1983 None
1984 }
1985 Err(e) => {
1986 tracing::error!(target: "enrich", worker = worker_id, error = %e, "dequeue failed");
1987 None
1988 }
1989 };
1990 let (queue_id, item_key, _item_type, attempt_current) = match pending {
1991 Some(p) => p,
1992 None => break,
1993 };
1994 let item_started = Instant::now();
1995 let current_index = w_completed + w_failed + w_skipped;
1996
1997 let provider_bin = provider_binary.unwrap_or_else(|| std::path::Path::new(""));
2003 let call_result = match operation {
2004 EnrichOperation::MemoryBindings | EnrichOperation::AugmentBindings => call_memory_bindings(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2005 EnrichOperation::EntityDescriptions => call_entity_description(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2006 EnrichOperation::BodyEnrich => call_body_enrich(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode, min_oc, max_oc, prompt_tpl, args.preserve_threshold, paths, llm_backend, embedding_backend),
2007 EnrichOperation::ReEmbed => call_reembed(&w_conn, namespace, &item_key, paths, llm_backend, embedding_backend),
2008 EnrichOperation::WeightCalibrate => call_weight_calibrate(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2009 EnrichOperation::RelationReclassify => call_relation_reclassify(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2010 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => call_entity_connect(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2011 EnrichOperation::EntityTypeValidate => call_entity_type_validate(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2012 EnrichOperation::DescriptionEnrich => call_description_enrich(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2013 EnrichOperation::DomainClassify => call_domain_classify(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2014 EnrichOperation::GraphAudit => call_graph_audit(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2015 EnrichOperation::DeepResearchSynth => call_deep_research_synth(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
2016 EnrichOperation::BodyExtract => call_body_extract(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode, args.body_extract_graph_only),
2017 };
2018 let openrouter_diag = take_last_openrouter_failure();
2024
2025 match call_result {
2026 Ok(EnrichItemResult::Done { cost, is_oauth, memory_id, entity_id, entities, rels, chars_before, chars_after }) => {
2027 if is_oauth { w_oauth = true; }
2028 w_backoff = DEFAULT_RATE_LIMIT_WAIT;
2029 let _ = w_queue.execute(
2030 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2031 rusqlite::params![memory_id, entity_id, entities as i64, rels as i64, cost, item_started.elapsed().as_millis() as i64, queue_id],
2032 );
2033 w_completed += 1;
2034 if !is_oauth { w_cost += cost; }
2035 let _ = w_breaker
2037 .record(crate::retry::AttemptOutcome::Success);
2038 let _guard = stdout_mu.lock();
2039 emit_json(&ItemEvent { item: &item_key, status: "done", memory_id, entity_id, entities: Some(entities), rels: Some(rels), chars_before, chars_after, cost_usd: if is_oauth { None } else { Some(cost) }, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
2040 }
2041 Ok(EnrichItemResult::Skipped { reason }) => {
2042 w_skipped += 1;
2043 let _ = w_queue.execute("UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![reason, queue_id]);
2044 let _guard = stdout_mu.lock();
2045 emit_json(&ItemEvent { item: &item_key, status: "skipped", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
2046 }
2047 Ok(EnrichItemResult::PreservationFailed { score, threshold, chars_before, chars_after }) => {
2048 w_skipped += 1;
2054 let reason = format!(
2055 "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2056 );
2057 let _ = w_queue.execute(
2058 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2059 rusqlite::params![reason, queue_id],
2060 );
2061 let _guard = stdout_mu.lock();
2062 emit_json(&ItemEvent {
2063 item: &item_key,
2064 status: "preservation_failed",
2065 memory_id: None,
2066 entity_id: None,
2067 entities: None,
2068 rels: None,
2069 chars_before: Some(chars_before),
2070 chars_after: Some(chars_after),
2071 cost_usd: None,
2072 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2073 error: Some(reason),
2074 index: current_index,
2075 total,
2076 });
2077 }
2078 Err(e) => {
2079 let err_str = format!("{e}");
2080 if matches!(e, AppError::RateLimited { .. }) {
2081 if crate::retry::is_kill_switch_active() {
2082 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2083 } else if std::time::Instant::now() >= w_deadline {
2084 tracing::error!(target: "enrich", "rate-limit retry deadline (1h) exhausted in worker");
2085 } else {
2086 let half = w_backoff / 2;
2087 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2088 let actual_wait = half + jitter;
2089 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited in worker, backing off");
2090 let _ = w_queue.execute("UPDATE queue SET status='pending' WHERE id=?1", rusqlite::params![queue_id]);
2091 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2092 w_backoff = (w_backoff * 2).min(900);
2093 continue;
2094 }
2095 }
2096 w_failed += 1;
2097 let outcome = match openrouter_diag {
2104 Some(diag) => record_item_failure_typed(
2105 &w_queue,
2106 queue_id,
2107 attempt_current,
2108 args.max_attempts,
2109 diag.retry_class,
2110 &err_str,
2111 diag.finish_reason.as_deref(),
2112 diag.prompt_tokens,
2113 diag.completion_tokens,
2114 ),
2115 None => record_item_failure(&w_queue, queue_id, attempt_current, args.max_attempts, &e),
2116 };
2117 let _guard = stdout_mu.lock();
2118 emit_json(&ItemEvent { item: &item_key, status: "failed", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: Some(err_str), index: current_index, total });
2119 let breaker_opened = w_breaker.record(outcome);
2122 if breaker_opened {
2123 tracing::error!(target: "enrich",
2124 consecutive_failures = w_breaker.consecutive_failures(),
2125 "circuit breaker opened — aborting worker"
2126 );
2127 break;
2128 }
2129 }
2130 }
2131 }
2132 WorkerResult { completed: w_completed, failed: w_failed, skipped: w_skipped, cost: w_cost, oauth: w_oauth, db_busy: w_db_busy }
2133 })
2134 })
2135 .collect();
2136 handles
2137 .into_iter()
2138 .map(|h| {
2139 h.join().unwrap_or(WorkerResult {
2140 completed: 0,
2141 failed: 0,
2142 skipped: 0,
2143 cost: 0.0,
2144 oauth: false,
2145 db_busy: false,
2146 })
2147 })
2148 .collect()
2149 });
2150
2151 if results.iter().any(|r| r.db_busy) {
2157 return Err(AppError::DbBusy(
2158 "SQLITE_BUSY exhausted bounded retries while dequeuing (parallel worker)"
2159 .into(),
2160 ));
2161 }
2162
2163 for r in &results {
2164 completed += r.completed;
2165 failed += r.failed;
2166 skipped += r.skipped;
2167 cost_total += r.cost;
2168 if r.oauth && !oauth_detected {
2169 oauth_detected = true;
2170 }
2171 }
2172 } else {
2173 loop {
2175 if crate::shutdown_requested() {
2176 tracing::info!(target: "enrich", "shutdown requested, stopping enrichment");
2177 break;
2178 }
2179
2180 if let Some(budget) = args.max_cost_usd {
2182 if !oauth_detected && cost_total >= budget {
2183 tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
2184 break;
2185 }
2186 }
2187
2188 let pending = match crate::storage::utils::with_busy_retry(|| {
2204 dequeue_next_pending(&queue_conn, backoff_clause)
2205 }) {
2206 Ok(DequeueOutcome::Claimed(p)) => Some(p),
2207 Ok(DequeueOutcome::Empty) => None,
2208 Err(e @ AppError::DbBusy(_)) => {
2209 tracing::error!(target: "enrich", error = %e, "SQLITE_BUSY exhausted bounded retries, aborting drain loop");
2210 return Err(e);
2211 }
2212 Err(e) => {
2213 tracing::error!(target: "enrich", error = %e, "dequeue failed");
2214 None
2215 }
2216 };
2217
2218 let (queue_id, item_key, item_type, attempt_current) = match pending {
2219 Some(p) => p,
2220 None => break,
2221 };
2222
2223 let item_started = Instant::now();
2224 let current_index = completed + failed + skipped;
2225
2226 let provider_bin = provider_binary
2229 .as_deref()
2230 .unwrap_or_else(|| std::path::Path::new(""));
2231 let call_result = match args.operation() {
2232 EnrichOperation::MemoryBindings | EnrichOperation::AugmentBindings => {
2233 call_memory_bindings(
2234 &conn,
2235 &namespace,
2236 &item_key,
2237 provider_bin,
2238 provider_model,
2239 provider_timeout,
2240 &args.mode(),
2241 )
2242 }
2243 EnrichOperation::EntityDescriptions => call_entity_description(
2244 &conn,
2245 &namespace,
2246 &item_key,
2247 provider_bin,
2248 provider_model,
2249 provider_timeout,
2250 &args.mode(),
2251 ),
2252 EnrichOperation::BodyEnrich => call_body_enrich(
2253 &conn,
2254 &namespace,
2255 &item_key,
2256 provider_bin,
2257 provider_model,
2258 provider_timeout,
2259 &args.mode(),
2260 args.min_output_chars,
2261 args.max_output_chars,
2262 args.prompt_template.as_deref(),
2263 args.preserve_threshold,
2264 &paths,
2265 llm_backend,
2266 embedding_backend,
2267 ),
2268 EnrichOperation::ReEmbed => call_reembed(
2269 &conn,
2270 &namespace,
2271 &item_key,
2272 &paths,
2273 llm_backend,
2274 embedding_backend,
2275 ),
2276 EnrichOperation::WeightCalibrate => call_weight_calibrate(
2277 &conn,
2278 &namespace,
2279 &item_key,
2280 provider_bin,
2281 provider_model,
2282 provider_timeout,
2283 &args.mode(),
2284 ),
2285 EnrichOperation::RelationReclassify => call_relation_reclassify(
2286 &conn,
2287 &namespace,
2288 &item_key,
2289 provider_bin,
2290 provider_model,
2291 provider_timeout,
2292 &args.mode(),
2293 ),
2294 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2295 call_entity_connect(
2296 &conn,
2297 &namespace,
2298 &item_key,
2299 provider_bin,
2300 provider_model,
2301 provider_timeout,
2302 &args.mode(),
2303 )
2304 }
2305 EnrichOperation::EntityTypeValidate => call_entity_type_validate(
2306 &conn,
2307 &namespace,
2308 &item_key,
2309 provider_bin,
2310 provider_model,
2311 provider_timeout,
2312 &args.mode(),
2313 ),
2314 EnrichOperation::DescriptionEnrich => call_description_enrich(
2315 &conn,
2316 &namespace,
2317 &item_key,
2318 provider_bin,
2319 provider_model,
2320 provider_timeout,
2321 &args.mode(),
2322 ),
2323 EnrichOperation::DomainClassify => call_domain_classify(
2324 &conn,
2325 &namespace,
2326 &item_key,
2327 provider_bin,
2328 provider_model,
2329 provider_timeout,
2330 &args.mode(),
2331 ),
2332 EnrichOperation::GraphAudit => call_graph_audit(
2333 &conn,
2334 &namespace,
2335 &item_key,
2336 provider_bin,
2337 provider_model,
2338 provider_timeout,
2339 &args.mode(),
2340 ),
2341 EnrichOperation::DeepResearchSynth => call_deep_research_synth(
2342 &conn,
2343 &namespace,
2344 &item_key,
2345 provider_bin,
2346 provider_model,
2347 provider_timeout,
2348 &args.mode(),
2349 ),
2350 EnrichOperation::BodyExtract => call_body_extract(
2351 &conn,
2352 &namespace,
2353 &item_key,
2354 provider_bin,
2355 provider_model,
2356 provider_timeout,
2357 &args.mode(),
2358 args.body_extract_graph_only,
2359 ),
2360 };
2361 let openrouter_diag = take_last_openrouter_failure();
2364
2365 match call_result {
2366 Ok(EnrichItemResult::Done {
2367 memory_id,
2368 entity_id,
2369 entities,
2370 rels,
2371 chars_before,
2372 chars_after,
2373 cost,
2374 is_oauth,
2375 }) => {
2376 if is_oauth && !oauth_detected {
2377 oauth_detected = true;
2378 tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
2379 }
2380 backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
2381
2382 let persist_err: Option<String> = match args.operation() {
2384 EnrichOperation::MemoryBindings => {
2385 None
2387 }
2388 EnrichOperation::EntityDescriptions => {
2389 None
2391 }
2392 EnrichOperation::BodyEnrich => {
2393 None
2395 }
2396 _ => {
2397 None
2399 }
2400 };
2401
2402 if let Err(e) = queue_conn.execute(
2403 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2404 rusqlite::params![
2405 memory_id,
2406 entity_id,
2407 entities as i64,
2408 rels as i64,
2409 cost,
2410 item_started.elapsed().as_millis() as i64,
2411 queue_id
2412 ],
2413 ) {
2414 tracing::warn!(target: "enrich", error = %e, "queue done update failed");
2415 }
2416
2417 if persist_err.is_none() {
2418 completed += 1;
2419 if !is_oauth {
2420 cost_total += cost;
2421 }
2422 emit_json(&ItemEvent {
2423 item: &item_key,
2424 status: "done",
2425 memory_id,
2426 entity_id,
2427 entities: Some(entities),
2428 rels: Some(rels),
2429 chars_before,
2430 chars_after,
2431 cost_usd: if is_oauth { None } else { Some(cost) },
2432 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2433 error: None,
2434 index: current_index,
2435 total,
2436 });
2437 } else {
2438 failed += 1;
2439 emit_json(&ItemEvent {
2440 item: &item_key,
2441 status: "failed",
2442 memory_id: None,
2443 entity_id: None,
2444 entities: None,
2445 rels: None,
2446 chars_before: None,
2447 chars_after: None,
2448 cost_usd: None,
2449 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2450 error: persist_err,
2451 index: current_index,
2452 total,
2453 });
2454 }
2455 }
2456 Ok(EnrichItemResult::Skipped { reason }) => {
2457 skipped += 1;
2458 if let Err(e) = queue_conn.execute(
2459 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2460 rusqlite::params![reason, queue_id],
2461 ) {
2462 tracing::warn!(target: "enrich", error = %e, "queue skipped update failed");
2463 }
2464 emit_json(&ItemEvent {
2465 item: &item_key,
2466 status: "skipped",
2467 memory_id: None,
2468 entity_id: None,
2469 entities: None,
2470 rels: None,
2471 chars_before: None,
2472 chars_after: None,
2473 cost_usd: None,
2474 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2475 error: None,
2476 index: current_index,
2477 total,
2478 });
2479 }
2480 Ok(EnrichItemResult::PreservationFailed {
2481 score,
2482 threshold,
2483 chars_before,
2484 chars_after,
2485 }) => {
2486 skipped += 1;
2493 let reason = format!(
2494 "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2495 );
2496 if let Err(qe) = queue_conn.execute(
2497 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2498 rusqlite::params![reason, queue_id],
2499 ) {
2500 tracing::warn!(target: "enrich", error = %qe, "queue preservation_failed update failed");
2501 }
2502 emit_json(&ItemEvent {
2503 item: &item_key,
2504 status: "preservation_failed",
2505 memory_id: None,
2506 entity_id: None,
2507 entities: None,
2508 rels: None,
2509 chars_before: Some(chars_before),
2510 chars_after: Some(chars_after),
2511 cost_usd: None,
2512 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2513 error: Some(reason),
2514 index: current_index,
2515 total,
2516 });
2517 }
2518 Err(e) => {
2519 let err_str = format!("{e}");
2520 if matches!(e, AppError::RateLimited { .. }) {
2521 if crate::retry::is_kill_switch_active() {
2522 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2523 } else if std::time::Instant::now() >= rate_limit_deadline {
2524 tracing::error!(target: "enrich", total_elapsed_secs = enrich_started.elapsed().as_secs(), "rate-limit retry deadline (1h) exhausted");
2525 } else {
2526 let half = backoff_secs / 2;
2527 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2528 let actual_wait = half + jitter;
2529 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
2530 if let Err(qe) = queue_conn.execute(
2531 "UPDATE queue SET status='pending' WHERE id=?1",
2532 rusqlite::params![queue_id],
2533 ) {
2534 tracing::warn!(target: "enrich", error = %qe, "queue pending update failed");
2535 }
2536 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2537 backoff_secs = (backoff_secs * 2).min(900);
2538 continue;
2539 }
2540 }
2541
2542 failed += 1;
2543 let _outcome = match openrouter_diag {
2548 Some(diag) => record_item_failure_typed(
2549 &queue_conn,
2550 queue_id,
2551 attempt_current,
2552 args.max_attempts,
2553 diag.retry_class,
2554 &err_str,
2555 diag.finish_reason.as_deref(),
2556 diag.prompt_tokens,
2557 diag.completion_tokens,
2558 ),
2559 None => record_item_failure(
2560 &queue_conn,
2561 queue_id,
2562 attempt_current,
2563 args.max_attempts,
2564 &e,
2565 ),
2566 };
2567 emit_json(&ItemEvent {
2568 item: &item_key,
2569 status: "failed",
2570 memory_id: None,
2571 entity_id: None,
2572 entities: None,
2573 rels: None,
2574 chars_before: None,
2575 chars_after: None,
2576 cost_usd: None,
2577 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2578 error: Some(err_str),
2579 index: current_index,
2580 total,
2581 });
2582 }
2583 }
2584
2585 let _ = item_type; }
2587 } if !args.until_empty {
2590 break;
2591 }
2592 let eligible_remaining: i64 = queue_conn
2593 .query_row(
2594 &format!("SELECT COUNT(*) FROM queue WHERE status='pending' {backoff_clause}"),
2595 [],
2596 |r| r.get(0),
2597 )
2598 .unwrap_or(0);
2599 let progressed = completed > completed_before;
2600 if std::time::Instant::now() >= until_deadline {
2601 tracing::info!(target: "enrich", "until-empty: max-runtime reached, stopping");
2602 break;
2603 }
2604 if !progressed && eligible_remaining == 0 {
2605 tracing::info!(target: "enrich", "until-empty: converged (no eligible items remain)");
2606 break;
2607 }
2608 if eligible_remaining == 0 {
2609 std::thread::sleep(std::time::Duration::from_secs(1));
2611 }
2612 } let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2615 let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2616
2617 let waiting_final: i64 = queue_conn
2621 .query_row(
2622 "SELECT COUNT(*) FROM queue WHERE status='pending' \
2623 AND (operation = ?1 OR operation IS NULL) \
2624 AND next_retry_at IS NOT NULL AND next_retry_at > datetime('now')",
2625 rusqlite::params![op_label],
2626 |r| r.get(0),
2627 )
2628 .unwrap_or(0);
2629 let dead_final: i64 = queue_conn
2630 .query_row(
2631 "SELECT COUNT(*) FROM queue WHERE status='dead' \
2632 AND (operation = ?1 OR operation IS NULL)",
2633 rusqlite::params![op_label],
2634 |r| r.get(0),
2635 )
2636 .unwrap_or(0);
2637
2638 emit_json(&EnrichSummary {
2639 summary: true,
2640 operation: format!("{:?}", args.operation()),
2641 items_total: total,
2642 completed,
2643 failed,
2644 skipped,
2645 cost_usd: cost_total,
2646 elapsed_ms: started.elapsed().as_millis() as u64,
2647 backend_invoked: take_enrich_backend(),
2648 waiting: waiting_final,
2649 dead: dead_final,
2650 });
2651
2652 if failed == 0 {
2653 let dead: i64 = queue_conn
2656 .query_row("SELECT COUNT(*) FROM queue WHERE status='dead'", [], |r| {
2657 r.get(0)
2658 })
2659 .unwrap_or(0);
2660 let skipped_remaining: i64 = queue_conn
2666 .query_row(
2667 "SELECT COUNT(*) FROM queue WHERE status='skipped'",
2668 [],
2669 |r| r.get(0),
2670 )
2671 .unwrap_or(0);
2672 if dead == 0 && skipped_remaining == 0 {
2673 let _ = std::fs::remove_file(&queue_path);
2674 }
2675 }
2676
2677 Ok(())
2678}
2679
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `schema` deserializes into a generic `serde_json::Value`.
    ///
    /// Panics with `failure_msg` (followed by the parser error) when the text
    /// is not well-formed JSON. `#[track_caller]` keeps the reported panic
    /// location inside the calling test rather than inside this helper.
    #[track_caller]
    fn assert_valid_json(schema: &str, failure_msg: &str) {
        serde_json::from_str::<serde_json::Value>(schema).expect(failure_msg);
    }

    /// The bindings schema constant must be parseable JSON.
    #[test]
    fn bindings_schema_is_valid_json() {
        assert_valid_json(BINDINGS_SCHEMA, "BINDINGS_SCHEMA must be valid JSON");
    }

    /// The entity-description schema constant must be parseable JSON.
    #[test]
    fn entity_description_schema_is_valid_json() {
        assert_valid_json(
            ENTITY_DESCRIPTION_SCHEMA,
            "ENTITY_DESCRIPTION_SCHEMA must be valid JSON",
        );
    }

    /// The body-enrich schema constant must be parseable JSON.
    #[test]
    fn body_enrich_schema_is_valid_json() {
        assert_valid_json(BODY_ENRICH_SCHEMA, "BODY_ENRICH_SCHEMA must be valid JSON");
    }
}