1mod extraction;
26mod postprocess;
27mod queue;
28mod scan;
29use extraction::{
30 call_body_enrich, call_body_extract, call_deep_research_synth, call_description_enrich,
31 call_domain_classify, call_entity_connect, call_entity_description, call_entity_type_validate,
32 call_graph_audit, call_memory_bindings, call_reembed, call_relation_reclassify,
33 call_weight_calibrate, find_codex_binary, EnrichItemResult,
34};
35use postprocess::{
36 persist_enriched_body, persist_entity_description, persist_memory_bindings,
37 reembed_memory_vector, take_enrich_backend,
38};
39pub use queue::{cleanup_queue_entry, DeadItem, DeadSummary, EnrichStatus, WaitingItem};
40use queue::{
41 enqueue_candidate, item_type_for, open_queue_db, prune_dead_orphans, record_item_failure,
42 skipped_item_keys,
43};
44use scan::{scan_isolated_entity_pairs, scan_operation, scan_unbound_memories};
45
46use crate::commands::ingest_claude::find_claude_binary;
47use crate::constants::MAX_MEMORY_BODY_LEN;
48use crate::entity_type::EntityType;
49use crate::errors::AppError;
50use crate::paths::AppPaths;
51use crate::storage::connection::{ensure_db_ready, open_rw};
52use crate::storage::entities::{self, NewEntity, NewRelationship};
53use crate::storage::memories;
54
55use rusqlite::Connection;
56use serde::{Deserialize, Serialize};
57use std::io::Write;
58use std::path::{Path, PathBuf};
59use std::time::Instant;
60
/// Default rate-limit wait.
// NOTE(review): unit is presumably seconds (other timing knobs here are `SECONDS`) — confirm at the use site, which is not visible in this chunk.
const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
/// Default for `--min-output-chars`: lower bound, in characters, for `body-enrich` output.
const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;
/// Default for `--max-output-chars`: upper bound, in characters, for `body-enrich` output.
const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2000;
68
/// JSON Schema for the entity/relationship extraction answer.
///
/// `entity_type` and `relation` are closed enums (10 entity types, 11 canonical relations),
/// `strength` is bounded to [0, 1], and every object sets `additionalProperties: false`.
const BINDINGS_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "entities": {
 "type": "array",
 "items": {
 "type": "object",
 "properties": {
 "name": { "type": "string" },
 "entity_type": {
 "type": "string",
 "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
 }
 },
 "required": ["name", "entity_type"],
 "additionalProperties": false
 }
 },
 "relationships": {
 "type": "array",
 "items": {
 "type": "object",
 "properties": {
 "source": { "type": "string" },
 "target": { "type": "string" },
 "relation": {
 "type": "string",
 "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
 },
 "strength": { "type": "number", "minimum": 0, "maximum": 1 }
 },
 "required": ["source","target","relation","strength"],
 "additionalProperties": false
 }
 }
 },
 "required": ["entities","relationships"],
 "additionalProperties": false
}"#;

/// JSON Schema for an entity-description answer: a single required `description` string.
const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "description": { "type": "string" }
 },
 "required": ["description"],
 "additionalProperties": false
}"#;

/// JSON Schema for a body-enrich answer: a single required `enriched_body` string.
const BODY_ENRICH_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "enriched_body": { "type": "string" }
 },
 "required": ["enriched_body"],
 "additionalProperties": false
}"#;
130
/// Instructions for auditing a single relationship weight against a 0.3 / 0.5 / 0.7 / 0.9 scale.
const WEIGHT_CALIBRATE_PROMPT: &str = "You are a knowledge graph quality auditor. Evaluate whether this relationship weight is correctly calibrated.\n\n\
Scale:\n\
- 0.9 = vital hard dependency (A cannot function without B)\n\
- 0.7 = important design relationship (A strongly supports/enables B)\n\
- 0.5 = useful contextual link (A and B share relevant context)\n\
- 0.3 = weak reference (A mentions B without strong coupling)\n\n\
Respond with the calibrated weight and brief reasoning.";

/// Expected answer for weight calibration: `calibrated_weight` in [0, 1] plus `reasoning`.
const WEIGHT_CALIBRATE_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "calibrated_weight": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
 "reasoning": { "type": "string" }
 },
 "required": ["calibrated_weight", "reasoning"],
 "additionalProperties": false
}"#;

/// Instructions for replacing a generic relation type with exactly one canonical relation.
const RELATION_RECLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. The relationship between these entities uses a generic type. Determine the REAL semantic relationship.\n\n\
Valid canonical relations (pick exactly one):\n\
- depends-on: A cannot function without B\n\
- uses: A utilizes B but could substitute it\n\
- supports: A reinforces or enables B\n\
- causes: A triggers or produces B\n\
- fixes: A resolves a problem in B\n\
- contradicts: A conflicts with or invalidates B\n\
- applies-to: A is relevant to or scoped within B\n\
- follows: A comes after B in sequence\n\
- replaces: A substitutes B\n\
- tracked-in: A is monitored in B\n\
- related: A and B share context (use sparingly)\n\n\
Respond with the correct relation, strength, and reasoning.";

/// Expected answer for reclassification: `relation`, `strength` in [0, 1], `reasoning`.
// NOTE(review): unlike BINDINGS_SCHEMA, `relation` is an unconstrained string (no `enum`);
// presumably the caller validates it against the canonical list — confirm.
const RELATION_RECLASSIFY_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "relation": { "type": "string" },
 "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
 "reasoning": { "type": "string" }
 },
 "required": ["relation", "strength", "reasoning"],
 "additionalProperties": false
}"#;

/// Instructions for proposing a relation between two unconnected entities.
/// The model is told to answer the literal `"none"` when no meaningful relation exists.
const ENTITY_CONNECT_PROMPT: &str = "You are a knowledge graph quality auditor. Two entities exist in the same graph but have no relationship between them. Determine if a meaningful relationship exists.\n\n\
Valid canonical relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, replaces, tracked-in, related.\n\n\
If NO meaningful relationship exists, set relation to \"none\".\n\
Respond with the relation (or \"none\"), strength, and reasoning.";

/// Expected answer for entity connection: `relation` (possibly `"none"`), `strength` in [0, 1], `reasoning`.
const ENTITY_CONNECT_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "relation": { "type": "string" },
 "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
 "reasoning": { "type": "string" }
 },
 "required": ["relation", "strength", "reasoning"],
 "additionalProperties": false
}"#;

/// Instructions for checking an entity's type against the ten allowed entity types.
const ENTITY_TYPE_VALIDATE_PROMPT: &str = "You are a knowledge graph quality auditor. Verify whether this entity's type is correct.\n\n\
Valid entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\n\
If the current type is correct, keep it. If wrong, suggest the correct type.\n\
Respond with the validated type and reasoning.";

/// Expected answer for type validation: `validated_type`, `was_correct`, `reasoning`.
// NOTE(review): `was_correct` is required here but the prompt never asks for it; the
// schema alone has to elicit it.
const ENTITY_TYPE_VALIDATE_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "validated_type": { "type": "string" },
 "was_correct": { "type": "boolean" },
 "reasoning": { "type": "string" }
 },
 "required": ["validated_type", "was_correct", "reasoning"],
 "additionalProperties": false
}"#;

/// Instructions for rewriting a generic/auto-generated memory description into a
/// 10-20 word semantic one (with a BAD/GOOD example).
const DESCRIPTION_ENRICH_PROMPT: &str = "You are a knowledge graph quality auditor. This memory has a generic or auto-generated description. Write a concise, semantic description (10-20 words) that captures WHAT this memory is about and WHY it matters.\n\n\
BAD: 'ingested from docs/auth.md'\n\
GOOD: 'JWT token rotation strategy with 15-min expiry and refresh flow'\n\n\
Respond with the improved description and reasoning.";

/// Expected answer for description enrichment: `description` plus `reasoning`.
const DESCRIPTION_ENRICH_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "description": { "type": "string" },
 "reasoning": { "type": "string" }
 },
 "required": ["description", "reasoning"],
 "additionalProperties": false
}"#;

/// Instructions for classifying a memory into a kebab-case (2-4 word) domain name.
const DOMAIN_CLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. Classify this memory into its primary domain category.\n\n\
Respond with the domain name (kebab-case, 2-4 words) and reasoning.";

/// Expected answer for domain classification: `domain`, `confidence` in [0, 1], `reasoning`.
// NOTE(review): `confidence` is required here but the prompt only asks for the domain and reasoning.
const DOMAIN_CLASSIFY_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "domain": { "type": "string" },
 "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
 "reasoning": { "type": "string" }
 },
 "required": ["domain", "confidence", "reasoning"],
 "additionalProperties": false
}"#;
241
/// Instructions for auditing a memory and its entity bindings for quality issues
/// (missing entities, wrong types, redundant/low-signal relationships, ...).
const GRAPH_AUDIT_PROMPT: &str = "You are a knowledge graph quality auditor. Analyze this memory and its entity bindings for quality issues.\n\n\
Check for: missing entities, wrong entity types, redundant relationships, orphaned entities, generic descriptions, low-signal relationships.\n\n\
Respond with a list of issues found (or empty if none) and an overall quality score.";

/// Expected answer for a graph audit: `quality_score` in [0, 1], a list of
/// `{ kind, detail }` issues, and `reasoning`.
///
/// Every object, including the nested issue items, sets `additionalProperties: false`
/// and lists all of its properties as required. Strict structured-output modes reject
/// schemas whose nested objects omit it. This matches the convention already used by
/// `BINDINGS_SCHEMA`.
const GRAPH_AUDIT_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "quality_score": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
 "issues": { "type": "array", "items": { "type": "object", "properties": { "kind": { "type": "string" }, "detail": { "type": "string" } }, "required": ["kind", "detail"], "additionalProperties": false } },
 "reasoning": { "type": "string" }
 },
 "required": ["quality_score", "issues", "reasoning"],
 "additionalProperties": false
}"#;
257
/// Instructions for synthesizing entities, relationships and a summary from a memory body.
const DEEP_RESEARCH_SYNTH_PROMPT: &str = "You are a knowledge graph synthesizer. Given this memory body, extract key findings and synthesize them into structured entities and relationships.\n\n\
Entity names: lowercase kebab-case, domain-specific.\n\
Relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, related, replaces, tracked-in.\n\n\
Respond with extracted entities, relationships, and a synthesis summary.";

/// Expected answer for deep-research synthesis: `entities` (`name`, `entity_type`),
/// `relationships` (`source`, `target`, `relation`, `strength`) and a `summary`.
///
/// The nested entity and relationship item objects set `additionalProperties: false`.
/// Every property is already required, so the schema is valid under strict
/// structured-output modes, consistent with `BINDINGS_SCHEMA`.
// NOTE(review): unlike BINDINGS_SCHEMA, `entity_type`/`relation` are free strings and
// `strength` is unbounded here; presumably validated downstream — confirm.
const DEEP_RESEARCH_SYNTH_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "entities": { "type": "array", "items": { "type": "object", "properties": { "name": { "type": "string" }, "entity_type": { "type": "string" } }, "required": ["name", "entity_type"], "additionalProperties": false } },
 "relationships": { "type": "array", "items": { "type": "object", "properties": { "source": { "type": "string" }, "target": { "type": "string" }, "relation": { "type": "string" }, "strength": { "type": "number" } }, "required": ["source", "target", "relation", "strength"], "additionalProperties": false } },
 "summary": { "type": "string" }
 },
 "required": ["entities", "relationships", "summary"],
 "additionalProperties": false
}"#;
274
/// Instructions for restructuring an unstructured memory body into clean markdown
/// while preserving all factual content.
const BODY_EXTRACT_PROMPT: &str = "You are a structured data extractor. Given this memory body (which may be unstructured text, raw notes, or a transcript), extract and restructure the content into a clean, well-organized markdown body.\n\n\
Preserve all factual content. Remove noise, fix formatting, add section headers where appropriate.\n\
Respond with the restructured body and a brief summary of changes.";

/// Expected answer for body extraction: `restructured_body` plus `changes_summary`.
const BODY_EXTRACT_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "restructured_body": { "type": "string" },
 "changes_summary": { "type": "string" }
 },
 "required": ["restructured_body", "changes_summary"],
 "additionalProperties": false
}"#;

/// Instructions for entity/relationship extraction. The relation list and strength scale
/// mirror the `relation` enum in `BINDINGS_SCHEMA`.
// NOTE(review): the entity examples listed here are a subset of the schema's 10 entity types.
const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. Given a memory body, extract:\n\
1. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
2. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones";

/// Prompt prefix for entity descriptions; it ends with "Entity name: ", so the
/// entity name is presumably appended directly after it.
const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. Given an entity name and type, write a concise one-sentence description (10-20 words) that explains what this entity IS and WHY it matters in the context of software/system design.\n\nEntity name: ";

/// Prompt prefix for body enrichment; the memory body to enrich is presumably appended
/// after the trailing blank line.
const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. Given a short or sparse memory body, expand it into a richer, more complete and useful description. Preserve all existing facts. Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\nConstraints:\n- Output only the enriched body text (no metadata, no headers)\n- Preserve the original meaning exactly\n- Target length is provided in the system context\n\nMemory body to enrich:\n\n";
308
// Work-item kinds accepted by `--operation`. They are spelled kebab-case both on the CLI
// and in serde output (e.g. `memory-bindings`, `re-embed`).
// Plain `//` comments on purpose: `///` docs on a clap `ValueEnum` become the per-value
// `--help` text and would change CLI output.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum EnrichOperation {
    // Link each UNBOUND memory to entities extracted from its body (scans unbound memories only).
    MemoryBindings,
    // Re-run extraction over already-bound memories and merge new entities/relationships
    // additively; used with --names/--names-file.
    AugmentBindings,
    // Fill entity descriptions.
    EntityDescriptions,
    // Expand short memory bodies (GAP-18); see --min-output-chars / --max-output-chars.
    BodyEnrich,
    // Rebuild only missing memory embeddings without rewriting bodies.
    ReEmbed,
    // The quality-audit operations below presumably pair with the matching *_PROMPT /
    // *_SCHEMA constants and call_* extractors imported at the top — confirm at dispatch.
    WeightCalibrate,
    RelationReclassify,
    EntityConnect,
    EntityTypeValidate,
    DescriptionEnrich,
    // NOTE(review): no dedicated prompt/schema constant is visible for this operation.
    CrossDomainBridges,
    DomainClassify,
    GraphAudit,
    DeepResearchSynth,
    BodyExtract,
}
354
// LLM backend selected with `--mode` (and with `--fallback-mode`).
// Plain `//` comments on purpose: `///` docs on a clap `ValueEnum` become `--help` text.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
pub enum EnrichMode {
    // Claude Code CLI (`claude -p ...`); spelled `claude-code` on the CLI.
    ClaudeCode,
    // Codex CLI, located via `find_codex_binary`.
    Codex,
    // opencode CLI; the explicit name pins the single-word spelling.
    #[value(name = "opencode")]
    Opencode,
    // OpenRouter API (API key + optional base URL). The explicit name avoids clap's
    // default kebab-case spelling `open-router`.
    #[value(name = "openrouter")]
    OpenRouter,
}
369
370impl std::fmt::Display for EnrichMode {
371 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
372 match self {
373 EnrichMode::ClaudeCode => write!(f, "claude-code"),
374 EnrichMode::Codex => write!(f, "codex"),
375 EnrichMode::Opencode => write!(f, "opencode"),
376 EnrichMode::OpenRouter => write!(f, "openrouter"),
377 }
378 }
379}
380
381#[derive(clap::Args)]
383#[command(
384 about = "Enrich graph memories and entities using an LLM provider",
385 after_long_help = "EXAMPLES:\n \
386 # Add missing entity bindings to all unbound memories\n \
387 sqlite-graphrag enrich --operation memory-bindings --mode codex --codex-model gpt-5.4-mini\n\n \
388 # Fill entity descriptions (dry-run preview, no tokens spent)\n \
389 sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n \
390 # Expand short memory bodies (GAP-18)\n \
391 sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n \
392 # Rebuild only missing memory embeddings without rewriting bodies\n \
393 sqlite-graphrag enrich --operation re-embed --limit 100\n\n \
394 # Resume an interrupted body-enrich run\n \
395 sqlite-graphrag enrich --operation body-enrich --resume --json\n\n \
396 # Retry only failed items from a previous run\n \
397 sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n \
398 # Converge the whole backlog (internal scan+drain loop, no bash wrapper)\n \
399 sqlite-graphrag enrich --operation memory-bindings --mode openrouter \\\n \
400 --openrouter-model deepseek/deepseek-v4-flash:nitro --until-empty --max-runtime 600\n\n \
401 # Inspect / resurrect dead-letter items\n \
402 sqlite-graphrag enrich --operation memory-bindings --list-dead\n \
403 sqlite-graphrag enrich --operation memory-bindings --requeue-dead\n\n \
404 # Read-only status (no LLM, no singleton)\n \
405 sqlite-graphrag enrich --operation memory-bindings --status\n\n\
406 OPERATIONS NOTE:\n \
407 memory-bindings LINKS each memory to the EXISTING entities extracted from its\n \
408 body — it does not invent a new graph, it connects what is missing. It scans\n \
409 only UNBOUND memories. To re-run extraction over ALREADY-bound memories and\n \
410 MERGE newly-found entities/relationships additively (without removing links),\n \
411 use --operation augment-bindings with --names/--names-file.\n\n\
412 DEAD-LETTER SIDECAR (.enrich-queue.sqlite):\n \
413 A SQLite sidecar tracks each work item across runs. Schema (table `queue`):\n \
414 item_key (UNIQUE name/id), item_type (memory|entity), operation, memory_id,\n \
415 status (pending|processing|done|skipped|dead), attempt, error, error_class,\n \
416 next_retry_at (backoff cooldown). --until-empty loops scan→drain internally\n \
417 until eligible items are exhausted; transient failures (incl. malformed/non-\n \
418 JSON LLM output, GAP-SG-09) reschedule with backoff until --max-attempts, then\n \
419 land in status='dead'. Use --status to see the queue, --list-dead to inspect\n \
420 the sink, --requeue-dead to retry it, and --ignore-backoff to skip cooldowns.\n \
421 --names/--names-file also remedy a cooldown by targeting a specific subset.\n\n\
422 EXIT CODES:\n \
423 0 success\n \
424 1 validation error (bad args, binary not found)\n \
425 14 I/O error"
426)]
427pub struct EnrichArgs {
428 #[arg(
433 long,
434 short = 'o',
435 value_enum,
436 value_name = "OPERATION",
437 required_unless_present_any = ["status", "list_dead", "requeue_dead", "prune_dead_orphans"]
438 )]
439 pub operation: Option<EnrichOperation>,
440
441 #[arg(
445 long,
446 value_enum,
447 required_unless_present_any = ["status", "list_dead", "requeue_dead", "prune_dead_orphans"]
448 )]
449 pub mode: Option<EnrichMode>,
450
451 #[arg(long, value_name = "N")]
453 pub limit: Option<usize>,
454
455 #[arg(long)]
457 pub dry_run: bool,
458
459 #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
461 pub namespace: Option<String>,
462
463 #[arg(long, value_name = "PATH")]
466 pub claude_binary: Option<PathBuf>,
467
468 #[arg(long, value_name = "MODEL")]
470 pub claude_model: Option<String>,
471
472 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
474 pub claude_timeout: u64,
475
476 #[arg(long, value_name = "PATH")]
479 pub codex_binary: Option<PathBuf>,
480
481 #[arg(long, value_name = "MODEL")]
483 pub codex_model: Option<String>,
484
485 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
487 pub codex_timeout: u64,
488
489 #[arg(long, value_name = "PATH", env = "SQLITE_GRAPHRAG_OPENCODE_BINARY")]
492 pub opencode_binary: Option<PathBuf>,
493
494 #[arg(long, value_name = "MODEL", env = "SQLITE_GRAPHRAG_OPENCODE_MODEL")]
496 pub opencode_model: Option<String>,
497
498 #[arg(
500 long,
501 value_name = "SECONDS",
502 env = "SQLITE_GRAPHRAG_OPENCODE_TIMEOUT",
503 default_value_t = 300
504 )]
505 pub opencode_timeout: u64,
506
507 #[arg(long, value_name = "MODEL")]
510 pub openrouter_model: Option<String>,
511
512 #[arg(long, value_name = "KEY", env = "OPENROUTER_API_KEY")]
514 pub openrouter_api_key: Option<String>,
515
516 #[arg(long, value_name = "SECONDS", default_value_t = 600)]
523 pub openrouter_timeout: u64,
524
525 #[arg(long, value_name = "URL")]
527 pub openrouter_base_url: Option<String>,
528
529 #[arg(long, value_name = "USD")]
532 pub max_cost_usd: Option<f64>,
533
534 #[arg(long)]
537 pub resume: bool,
538
539 #[arg(long)]
541 pub retry_failed: bool,
542
543 #[arg(long)]
547 pub until_empty: bool,
548
549 #[arg(long, value_name = "SECONDS")]
552 pub max_runtime: Option<u64>,
553
554 #[arg(long, value_name = "N", default_value_t = 8, value_parser = clap::value_parser!(u32).range(1..=20))]
566 pub max_attempts: u32,
567
568 #[arg(long)]
571 pub status: bool,
572
573 #[arg(long)]
578 pub list_dead: bool,
579
580 #[arg(long)]
587 pub requeue_dead: bool,
588
589 #[arg(long)]
597 pub prune_dead_orphans: bool,
598
599 #[arg(long)]
605 pub ignore_backoff: bool,
606
607 #[arg(long)]
614 pub body_extract_graph_only: bool,
615
616 #[arg(long, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=16))]
619 pub rest_concurrency: Option<u32>,
620
621 #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
624 pub min_output_chars: usize,
625
626 #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
628 pub max_output_chars: usize,
629
630 #[arg(long, default_value_t = true)]
632 pub preserve_check: bool,
633
634 #[arg(long, value_name = "PATH")]
636 pub prompt_template: Option<PathBuf>,
637
638 #[arg(long, default_value_t = 1, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=32))]
642 pub llm_parallelism: u32,
643
644 #[arg(long)]
647 pub json: bool,
648
649 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
651 pub db: Option<String>,
652
653 #[arg(long, value_name = "SECONDS")]
656 pub wait_job_singleton: Option<u64>,
657
658 #[arg(long, default_value_t = false)]
662 pub force_job_singleton: bool,
663
664 #[arg(long, value_name = "NAMES", value_delimiter = ',')]
674 pub names: Vec<String>,
675
676 #[arg(long, value_name = "PATH")]
680 pub names_file: Option<PathBuf>,
681
682 #[arg(long, default_value_t = false)]
686 pub preflight_check: bool,
687
688 #[arg(long, value_enum)]
692 pub fallback_mode: Option<EnrichMode>,
693
694 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
697 pub rate_limit_buffer: u64,
698
699 #[arg(long, default_value_t = true)]
703 pub max_load_check: bool,
704
705 #[arg(long, value_name = "N", default_value_t = 5)]
708 pub circuit_breaker_threshold: u32,
709
710 #[arg(long, value_name = "FLOAT", default_value_t = 0.7)]
717 pub preserve_threshold: f64,
718
719 #[arg(long, default_value_t = true)]
724 pub codex_model_validate: bool,
725
726 #[arg(long, value_name = "MODEL")]
731 pub codex_model_fallback: Option<String>,
732}
733
734impl EnrichArgs {
735 fn operation(&self) -> EnrichOperation {
743 self.operation
744 .clone()
745 .unwrap_or(EnrichOperation::MemoryBindings)
746 }
747
748 fn mode(&self) -> EnrichMode {
753 self.mode.clone().unwrap_or(EnrichMode::OpenRouter)
754 }
755}
756
/// Progress event describing one phase of an enrich run.
///
/// Only `phase` is always serialized; every optional field is omitted from the JSON
/// when it is `None`.
// NOTE(review): presumably emitted as one JSON line via `emit_json` — confirm at the call sites.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Name of the phase being reported.
    phase: &'a str,
    /// Path of the resolved backend binary, when relevant to the phase.
    #[serde(skip_serializing_if = "Option::is_none")]
    binary_path: Option<&'a str>,
    /// Backend version string, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Total number of work items, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_total: Option<usize>,
    /// Number of items still pending, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_pending: Option<usize>,
    /// Effective LLM parallelism (cf. `--llm-parallelism`), when reported.
    #[serde(skip_serializing_if = "Option::is_none")]
    llm_parallelism: Option<u32>,
}
780
/// Event reporting the parallelism chosen for the scan and drain stages of a run.
#[derive(Debug, Serialize)]
struct ConcurrencyEvent {
    /// Phase label for this event.
    phase: &'static str,
    /// Parallelism used while scanning for eligible items.
    scan_parallelism: u32,
    /// Parallelism used while draining queued items through the LLM backend.
    drain_parallelism: u32,
}
791
/// Per-item progress event.
///
/// `item`, `status`, `index` and `total` are always serialized; every other field is
/// omitted from the JSON when `None`.
#[derive(Debug, Serialize)]
struct ItemEvent<'a> {
    /// Item key (name or id) of the processed work item.
    item: &'a str,
    /// Outcome label for this item.
    status: &'a str,
    /// Memory id, for memory-scoped items.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Entity id, for entity-scoped items.
    #[serde(skip_serializing_if = "Option::is_none")]
    entity_id: Option<i64>,
    /// Number of entities produced/linked for this item.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Number of relationships produced/linked for this item.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Body length before rewriting (body-oriented operations).
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_before: Option<usize>,
    /// Body length after rewriting (body-oriented operations).
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_after: Option<usize>,
    /// Cost attributed to this item, in USD, when the backend reports one.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    /// Wall-clock time spent on this item, in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Error text when the item failed.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    // NOTE(review): presumably the item's position in the run (0- or 1-based? confirm).
    index: usize,
    /// Number of items in the run.
    total: usize,
}
818
/// Summary record for an enrich run: counters, cost, timing and queue backlog.
#[derive(Debug, Serialize)]
struct EnrichSummary {
    // NOTE(review): presumably always `true`, so JSON consumers can tell the summary line
    // apart from per-item events — confirm at the construction site.
    summary: bool,
    /// Operation name for the run.
    operation: String,
    /// Number of items considered.
    items_total: usize,
    /// Items processed successfully.
    completed: usize,
    /// Items that failed.
    failed: usize,
    /// Items skipped.
    skipped: usize,
    /// Accumulated cost in USD.
    cost_usd: f64,
    /// Total run time in milliseconds.
    elapsed_ms: u64,
    /// Backend actually invoked; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    backend_invoked: Option<&'static str>,
    // NOTE(review): `waiting`/`dead` presumably count rows in the `.enrich-queue.sqlite`
    // sidecar (backoff-cooldown vs. dead-letter) — confirm against the queue module.
    waiting: i64,
    dead: i64,
}
843
844use crate::output::emit_json_line as emit_json;
845
/// Result of the backend preflight probe (`run_preflight_probe`).
enum PreflightOutcome {
    /// The probe succeeded (or, for openrouter, an API key was found).
    Healthy,
    /// The probe failed and its stderr contained a recognised rate-limit marker.
    RateLimited {
        /// Trimmed stderr of the failed probe.
        reason: String,
        /// Static hint telling the user how to recover.
        suggestion: &'static str,
    },
    /// Any other failure: binary not found, spawn/wait error, timeout or non-zero exit.
    Error(AppError),
}
872
873fn run_preflight_probe(args: &EnrichArgs) -> PreflightOutcome {
881 let timeout = std::time::Duration::from_secs(args.rate_limit_buffer.max(60));
882
883 match args.mode() {
884 EnrichMode::ClaudeCode => {
885 let bin = match find_claude_binary(args.claude_binary.as_deref()) {
886 Ok(b) => b,
887 Err(e) => return PreflightOutcome::Error(e),
888 };
889 let mcp_config_path = match crate::spawn::preflight::write_empty_mcp_config_tempfile() {
894 Ok(p) => p,
895 Err(e) => {
896 return PreflightOutcome::Error(AppError::Io(e));
897 }
898 };
899 let mut cmd = std::process::Command::new(&bin);
900 crate::spawn::env_whitelist::apply_env_whitelist(
901 &mut cmd,
902 crate::spawn::env_whitelist::is_strict_env_clear(),
903 );
904 if let Err(e) = crate::spawn::apply_cwd_isolation(&mut cmd) {
905 return PreflightOutcome::Error(e);
906 }
907 cmd.arg("-p")
908 .arg("ping")
909 .arg("--max-turns")
910 .arg("1")
911 .arg("--strict-mcp-config")
912 .arg("--mcp-config")
913 .arg(mcp_config_path.as_os_str())
914 .arg("--dangerously-skip-permissions")
915 .arg("--settings")
916 .arg("{\"hooks\":{}}")
917 .arg("--output-format")
918 .arg("json")
919 .stdin(std::process::Stdio::null())
920 .stdout(std::process::Stdio::piped())
921 .stderr(std::process::Stdio::piped());
922
923 let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
924 Ok(c) => c,
925 Err(e) => {
926 return PreflightOutcome::Error(AppError::Io(e));
927 }
928 };
929 let output = match wait_with_timeout(child, timeout) {
930 Ok(out) => out,
931 Err(e) => return PreflightOutcome::Error(e),
932 };
933 if !output.status.success() {
934 let stderr = String::from_utf8_lossy(&output.stderr);
935 if stderr.contains("hit your session limit")
936 || stderr.contains("rate_limit")
937 || stderr.contains("429")
938 {
939 return PreflightOutcome::RateLimited {
940 reason: stderr.trim().to_string(),
941 suggestion:
942 "wait for the OAuth window to reset or use --fallback-mode codex",
943 };
944 }
945 return PreflightOutcome::Error(AppError::Validation(format!(
946 "preflight probe failed: {stderr}",
947 stderr = stderr.trim()
948 )));
949 }
950 PreflightOutcome::Healthy
951 }
952 EnrichMode::Codex => {
953 let bin = match find_codex_binary(args.codex_binary.as_deref()) {
954 Ok(b) => b,
955 Err(e) => return PreflightOutcome::Error(e),
956 };
957 super::codex_spawn::validate_codex_model(args.codex_model.as_deref())
958 .map_err(PreflightOutcome::Error)
959 .ok();
960 let schema = "{}";
961 let schema_path = match super::codex_spawn::trusted_schema_path() {
962 Ok(p) => p,
963 Err(e) => return PreflightOutcome::Error(e),
964 };
965 let spawn_args = super::codex_spawn::CodexSpawnArgs {
966 binary: &bin,
967 prompt: "ping",
968 json_schema: schema,
969 input_text: "",
970 model: args.codex_model.as_deref(),
971 timeout_secs: args.rate_limit_buffer.max(60),
972 schema_path: schema_path.clone(),
973 };
974 let mut cmd = match super::codex_spawn::build_codex_command(&spawn_args) {
975 Ok(c) => c,
976 Err(e) => return PreflightOutcome::Error(e),
977 };
978 let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
979 Ok(c) => c,
980 Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
981 };
982 let output = match wait_with_timeout(child, timeout) {
983 Ok(out) => out,
984 Err(e) => return PreflightOutcome::Error(e),
985 };
986 let _ = std::fs::remove_file(&schema_path);
987 if !output.status.success() {
988 let stderr = String::from_utf8_lossy(&output.stderr);
989 if stderr.contains("rate_limit")
990 || stderr.contains("429")
991 || stderr.contains("Too Many Requests")
992 {
993 return PreflightOutcome::RateLimited {
994 reason: stderr.trim().to_string(),
995 suggestion: "wait for the rate-limit window to reset",
996 };
997 }
998 return PreflightOutcome::Error(AppError::Validation(format!(
999 "preflight probe failed: {stderr}",
1000 stderr = stderr.trim()
1001 )));
1002 }
1003 PreflightOutcome::Healthy
1004 }
1005 EnrichMode::Opencode => {
1006 let bin = match super::opencode_runner::find_opencode_binary_with_override(
1007 args.opencode_binary.as_deref(),
1008 ) {
1009 Ok(b) => b,
1010 Err(e) => return PreflightOutcome::Error(e),
1011 };
1012 let model =
1013 super::opencode_runner::resolve_opencode_model(args.opencode_model.as_deref());
1014 let mut cmd =
1015 match super::opencode_runner::build_opencode_command_sync(&bin, &model, "ping", "")
1016 {
1017 Ok(c) => c,
1018 Err(e) => return PreflightOutcome::Error(e),
1019 };
1020 let child = match super::opencode_runner::spawn_opencode(&mut cmd) {
1021 Ok(c) => c,
1022 Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
1023 };
1024 let output = match wait_with_timeout(child, timeout) {
1025 Ok(out) => out,
1026 Err(e) => return PreflightOutcome::Error(e),
1027 };
1028 if !output.status.success() {
1029 let stderr = String::from_utf8_lossy(&output.stderr);
1030 if stderr.contains("rate_limit")
1031 || stderr.contains("429")
1032 || stderr.contains("Too Many Requests")
1033 {
1034 return PreflightOutcome::RateLimited {
1035 reason: stderr.trim().to_string(),
1036 suggestion: "wait for the rate-limit window to reset",
1037 };
1038 }
1039 return PreflightOutcome::Error(AppError::Validation(format!(
1040 "preflight probe failed: {stderr}",
1041 stderr = stderr.trim()
1042 )));
1043 }
1044 PreflightOutcome::Healthy
1045 }
1046 EnrichMode::OpenRouter => {
1047 match crate::config::resolve_api_key("openrouter", args.openrouter_api_key.as_deref()) {
1051 Some(_) => PreflightOutcome::Healthy,
1052 None => PreflightOutcome::Error(AppError::Validation(
1053 "OPENROUTER_API_KEY not found for --mode openrouter preflight".into(),
1054 )),
1055 }
1056 }
1057 }
1058}
1059
1060fn wait_with_timeout(
1062 mut child: std::process::Child,
1063 timeout: std::time::Duration,
1064) -> Result<std::process::Output, AppError> {
1065 use wait_timeout::ChildExt;
1066 let start = std::time::Instant::now();
1067 let Some(exit) = child.wait_timeout(timeout).map_err(AppError::Io)? else {
1068 let _ = child.kill();
1069 let _ = child.wait();
1070 return Err(AppError::Validation(format!(
1071 "preflight probe timed out after {}s",
1072 start.elapsed().as_secs()
1073 )));
1074 };
1075 let mut stdout = Vec::new();
1076 if let Some(mut out) = child.stdout.take() {
1077 std::io::Read::read_to_end(&mut out, &mut stdout).map_err(AppError::Io)?;
1078 }
1079 let mut stderr = Vec::new();
1080 if let Some(mut err) = child.stderr.take() {
1081 std::io::Read::read_to_end(&mut err, &mut stderr).map_err(AppError::Io)?;
1082 }
1083 Ok(std::process::Output {
1084 status: exit,
1085 stdout,
1086 stderr,
1087 })
1088}
1089
/// Reports whether `value` still equals `default`.
///
/// Used to tell whether a defaulted flag (e.g. a `--*-timeout`) was changed. Passing the
/// default value explicitly is indistinguishable from not passing the flag at all.
fn is_at_default<T: PartialEq>(value: T, default: T) -> bool {
    PartialEq::eq(&value, &default)
}
1108
1109fn validate_mode_conditional_flags_enrich(args: &EnrichArgs) -> Result<(), AppError> {
1124 const DEFAULT_TIMEOUT: u64 = 300;
1125
1126 let mut conflicts: Vec<String> = Vec::new();
1127
1128 match args.mode() {
1129 EnrichMode::ClaudeCode => {
1130 if args.codex_binary.is_some() {
1131 conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1132 }
1133 if args.codex_model.is_some() {
1134 conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1135 }
1136 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1137 conflicts.push(format!(
1138 "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1139 args.codex_timeout
1140 ));
1141 }
1142 }
1143 EnrichMode::Codex => {
1144 if args.claude_binary.is_some() {
1145 conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1146 }
1147 if args.claude_model.is_some() {
1148 conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1149 }
1150 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1151 conflicts.push(format!(
1152 "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1153 args.claude_timeout
1154 ));
1155 }
1156 if args.max_cost_usd.is_some() {
1157 conflicts.push(
1158 "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription, not the call)"
1159 .to_string(),
1160 );
1161 }
1162 }
1163 EnrichMode::Opencode => {
1164 if args.claude_binary.is_some() {
1165 conflicts.push("--claude-binary is ignored when --mode=opencode".to_string());
1166 }
1167 if args.claude_model.is_some() {
1168 conflicts.push("--claude-model is ignored when --mode=opencode".to_string());
1169 }
1170 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1171 conflicts.push(format!(
1172 "--claude-timeout={} is ignored when --mode=opencode (remove the flag to use the default 300s)",
1173 args.claude_timeout
1174 ));
1175 }
1176 if args.max_cost_usd.is_some() {
1177 conflicts.push(
1178 "--max-cost-usd is ignored when --mode=opencode (OAuth-first; cost is metered by your subscription, not the call)"
1179 .to_string(),
1180 );
1181 }
1182 }
1183 EnrichMode::OpenRouter => {
1184 if args.claude_binary.is_some() {
1185 conflicts.push("--claude-binary is ignored when --mode=openrouter".to_string());
1186 }
1187 if args.claude_model.is_some() {
1188 conflicts.push("--claude-model is ignored when --mode=openrouter".to_string());
1189 }
1190 if args.codex_binary.is_some() {
1191 conflicts.push("--codex-binary is ignored when --mode=openrouter".to_string());
1192 }
1193 if args.codex_model.is_some() {
1194 conflicts.push("--codex-model is ignored when --mode=openrouter".to_string());
1195 }
1196 if args.opencode_binary.is_some() {
1197 conflicts.push("--opencode-binary is ignored when --mode=openrouter".to_string());
1198 }
1199 if args.opencode_model.is_some() {
1200 conflicts.push("--opencode-model is ignored when --mode=openrouter".to_string());
1201 }
1202 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1203 conflicts.push(format!(
1204 "--claude-timeout={} is ignored when --mode=openrouter (remove the flag to use the default 300s)",
1205 args.claude_timeout
1206 ));
1207 }
1208 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1209 conflicts.push(format!(
1210 "--codex-timeout={} is ignored when --mode=openrouter (remove the flag to use the default 300s)",
1211 args.codex_timeout
1212 ));
1213 }
1214 if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
1215 conflicts.push(format!(
1216 "--opencode-timeout={} is ignored when --mode=openrouter (remove the flag to use the default 300s)",
1217 args.opencode_timeout
1218 ));
1219 }
1220 }
1221 }
1222
1223 if !conflicts.is_empty() {
1224 return Err(AppError::Validation(format!(
1225 "G20: mode-conditional flag conflicts detected for --mode={}:\n - {}",
1226 args.mode(),
1227 conflicts.join("\n - ")
1228 )));
1229 }
1230
1231 Ok(())
1232}
1233
1234pub fn run(
1238 args: &EnrichArgs,
1239 llm_backend: crate::cli::LlmBackendChoice,
1240 embedding_backend: crate::cli::EmbeddingBackendChoice,
1241) -> Result<(), AppError> {
1242 validate_mode_conditional_flags_enrich(args)?;
1245
1246 if args.list_dead || args.requeue_dead || args.prune_dead_orphans {
1255 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1256 let op_label = format!("{:?}", args.operation());
1257 let paths = AppPaths::resolve(args.db.as_deref())?;
1258 let queue_path = crate::paths::sidecar_path(&paths.db, ".enrich-queue.sqlite");
1259 let queue_conn = open_queue_db(&queue_path)?;
1260 if args.prune_dead_orphans {
1263 ensure_db_ready(&paths)?;
1264 let main_conn = open_rw(&paths.db)?;
1265 let pruned = prune_dead_orphans(&queue_conn, &main_conn, &op_label, &namespace)?;
1266 let dead_total: i64 = queue_conn
1267 .query_row(
1268 "SELECT COUNT(*) FROM queue WHERE status='dead' \
1269 AND (operation = ?1 OR operation IS NULL)",
1270 rusqlite::params![op_label],
1271 |r| r.get(0),
1272 )
1273 .unwrap_or(0);
1274 emit_json(&DeadSummary {
1275 summary: true,
1276 operation: op_label,
1277 namespace,
1278 action: "prune-dead-orphans",
1279 dead_total,
1280 requeued: 0,
1281 pruned,
1282 });
1283 return Ok(());
1284 }
1285 if args.list_dead {
1286 let mut stmt = queue_conn.prepare(
1287 "SELECT item_key, item_type, attempt, error_class, error FROM queue \
1288 WHERE status='dead' AND (operation = ?1 OR operation IS NULL) ORDER BY id",
1289 )?;
1290 let rows = stmt
1291 .query_map(rusqlite::params![op_label], |r| {
1292 Ok(DeadItem {
1293 dead_item: true,
1294 item_key: r.get(0)?,
1295 item_type: r.get(1)?,
1296 attempt: r.get(2)?,
1297 error_class: r.get(3)?,
1298 error: r.get(4)?,
1299 })
1300 })?
1301 .collect::<Result<Vec<_>, _>>()?;
1302 let dead_total = rows.len() as i64;
1303 for item in &rows {
1304 emit_json(item);
1305 }
1306 emit_json(&DeadSummary {
1307 summary: true,
1308 operation: op_label,
1309 namespace,
1310 action: "list-dead",
1311 dead_total,
1312 requeued: 0,
1313 pruned: 0,
1314 });
1315 return Ok(());
1316 }
1317 let dead_total: i64 = queue_conn
1319 .query_row(
1320 "SELECT COUNT(*) FROM queue WHERE status='dead' \
1321 AND (operation = ?1 OR operation IS NULL)",
1322 rusqlite::params![op_label],
1323 |r| r.get(0),
1324 )
1325 .unwrap_or(0);
1326 let requeued = queue_conn
1327 .execute(
1328 "UPDATE queue SET status='pending', attempt=0, next_retry_at=NULL, \
1329 error=NULL, error_class=NULL \
1330 WHERE status='dead' AND (operation = ?1 OR operation IS NULL)",
1331 rusqlite::params![op_label],
1332 )
1333 .map_err(|e| AppError::Validation(format!("requeue-dead failed: {e}")))?
1334 as i64;
1335 let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1336 emit_json(&DeadSummary {
1337 summary: true,
1338 operation: op_label,
1339 namespace,
1340 action: "requeue-dead",
1341 dead_total,
1342 requeued,
1343 pruned: 0,
1344 });
1345 return Ok(());
1346 }
1347
1348 if args.status {
1349 let paths = AppPaths::resolve(args.db.as_deref())?;
1350 ensure_db_ready(&paths)?;
1351 let conn = open_rw(&paths.db)?;
1352 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1353 let unbound_backlog = scan_unbound_memories(&conn, &namespace, None, &[])?.len();
1354 let queue_path = crate::paths::sidecar_path(&paths.db, ".enrich-queue.sqlite");
1355 let queue_conn = open_queue_db(&queue_path)?;
1356 let op_label = format!("{:?}", args.operation());
1357 let count_status = |st: &str, op: &str| -> i64 {
1361 queue_conn
1362 .query_row(
1363 "SELECT COUNT(*) FROM queue WHERE status=?1 \
1364 AND (operation = ?2 OR operation IS NULL)",
1365 rusqlite::params![st, op],
1366 |r| r.get(0),
1367 )
1368 .unwrap_or(0)
1369 };
1370 let eligible_now: i64 = queue_conn
1371 .query_row(
1372 "SELECT COUNT(*) FROM queue WHERE status='pending' \
1373 AND (operation = ?1 OR operation IS NULL) \
1374 AND (next_retry_at IS NULL OR next_retry_at <= datetime('now'))",
1375 rusqlite::params![op_label],
1376 |r| r.get(0),
1377 )
1378 .unwrap_or(0);
1379 let waiting: i64 = queue_conn
1380 .query_row(
1381 "SELECT COUNT(*) FROM queue WHERE status='pending' \
1382 AND (operation = ?1 OR operation IS NULL) \
1383 AND next_retry_at IS NOT NULL AND next_retry_at > datetime('now')",
1384 rusqlite::params![op_label],
1385 |r| r.get(0),
1386 )
1387 .unwrap_or(0);
1388 let waiting_items = {
1390 let mut stmt = queue_conn.prepare(
1391 "SELECT item_key, attempt, next_retry_at, error_class FROM queue \
1392 WHERE status='pending' AND (operation = ?1 OR operation IS NULL) \
1393 AND next_retry_at IS NOT NULL AND next_retry_at > datetime('now') \
1394 ORDER BY next_retry_at",
1395 )?;
1396 let items: Vec<WaitingItem> = stmt
1397 .query_map(rusqlite::params![op_label], |r| {
1398 Ok(WaitingItem {
1399 item_key: r.get(0)?,
1400 attempt: r.get(1)?,
1401 next_retry_at: r.get(2)?,
1402 error_class: r.get(3)?,
1403 })
1404 })?
1405 .collect::<Result<Vec<_>, _>>()?;
1406 items
1407 };
1408 let queue_pending = count_status("pending", &op_label);
1409 let queue_processing = count_status("processing", &op_label);
1410 let queue_done = count_status("done", &op_label);
1411 let queue_failed = count_status("failed", &op_label);
1412 let queue_skipped = count_status("skipped", &op_label);
1413 let queue_dead = count_status("dead", &op_label);
1414 let state = if eligible_now > 0 {
1416 "draining"
1417 } else if waiting > 0 {
1418 "cooldown"
1419 } else if queue_pending == 0 && unbound_backlog > 0 {
1420 "pending-scan"
1421 } else {
1422 "empty"
1423 };
1424 emit_json(&EnrichStatus {
1425 status_report: true,
1426 operation: op_label,
1427 namespace,
1428 unbound_backlog,
1429 queue_pending,
1430 queue_processing,
1431 queue_done,
1432 queue_failed,
1433 queue_skipped,
1434 queue_dead,
1435 eligible_now,
1436 waiting,
1437 state,
1438 waiting_items,
1439 });
1440 return Ok(());
1441 }
1442
1443 if args.mode() == EnrichMode::OpenRouter {
1448 let model = args.openrouter_model.as_deref().ok_or_else(|| {
1449 AppError::Validation(
1450 "--mode openrouter requires --openrouter-model (no default model is allowed)"
1451 .into(),
1452 )
1453 })?;
1454 let resolved =
1455 crate::config::resolve_api_key("openrouter", args.openrouter_api_key.as_deref())
1456 .ok_or_else(|| {
1457 AppError::Validation(
1458 "OPENROUTER_API_KEY not found; set the env var, store it via \
1459 `config add-key --provider openrouter`, or pass --openrouter-api-key"
1460 .into(),
1461 )
1462 })?;
1463 crate::embedder::get_openrouter_chat_client(
1464 resolved.value,
1465 model,
1466 args.openrouter_timeout,
1467 )?;
1468 }
1469
1470 let started = Instant::now();
1471
1472 let paths = AppPaths::resolve(args.db.as_deref())?;
1473 ensure_db_ready(&paths)?;
1474 let conn = open_rw(&paths.db)?;
1475 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1476
1477 let wait_secs = args.wait_job_singleton;
1483 let force_flag = args.force_job_singleton;
1484 let _singleton = crate::lock::acquire_job_singleton(
1485 crate::lock::JobType::Enrich,
1486 &namespace,
1487 &paths.db,
1488 wait_secs,
1489 force_flag,
1490 )?;
1491
1492 let provider_binary = if matches!(args.operation(), EnrichOperation::ReEmbed) {
1494 None
1495 } else {
1496 Some(match args.mode() {
1497 EnrichMode::ClaudeCode => {
1498 let bin = find_claude_binary(args.claude_binary.as_deref())?;
1499 let version = super::claude_runner::validate_claude_version(&bin)?;
1500 tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
1501 emit_json(&PhaseEvent {
1502 phase: "validate",
1503 binary_path: bin.to_str(),
1504 version: Some(&version),
1505 items_total: None,
1506 items_pending: None,
1507 llm_parallelism: None,
1508 });
1509 bin
1510 }
1511 EnrichMode::Codex => {
1512 let bin = find_codex_binary(args.codex_binary.as_deref())?;
1513 emit_json(&PhaseEvent {
1514 phase: "validate",
1515 binary_path: bin.to_str(),
1516 version: None,
1517 items_total: None,
1518 items_pending: None,
1519 llm_parallelism: None,
1520 });
1521 bin
1522 }
1523 EnrichMode::Opencode => {
1524 let bin = super::opencode_runner::find_opencode_binary_with_override(
1525 args.opencode_binary.as_deref(),
1526 )?;
1527 emit_json(&PhaseEvent {
1528 phase: "validate",
1529 binary_path: bin.to_str(),
1530 version: None,
1531 items_total: None,
1532 items_pending: None,
1533 llm_parallelism: None,
1534 });
1535 bin
1536 }
1537 EnrichMode::OpenRouter => {
1538 emit_json(&PhaseEvent {
1543 phase: "validate",
1544 binary_path: None,
1545 version: None,
1546 items_total: None,
1547 items_pending: None,
1548 llm_parallelism: None,
1549 });
1550 PathBuf::new()
1551 }
1552 })
1553 };
1554
1555 if args.max_load_check && !args.dry_run && crate::system_load::is_system_saturated() {
1559 let load = crate::system_load::load_average_one();
1560 let n = crate::system_load::ncpus();
1561 return Err(AppError::Validation(format!(
1562 "system load average {load:.2} exceeds 2x ncpus ({n}); \
1563 pass --no-max-load-check to override (not recommended)"
1564 )));
1565 }
1566
1567 if args.preflight_check
1574 && !args.dry_run
1575 && !matches!(args.operation(), EnrichOperation::ReEmbed)
1576 {
1577 let preflight_result = run_preflight_probe(args);
1578 match preflight_result {
1579 PreflightOutcome::Healthy => {
1580 tracing::info!(target: "enrich", mode = ?args.mode(), "preflight probe healthy");
1581 }
1582 PreflightOutcome::RateLimited { reason, suggestion } => {
1583 if let Some(fallback) = args.fallback_mode.clone() {
1584 if fallback != args.mode() {
1585 return Err(AppError::Validation(format!(
1595 "preflight detected rate limit on {mode:?}: {reason}; \
1596 re-invoke with `--mode {fallback:?}` to use the fallback provider",
1597 mode = args.mode()
1598 )));
1599 }
1600 return Err(AppError::Validation(format!(
1601 "preflight detected rate limit on {mode:?}: {reason}; \
1602 --fallback-mode matches --mode, no recovery possible",
1603 mode = args.mode()
1604 )));
1605 }
1606 return Err(AppError::Validation(format!(
1607 "preflight detected rate limit on {mode:?}: {reason}; \
1608 {suggestion}; pass --fallback-mode codex to recover",
1609 mode = args.mode()
1610 )));
1611 }
1612 PreflightOutcome::Error(e) => {
1613 return Err(e);
1614 }
1615 }
1616 }
1617
1618 let mut scan_result = scan_operation(&conn, &namespace, args)?;
1620 if matches!(args.operation(), EnrichOperation::BodyEnrich) {
1629 let q_path = crate::paths::sidecar_path(&paths.db, ".enrich-queue.sqlite");
1630 if let Ok(q) = open_queue_db(&q_path) {
1631 if let Ok(vetoed) = skipped_item_keys(&q, &format!("{:?}", args.operation())) {
1632 scan_result.retain(|k| !vetoed.contains(k));
1633 }
1634 }
1635 }
1636 let total = scan_result.len();
1637
1638 emit_json(&PhaseEvent {
1639 phase: "scan",
1640 binary_path: None,
1641 version: None,
1642 items_total: Some(total),
1643 items_pending: Some(total),
1644 llm_parallelism: Some(args.llm_parallelism),
1645 });
1646
1647 if args.dry_run {
1649 for (idx, key) in scan_result.iter().enumerate() {
1650 emit_json(&ItemEvent {
1651 item: key,
1652 status: "preview",
1653 memory_id: None,
1654 entity_id: None,
1655 entities: None,
1656 rels: None,
1657 chars_before: None,
1658 chars_after: None,
1659 cost_usd: None,
1660 elapsed_ms: None,
1661 error: None,
1662 index: idx,
1663 total,
1664 });
1665 }
1666 emit_json(&EnrichSummary {
1667 summary: true,
1668 operation: format!("{:?}", args.operation()),
1669 items_total: total,
1670 completed: 0,
1671 failed: 0,
1672 skipped: 0,
1673 cost_usd: 0.0,
1674 elapsed_ms: started.elapsed().as_millis() as u64,
1675 backend_invoked: take_enrich_backend(),
1676 waiting: 0,
1677 dead: 0,
1678 });
1679 return Ok(());
1680 }
1681
1682 let queue_path = crate::paths::sidecar_path(&paths.db, ".enrich-queue.sqlite");
1686 let queue_conn = open_queue_db(&queue_path)?;
1687
1688 if args.resume {
1689 let reset = queue_conn
1690 .execute(
1691 "UPDATE queue SET status='pending' WHERE status='processing'",
1692 [],
1693 )
1694 .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
1695 if reset > 0 {
1696 tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
1697 }
1698 }
1699
1700 if args.retry_failed {
1701 let count = queue_conn
1702 .execute(
1703 "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
1704 [],
1705 )
1706 .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
1707 tracing::info!(target: "enrich", count, "retrying failed items");
1708 }
1709
1710 if !args.resume && !args.retry_failed && !args.until_empty {
1711 queue_conn
1712 .execute("DELETE FROM queue", [])
1713 .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
1714 }
1715
1716 let op_label = format!("{:?}", args.operation());
1718 let item_type = item_type_for(&args.operation());
1719 for key in scan_result.iter() {
1720 enqueue_candidate(&queue_conn, &conn, &namespace, key, item_type, &op_label);
1721 }
1722
1723 let parallelism = if args.mode() == EnrichMode::OpenRouter {
1726 let rest = args.rest_concurrency.unwrap_or(8).clamp(1, 16) as usize;
1727 tracing::info!(
1728 target: "enrich",
1729 concurrency = rest,
1730 source = "rest_concurrency",
1731 "OpenRouter REST concurrency (clamp 1..=16)"
1732 );
1733 rest
1734 } else {
1735 let p = args.llm_parallelism.clamp(1, 32) as usize;
1736 tracing::info!(
1737 target: "enrich",
1738 concurrency = p,
1739 source = "llm_parallelism",
1740 "LLM subprocess parallelism (clamp 1..=32)"
1741 );
1742 p
1743 };
1744 if parallelism > 1 {
1745 tracing::info!(
1746 target: "enrich",
1747 llm_parallelism = parallelism,
1748 "parallel LLM processing with bounded thread pool"
1749 );
1750 }
1751 if parallelism > 4 {
1755 match args.mode() {
1756 EnrichMode::ClaudeCode => {
1757 tracing::warn!(
1758 target: "enrich",
1759 llm_parallelism = parallelism,
1760 recommended_max = 4,
1761 mode = "claude-code",
1762 "llm_parallelism above 4 multiplies Claude Code subprocess fan-out; \
1763 consider combining with SQLITE_GRAPHRAG_CLAUDE_EMPTY_CONFIG_DIR \
1764 to cut MCP children (G28-A)"
1765 );
1766 }
1767 EnrichMode::Codex if parallelism > 16 => {
1768 tracing::warn!(
1769 target: "enrich",
1770 llm_parallelism = parallelism,
1771 recommended_max = 16,
1772 mode = "codex",
1773 "llm_parallelism above 16 risks OAuth rate-limit on Codex; \
1774 consider --llm-parallelism 8 for safer concurrency"
1775 );
1776 }
1777 EnrichMode::Codex => {
1778 }
1782 EnrichMode::Opencode if parallelism > 16 => {
1783 tracing::warn!(
1784 target: "enrich",
1785 llm_parallelism = parallelism,
1786 recommended_max = 16,
1787 mode = "opencode",
1788 "llm_parallelism above 16 risks OAuth rate-limit on OpenCode; \
1789 consider --llm-parallelism 8 for safer concurrency"
1790 );
1791 }
1792 EnrichMode::Opencode => {
1793 }
1795 EnrichMode::OpenRouter => {
1796 }
1799 }
1800 }
1801
1802 let mut completed = 0usize;
1803 let mut failed = 0usize;
1804 let mut skipped = 0usize;
1805 let mut cost_total = 0.0f64;
1806 let mut oauth_detected = false;
1807 let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1808 let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1809 let enrich_started = std::time::Instant::now();
1810
1811 let provider_timeout = match args.mode() {
1812 EnrichMode::ClaudeCode => args.claude_timeout,
1813 EnrichMode::Codex => args.codex_timeout,
1814 EnrichMode::Opencode => args.opencode_timeout,
1815 EnrichMode::OpenRouter => args.openrouter_timeout,
1816 };
1817
1818 let provider_model: Option<&str> = match args.mode() {
1819 EnrichMode::ClaudeCode => args.claude_model.as_deref(),
1820 EnrichMode::Codex => args.codex_model.as_deref(),
1821 EnrichMode::Opencode => args.opencode_model.as_deref(),
1822 EnrichMode::OpenRouter => args.openrouter_model.as_deref(),
1823 };
1824
1825 let backoff_clause: &str = if args.ignore_backoff {
1829 ""
1830 } else {
1831 "AND (next_retry_at IS NULL OR next_retry_at <= datetime('now'))"
1832 };
1833
1834 emit_json(&ConcurrencyEvent {
1837 phase: "concurrency",
1838 scan_parallelism: 1,
1839 drain_parallelism: parallelism as u32,
1840 });
1841
1842 let until_deadline = std::time::Instant::now()
1846 + std::time::Duration::from_secs(args.max_runtime.unwrap_or(3600));
1847 loop {
1848 if args.until_empty {
1849 let mut rescan = scan_operation(&conn, &namespace, args)?;
1853 if matches!(args.operation(), EnrichOperation::BodyEnrich) {
1858 if let Ok(vetoed) = skipped_item_keys(&queue_conn, &op_label) {
1859 rescan.retain(|k| !vetoed.contains(k));
1860 }
1861 }
1862 for key in &rescan {
1863 enqueue_candidate(&queue_conn, &conn, &namespace, key, item_type, &op_label);
1864 }
1865 }
1866 let completed_before = completed;
1867
1868 if parallelism > 1 {
1872 let stdout_mu = parking_lot::Mutex::new(());
1873 let budget = args.max_cost_usd;
1874 let operation = args.operation().clone();
1875 let mode = args.mode().clone();
1876 let min_oc = args.min_output_chars;
1877 let max_oc = args.max_output_chars;
1878 let prompt_tpl = args.prompt_template.as_deref().map(|p| p.to_path_buf());
1879
1880 struct WorkerResult {
1881 completed: usize,
1882 failed: usize,
1883 skipped: usize,
1884 cost: f64,
1885 oauth: bool,
1886 }
1887
1888 let results: Vec<WorkerResult> = std::thread::scope(|s| {
1889 let handles: Vec<_> = (0..parallelism)
1890 .map(|worker_id| {
1891 let stdout_mu = &stdout_mu;
1892 let paths = &paths;
1893 let queue_path = &queue_path;
1894 let namespace = &namespace;
1895 let provider_binary = provider_binary.as_deref();
1896 let operation = &operation;
1897 let mode = &mode;
1898 let prompt_tpl = prompt_tpl.as_deref();
1899 s.spawn(move || {
1900 let w_conn = match open_rw(&paths.db) {
1901 Ok(c) => c,
1902 Err(e) => {
1903 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open DB");
1904 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1905 }
1906 };
1907 let w_queue = match open_queue_db(queue_path) {
1908 Ok(c) => c,
1909 Err(e) => {
1910 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open queue DB");
1911 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1912 }
1913 };
1914 let mut w_completed = 0usize;
1915 let mut w_failed = 0usize;
1916 let mut w_skipped = 0usize;
1917 let mut w_cost = 0.0f64;
1918 let mut w_oauth = false;
1919 let mut w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1920 let w_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1921 let mut w_breaker = crate::retry::CircuitBreaker::new(
1927 args.circuit_breaker_threshold.max(1),
1928 std::time::Duration::from_secs(60),
1929 );
1930
1931 loop {
1932 if crate::shutdown_requested() {
1933 tracing::info!(target: "enrich", "shutdown requested, worker stopping");
1934 break;
1935 }
1936 if let Some(b) = budget {
1937 if !w_oauth && w_cost >= b {
1938 break;
1939 }
1940 }
1941 let dequeue_sql = format!(
1945 "UPDATE queue SET status='processing', attempt=attempt+1 \
1946 WHERE id = (SELECT id FROM queue WHERE status='pending' {backoff_clause} \
1947 ORDER BY id LIMIT 1) \
1948 RETURNING id, item_key, item_type, attempt"
1949 );
1950 let pending: Option<(i64, String, String, i64)> = w_queue
1951 .query_row(
1952 &dequeue_sql,
1953 [],
1954 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
1955 )
1956 .ok();
1957 let (queue_id, item_key, _item_type, attempt_current) = match pending {
1958 Some(p) => p,
1959 None => break,
1960 };
1961 let item_started = Instant::now();
1962 let current_index = w_completed + w_failed + w_skipped;
1963
1964 let provider_bin = provider_binary.unwrap_or_else(|| std::path::Path::new(""));
1970 let call_result = match operation {
1971 EnrichOperation::MemoryBindings | EnrichOperation::AugmentBindings => call_memory_bindings(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
1972 EnrichOperation::EntityDescriptions => call_entity_description(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
1973 EnrichOperation::BodyEnrich => call_body_enrich(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode, min_oc, max_oc, prompt_tpl, args.preserve_threshold, paths, llm_backend, embedding_backend),
1974 EnrichOperation::ReEmbed => call_reembed(&w_conn, namespace, &item_key, paths, llm_backend, embedding_backend),
1975 EnrichOperation::WeightCalibrate => call_weight_calibrate(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
1976 EnrichOperation::RelationReclassify => call_relation_reclassify(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
1977 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => call_entity_connect(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
1978 EnrichOperation::EntityTypeValidate => call_entity_type_validate(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
1979 EnrichOperation::DescriptionEnrich => call_description_enrich(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
1980 EnrichOperation::DomainClassify => call_domain_classify(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
1981 EnrichOperation::GraphAudit => call_graph_audit(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
1982 EnrichOperation::DeepResearchSynth => call_deep_research_synth(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode),
1983 EnrichOperation::BodyExtract => call_body_extract(&w_conn, namespace, &item_key, provider_bin, provider_model, provider_timeout, mode, args.body_extract_graph_only),
1984 };
1985
1986 match call_result {
1987 Ok(EnrichItemResult::Done { cost, is_oauth, memory_id, entity_id, entities, rels, chars_before, chars_after }) => {
1988 if is_oauth { w_oauth = true; }
1989 w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1990 let _ = w_queue.execute(
1991 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1992 rusqlite::params![memory_id, entity_id, entities as i64, rels as i64, cost, item_started.elapsed().as_millis() as i64, queue_id],
1993 );
1994 w_completed += 1;
1995 if !is_oauth { w_cost += cost; }
1996 let _ = w_breaker
1998 .record(crate::retry::AttemptOutcome::Success);
1999 let _guard = stdout_mu.lock();
2000 emit_json(&ItemEvent { item: &item_key, status: "done", memory_id, entity_id, entities: Some(entities), rels: Some(rels), chars_before, chars_after, cost_usd: if is_oauth { None } else { Some(cost) }, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
2001 }
2002 Ok(EnrichItemResult::Skipped { reason }) => {
2003 w_skipped += 1;
2004 let _ = w_queue.execute("UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![reason, queue_id]);
2005 let _guard = stdout_mu.lock();
2006 emit_json(&ItemEvent { item: &item_key, status: "skipped", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
2007 }
2008 Ok(EnrichItemResult::PreservationFailed { score, threshold, chars_before, chars_after }) => {
2009 w_skipped += 1;
2015 let reason = format!(
2016 "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2017 );
2018 let _ = w_queue.execute(
2019 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2020 rusqlite::params![reason, queue_id],
2021 );
2022 let _guard = stdout_mu.lock();
2023 emit_json(&ItemEvent {
2024 item: &item_key,
2025 status: "preservation_failed",
2026 memory_id: None,
2027 entity_id: None,
2028 entities: None,
2029 rels: None,
2030 chars_before: Some(chars_before),
2031 chars_after: Some(chars_after),
2032 cost_usd: None,
2033 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2034 error: Some(reason),
2035 index: current_index,
2036 total,
2037 });
2038 }
2039 Err(e) => {
2040 let err_str = format!("{e}");
2041 if matches!(e, AppError::RateLimited { .. }) {
2042 if crate::retry::is_kill_switch_active() {
2043 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2044 } else if std::time::Instant::now() >= w_deadline {
2045 tracing::error!(target: "enrich", "rate-limit retry deadline (1h) exhausted in worker");
2046 } else {
2047 let half = w_backoff / 2;
2048 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2049 let actual_wait = half + jitter;
2050 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited in worker, backing off");
2051 let _ = w_queue.execute("UPDATE queue SET status='pending' WHERE id=?1", rusqlite::params![queue_id]);
2052 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2053 w_backoff = (w_backoff * 2).min(900);
2054 continue;
2055 }
2056 }
2057 w_failed += 1;
2058 let outcome = record_item_failure(&w_queue, queue_id, attempt_current, args.max_attempts, &e);
2059 let _guard = stdout_mu.lock();
2060 emit_json(&ItemEvent { item: &item_key, status: "failed", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: Some(err_str), index: current_index, total });
2061 let breaker_opened = w_breaker.record(outcome);
2064 if breaker_opened {
2065 tracing::error!(target: "enrich",
2066 consecutive_failures = w_breaker.consecutive_failures(),
2067 "circuit breaker opened — aborting worker"
2068 );
2069 break;
2070 }
2071 }
2072 }
2073 }
2074 WorkerResult { completed: w_completed, failed: w_failed, skipped: w_skipped, cost: w_cost, oauth: w_oauth }
2075 })
2076 })
2077 .collect();
2078 handles
2079 .into_iter()
2080 .map(|h| {
2081 h.join().unwrap_or(WorkerResult {
2082 completed: 0,
2083 failed: 0,
2084 skipped: 0,
2085 cost: 0.0,
2086 oauth: false,
2087 })
2088 })
2089 .collect()
2090 });
2091
2092 for r in &results {
2093 completed += r.completed;
2094 failed += r.failed;
2095 skipped += r.skipped;
2096 cost_total += r.cost;
2097 if r.oauth && !oauth_detected {
2098 oauth_detected = true;
2099 }
2100 }
2101 } else {
2102 loop {
2104 if crate::shutdown_requested() {
2105 tracing::info!(target: "enrich", "shutdown requested, stopping enrichment");
2106 break;
2107 }
2108
2109 if let Some(budget) = args.max_cost_usd {
2111 if !oauth_detected && cost_total >= budget {
2112 tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
2113 break;
2114 }
2115 }
2116
2117 let dequeue_sql = format!(
2120 "UPDATE queue SET status='processing', attempt=attempt+1 \
2121 WHERE id = (SELECT id FROM queue WHERE status='pending' {backoff_clause} \
2122 ORDER BY id LIMIT 1) \
2123 RETURNING id, item_key, item_type, attempt"
2124 );
2125 let pending: Option<(i64, String, String, i64)> = queue_conn
2126 .query_row(&dequeue_sql, [], |row| {
2127 Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
2128 })
2129 .ok();
2130
2131 let (queue_id, item_key, item_type, attempt_current) = match pending {
2132 Some(p) => p,
2133 None => break,
2134 };
2135
2136 let item_started = Instant::now();
2137 let current_index = completed + failed + skipped;
2138
2139 let provider_bin = provider_binary
2142 .as_deref()
2143 .unwrap_or_else(|| std::path::Path::new(""));
2144 let call_result = match args.operation() {
2145 EnrichOperation::MemoryBindings | EnrichOperation::AugmentBindings => {
2146 call_memory_bindings(
2147 &conn,
2148 &namespace,
2149 &item_key,
2150 provider_bin,
2151 provider_model,
2152 provider_timeout,
2153 &args.mode(),
2154 )
2155 }
2156 EnrichOperation::EntityDescriptions => call_entity_description(
2157 &conn,
2158 &namespace,
2159 &item_key,
2160 provider_bin,
2161 provider_model,
2162 provider_timeout,
2163 &args.mode(),
2164 ),
2165 EnrichOperation::BodyEnrich => call_body_enrich(
2166 &conn,
2167 &namespace,
2168 &item_key,
2169 provider_bin,
2170 provider_model,
2171 provider_timeout,
2172 &args.mode(),
2173 args.min_output_chars,
2174 args.max_output_chars,
2175 args.prompt_template.as_deref(),
2176 args.preserve_threshold,
2177 &paths,
2178 llm_backend,
2179 embedding_backend,
2180 ),
2181 EnrichOperation::ReEmbed => call_reembed(
2182 &conn,
2183 &namespace,
2184 &item_key,
2185 &paths,
2186 llm_backend,
2187 embedding_backend,
2188 ),
2189 EnrichOperation::WeightCalibrate => call_weight_calibrate(
2190 &conn,
2191 &namespace,
2192 &item_key,
2193 provider_bin,
2194 provider_model,
2195 provider_timeout,
2196 &args.mode(),
2197 ),
2198 EnrichOperation::RelationReclassify => call_relation_reclassify(
2199 &conn,
2200 &namespace,
2201 &item_key,
2202 provider_bin,
2203 provider_model,
2204 provider_timeout,
2205 &args.mode(),
2206 ),
2207 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2208 call_entity_connect(
2209 &conn,
2210 &namespace,
2211 &item_key,
2212 provider_bin,
2213 provider_model,
2214 provider_timeout,
2215 &args.mode(),
2216 )
2217 }
2218 EnrichOperation::EntityTypeValidate => call_entity_type_validate(
2219 &conn,
2220 &namespace,
2221 &item_key,
2222 provider_bin,
2223 provider_model,
2224 provider_timeout,
2225 &args.mode(),
2226 ),
2227 EnrichOperation::DescriptionEnrich => call_description_enrich(
2228 &conn,
2229 &namespace,
2230 &item_key,
2231 provider_bin,
2232 provider_model,
2233 provider_timeout,
2234 &args.mode(),
2235 ),
2236 EnrichOperation::DomainClassify => call_domain_classify(
2237 &conn,
2238 &namespace,
2239 &item_key,
2240 provider_bin,
2241 provider_model,
2242 provider_timeout,
2243 &args.mode(),
2244 ),
2245 EnrichOperation::GraphAudit => call_graph_audit(
2246 &conn,
2247 &namespace,
2248 &item_key,
2249 provider_bin,
2250 provider_model,
2251 provider_timeout,
2252 &args.mode(),
2253 ),
2254 EnrichOperation::DeepResearchSynth => call_deep_research_synth(
2255 &conn,
2256 &namespace,
2257 &item_key,
2258 provider_bin,
2259 provider_model,
2260 provider_timeout,
2261 &args.mode(),
2262 ),
2263 EnrichOperation::BodyExtract => call_body_extract(
2264 &conn,
2265 &namespace,
2266 &item_key,
2267 provider_bin,
2268 provider_model,
2269 provider_timeout,
2270 &args.mode(),
2271 args.body_extract_graph_only,
2272 ),
2273 };
2274
2275 match call_result {
2276 Ok(EnrichItemResult::Done {
2277 memory_id,
2278 entity_id,
2279 entities,
2280 rels,
2281 chars_before,
2282 chars_after,
2283 cost,
2284 is_oauth,
2285 }) => {
2286 if is_oauth && !oauth_detected {
2287 oauth_detected = true;
2288 tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
2289 }
2290 backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
2291
2292 let persist_err: Option<String> = match args.operation() {
2294 EnrichOperation::MemoryBindings => {
2295 None
2297 }
2298 EnrichOperation::EntityDescriptions => {
2299 None
2301 }
2302 EnrichOperation::BodyEnrich => {
2303 None
2305 }
2306 _ => {
2307 None
2309 }
2310 };
2311
2312 if let Err(e) = queue_conn.execute(
2313 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2314 rusqlite::params![
2315 memory_id,
2316 entity_id,
2317 entities as i64,
2318 rels as i64,
2319 cost,
2320 item_started.elapsed().as_millis() as i64,
2321 queue_id
2322 ],
2323 ) {
2324 tracing::warn!(target: "enrich", error = %e, "queue done update failed");
2325 }
2326
2327 if persist_err.is_none() {
2328 completed += 1;
2329 if !is_oauth {
2330 cost_total += cost;
2331 }
2332 emit_json(&ItemEvent {
2333 item: &item_key,
2334 status: "done",
2335 memory_id,
2336 entity_id,
2337 entities: Some(entities),
2338 rels: Some(rels),
2339 chars_before,
2340 chars_after,
2341 cost_usd: if is_oauth { None } else { Some(cost) },
2342 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2343 error: None,
2344 index: current_index,
2345 total,
2346 });
2347 } else {
2348 failed += 1;
2349 emit_json(&ItemEvent {
2350 item: &item_key,
2351 status: "failed",
2352 memory_id: None,
2353 entity_id: None,
2354 entities: None,
2355 rels: None,
2356 chars_before: None,
2357 chars_after: None,
2358 cost_usd: None,
2359 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2360 error: persist_err,
2361 index: current_index,
2362 total,
2363 });
2364 }
2365 }
2366 Ok(EnrichItemResult::Skipped { reason }) => {
2367 skipped += 1;
2368 if let Err(e) = queue_conn.execute(
2369 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2370 rusqlite::params![reason, queue_id],
2371 ) {
2372 tracing::warn!(target: "enrich", error = %e, "queue skipped update failed");
2373 }
2374 emit_json(&ItemEvent {
2375 item: &item_key,
2376 status: "skipped",
2377 memory_id: None,
2378 entity_id: None,
2379 entities: None,
2380 rels: None,
2381 chars_before: None,
2382 chars_after: None,
2383 cost_usd: None,
2384 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2385 error: None,
2386 index: current_index,
2387 total,
2388 });
2389 }
2390 Ok(EnrichItemResult::PreservationFailed {
2391 score,
2392 threshold,
2393 chars_before,
2394 chars_after,
2395 }) => {
2396 skipped += 1;
2403 let reason = format!(
2404 "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2405 );
2406 if let Err(qe) = queue_conn.execute(
2407 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2408 rusqlite::params![reason, queue_id],
2409 ) {
2410 tracing::warn!(target: "enrich", error = %qe, "queue preservation_failed update failed");
2411 }
2412 emit_json(&ItemEvent {
2413 item: &item_key,
2414 status: "preservation_failed",
2415 memory_id: None,
2416 entity_id: None,
2417 entities: None,
2418 rels: None,
2419 chars_before: Some(chars_before),
2420 chars_after: Some(chars_after),
2421 cost_usd: None,
2422 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2423 error: Some(reason),
2424 index: current_index,
2425 total,
2426 });
2427 }
2428 Err(e) => {
2429 let err_str = format!("{e}");
2430 if matches!(e, AppError::RateLimited { .. }) {
2431 if crate::retry::is_kill_switch_active() {
2432 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2433 } else if std::time::Instant::now() >= rate_limit_deadline {
2434 tracing::error!(target: "enrich", total_elapsed_secs = enrich_started.elapsed().as_secs(), "rate-limit retry deadline (1h) exhausted");
2435 } else {
2436 let half = backoff_secs / 2;
2437 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2438 let actual_wait = half + jitter;
2439 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
2440 if let Err(qe) = queue_conn.execute(
2441 "UPDATE queue SET status='pending' WHERE id=?1",
2442 rusqlite::params![queue_id],
2443 ) {
2444 tracing::warn!(target: "enrich", error = %qe, "queue pending update failed");
2445 }
2446 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2447 backoff_secs = (backoff_secs * 2).min(900);
2448 continue;
2449 }
2450 }
2451
2452 failed += 1;
2453 let _outcome = record_item_failure(
2454 &queue_conn,
2455 queue_id,
2456 attempt_current,
2457 args.max_attempts,
2458 &e,
2459 );
2460 emit_json(&ItemEvent {
2461 item: &item_key,
2462 status: "failed",
2463 memory_id: None,
2464 entity_id: None,
2465 entities: None,
2466 rels: None,
2467 chars_before: None,
2468 chars_after: None,
2469 cost_usd: None,
2470 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2471 error: Some(err_str),
2472 index: current_index,
2473 total,
2474 });
2475 }
2476 }
2477
2478 let _ = item_type; }
2480 } if !args.until_empty {
2483 break;
2484 }
2485 let eligible_remaining: i64 = queue_conn
2486 .query_row(
2487 &format!("SELECT COUNT(*) FROM queue WHERE status='pending' {backoff_clause}"),
2488 [],
2489 |r| r.get(0),
2490 )
2491 .unwrap_or(0);
2492 let progressed = completed > completed_before;
2493 if std::time::Instant::now() >= until_deadline {
2494 tracing::info!(target: "enrich", "until-empty: max-runtime reached, stopping");
2495 break;
2496 }
2497 if !progressed && eligible_remaining == 0 {
2498 tracing::info!(target: "enrich", "until-empty: converged (no eligible items remain)");
2499 break;
2500 }
2501 if eligible_remaining == 0 {
2502 std::thread::sleep(std::time::Duration::from_secs(1));
2504 }
2505 } let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2508 let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2509
2510 let waiting_final: i64 = queue_conn
2514 .query_row(
2515 "SELECT COUNT(*) FROM queue WHERE status='pending' \
2516 AND (operation = ?1 OR operation IS NULL) \
2517 AND next_retry_at IS NOT NULL AND next_retry_at > datetime('now')",
2518 rusqlite::params![op_label],
2519 |r| r.get(0),
2520 )
2521 .unwrap_or(0);
2522 let dead_final: i64 = queue_conn
2523 .query_row(
2524 "SELECT COUNT(*) FROM queue WHERE status='dead' \
2525 AND (operation = ?1 OR operation IS NULL)",
2526 rusqlite::params![op_label],
2527 |r| r.get(0),
2528 )
2529 .unwrap_or(0);
2530
2531 emit_json(&EnrichSummary {
2532 summary: true,
2533 operation: format!("{:?}", args.operation()),
2534 items_total: total,
2535 completed,
2536 failed,
2537 skipped,
2538 cost_usd: cost_total,
2539 elapsed_ms: started.elapsed().as_millis() as u64,
2540 backend_invoked: take_enrich_backend(),
2541 waiting: waiting_final,
2542 dead: dead_final,
2543 });
2544
2545 if failed == 0 {
2546 let dead: i64 = queue_conn
2549 .query_row("SELECT COUNT(*) FROM queue WHERE status='dead'", [], |r| {
2550 r.get(0)
2551 })
2552 .unwrap_or(0);
2553 let skipped_remaining: i64 = queue_conn
2559 .query_row(
2560 "SELECT COUNT(*) FROM queue WHERE status='skipped'",
2561 [],
2562 |r| r.get(0),
2563 )
2564 .unwrap_or(0);
2565 if dead == 0 && skipped_remaining == 0 {
2566 let _ = std::fs::remove_file(&queue_path);
2567 }
2568 }
2569
2570 Ok(())
2571}
2572
#[cfg(test)]
mod tests {
    use super::*;

    /// Deserializes `raw` into a generic JSON value and fails the test with
    /// `context` (followed by the serde error) when it is not valid JSON.
    ///
    /// The schema constants are plain string literals, so a syntax slip in one
    /// of them is only caught when something actually parses it.
    fn assert_parses_as_json(raw: &str, context: &str) {
        serde_json::from_str::<serde_json::Value>(raw).expect(context);
    }

    #[test]
    fn bindings_schema_is_valid_json() {
        assert_parses_as_json(BINDINGS_SCHEMA, "BINDINGS_SCHEMA must be valid JSON");
    }

    #[test]
    fn entity_description_schema_is_valid_json() {
        assert_parses_as_json(
            ENTITY_DESCRIPTION_SCHEMA,
            "ENTITY_DESCRIPTION_SCHEMA must be valid JSON",
        );
    }

    #[test]
    fn body_enrich_schema_is_valid_json() {
        assert_parses_as_json(BODY_ENRICH_SCHEMA, "BODY_ENRICH_SCHEMA must be valid JSON");
    }
}