1use crate::commands::ingest_claude::find_claude_binary;
28use crate::constants::MAX_MEMORY_BODY_LEN;
29use crate::entity_type::EntityType;
30use crate::errors::AppError;
31use crate::paths::AppPaths;
32use crate::storage::connection::{ensure_db_ready, open_rw};
33use crate::storage::entities::{self, NewEntity, NewRelationship};
34use crate::storage::memories;
35
36use rusqlite::Connection;
37use serde::{Deserialize, Serialize};
38use std::io::Write;
39use std::path::{Path, PathBuf};
40use std::time::Instant;
41
/// Default file name of the enrichment work-queue SQLite database.
/// NOTE(review): the directory it is resolved against is not visible here — confirm at the call site.
const DEFAULT_QUEUE_DB: &str = ".enrich-queue.sqlite";
/// Default rate-limit back-off.
/// NOTE(review): unit presumably seconds — confirm where it is consumed.
const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
/// Default value of `--min-output-chars` for the body-enrich operation.
const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;
/// Default value of `--max-output-chars` for the body-enrich operation.
const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2000;
50
/// JSON Schema for the structured output of memory-bindings extraction.
///
/// The response must be an object with exactly two arrays:
/// - `entities`: `{ name, entity_type }`, with `entity_type` restricted to the ten listed types;
/// - `relationships`: `{ source, target, relation, strength }`, with `relation` restricted to
///   the eleven listed kebab-case relations and `strength` in `[0, 1]`.
///
/// Every object, nested ones included, sets `"additionalProperties": false`.
/// NOTE(review): presumably paired with `BINDINGS_PROMPT`; keep both relation lists in sync.
const BINDINGS_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["entities","relationships"],
  "additionalProperties": false
}"#;
94
/// JSON Schema for the entity-description response: one required `description` string,
/// no other keys allowed.
const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" }
  },
  "required": ["description"],
  "additionalProperties": false
}"#;

/// JSON Schema for the body-enrich response: one required `enriched_body` string,
/// no other keys allowed.
const BODY_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "enriched_body": { "type": "string" }
  },
  "required": ["enriched_body"],
  "additionalProperties": false
}"#;
112
/// System prompt for weight calibration: asks the model to re-score an existing
/// relationship weight against a fixed 0.3 / 0.5 / 0.7 / 0.9 scale.
const WEIGHT_CALIBRATE_PROMPT: &str = "You are a knowledge graph quality auditor. Evaluate whether this relationship weight is correctly calibrated.\n\n\
Scale:\n\
- 0.9 = vital hard dependency (A cannot function without B)\n\
- 0.7 = important design relationship (A strongly supports/enables B)\n\
- 0.5 = useful contextual link (A and B share relevant context)\n\
- 0.3 = weak reference (A mentions B without strong coupling)\n\n\
Respond with the calibrated weight and brief reasoning.";

/// JSON Schema for the weight-calibration response: `calibrated_weight` in `[0, 1]`
/// plus free-text `reasoning`; both required, no extra keys.
const WEIGHT_CALIBRATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "calibrated_weight": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["calibrated_weight", "reasoning"],
  "additionalProperties": false
}"#;

/// System prompt for relation reclassification: replace a generic relation with
/// exactly one of the eleven canonical kebab-case relations listed in the text.
const RELATION_RECLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. The relationship between these entities uses a generic type. Determine the REAL semantic relationship.\n\n\
Valid canonical relations (pick exactly one):\n\
- depends-on: A cannot function without B\n\
- uses: A utilizes B but could substitute it\n\
- supports: A reinforces or enables B\n\
- causes: A triggers or produces B\n\
- fixes: A resolves a problem in B\n\
- contradicts: A conflicts with or invalidates B\n\
- applies-to: A is relevant to or scoped within B\n\
- follows: A comes after B in sequence\n\
- replaces: A substitutes B\n\
- tracked-in: A is monitored in B\n\
- related: A and B share context (use sparingly)\n\n\
Respond with the correct relation, strength, and reasoning.";

/// JSON Schema for the relation-reclassify response: `relation`, `strength` in `[0, 1]`,
/// and `reasoning`; all required.
/// NOTE(review): unlike `BINDINGS_SCHEMA`, `relation` has no `enum`, so the canonical list
/// is only enforced by the prompt — callers presumably normalize/validate the value.
const RELATION_RECLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": { "type": "string" },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
158
/// System prompt for entity connection: decide whether two unconnected entities
/// deserve a relationship; the literal string "none" signals that they do not.
const ENTITY_CONNECT_PROMPT: &str = "You are a knowledge graph quality auditor. Two entities exist in the same graph but have no relationship between them. Determine if a meaningful relationship exists.\n\n\
Valid canonical relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, replaces, tracked-in, related.\n\n\
If NO meaningful relationship exists, set relation to \"none\".\n\
Respond with the relation (or \"none\"), strength, and reasoning.";

/// JSON Schema for the entity-connect response: `relation` (a canonical relation or
/// "none"), `strength` in `[0, 1]`, and `reasoning`; all required.
const ENTITY_CONNECT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": { "type": "string" },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;

/// System prompt for entity-type validation against the ten entity types also
/// enumerated in `BINDINGS_SCHEMA`.
const ENTITY_TYPE_VALIDATE_PROMPT: &str = "You are a knowledge graph quality auditor. Verify whether this entity's type is correct.\n\n\
Valid entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\n\
If the current type is correct, keep it. If wrong, suggest the correct type.\n\
Respond with the validated type and reasoning.";

/// JSON Schema for the entity-type-validate response: `validated_type`, a
/// `was_correct` flag, and `reasoning`; all required.
/// NOTE(review): `validated_type` is an unconstrained string; the caller presumably
/// parses it into `EntityType` and handles unknown values.
const ENTITY_TYPE_VALIDATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "validated_type": { "type": "string" },
    "was_correct": { "type": "boolean" },
    "reasoning": { "type": "string" }
  },
  "required": ["validated_type", "was_correct", "reasoning"],
  "additionalProperties": false
}"#;
192
/// System prompt for description enrichment: replace a generic/auto-generated memory
/// description with a 10-20 word semantic one (a BAD/GOOD example is embedded).
const DESCRIPTION_ENRICH_PROMPT: &str = "You are a knowledge graph quality auditor. This memory has a generic or auto-generated description. Write a concise, semantic description (10-20 words) that captures WHAT this memory is about and WHY it matters.\n\n\
BAD: 'ingested from docs/auth.md'\n\
GOOD: 'JWT token rotation strategy with 15-min expiry and refresh flow'\n\n\
Respond with the improved description and reasoning.";

/// JSON Schema for the description-enrich response: `description` and `reasoning`,
/// both required, no extra keys.
const DESCRIPTION_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" },
    "reasoning": { "type": "string" }
  },
  "required": ["description", "reasoning"],
  "additionalProperties": false
}"#;

/// System prompt for domain classification: a kebab-case, 2-4 word domain name.
/// NOTE(review): the prompt does not mention `confidence`, which the schema below requires.
const DOMAIN_CLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. Classify this memory into its primary domain category.\n\n\
Respond with the domain name (kebab-case, 2-4 words) and reasoning.";

/// JSON Schema for the domain-classify response: `domain`, `confidence` in `[0, 1]`,
/// and `reasoning`; all required.
const DOMAIN_CLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "domain": { "type": "string" },
    "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["domain", "confidence", "reasoning"],
  "additionalProperties": false
}"#;
223
/// System prompt for the graph audit: review a memory and its entity bindings for the
/// listed quality issues and return them with an overall quality score.
const GRAPH_AUDIT_PROMPT: &str = "You are a knowledge graph quality auditor. Analyze this memory and its entity bindings for quality issues.\n\n\
Check for: missing entities, wrong entity types, redundant relationships, orphaned entities, generic descriptions, low-signal relationships.\n\n\
Respond with a list of issues found (or empty if none) and an overall quality score.";
228
/// JSON Schema for the graph-audit response: an overall `quality_score` in `[0, 1]`,
/// a list of `issues` (each `{ kind, detail }`), and free-text `reasoning`.
///
/// Every object, including each `issues` item, sets `"additionalProperties": false`
/// (as `BINDINGS_SCHEMA` already does). Strict structured-output validators, such as
/// OpenAI structured outputs, reject a schema in which any object omits it.
const GRAPH_AUDIT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "quality_score": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "issues": { "type": "array", "items": { "type": "object", "properties": { "kind": { "type": "string" }, "detail": { "type": "string" } }, "required": ["kind", "detail"], "additionalProperties": false } },
    "reasoning": { "type": "string" }
  },
  "required": ["quality_score", "issues", "reasoning"],
  "additionalProperties": false
}"#;
239
/// System prompt for deep-research synthesis: extract entities (lowercase kebab-case)
/// and relationships (the eleven canonical relations) plus a synthesis summary.
const DEEP_RESEARCH_SYNTH_PROMPT: &str = "You are a knowledge graph synthesizer. Given this memory body, extract key findings and synthesize them into structured entities and relationships.\n\n\
Entity names: lowercase kebab-case, domain-specific.\n\
Relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, related, replaces, tracked-in.\n\n\
Respond with extracted entities, relationships, and a synthesis summary.";
245
/// JSON Schema for the deep-research-synth response: `entities` (`{ name, entity_type }`),
/// `relationships` (`{ source, target, relation, strength }`), and a `summary` string.
///
/// Every object, including each array item, sets `"additionalProperties": false`
/// (as `BINDINGS_SCHEMA` already does). Strict structured-output validators, such as
/// OpenAI structured outputs, reject a schema in which any object omits it.
/// NOTE(review): `entity_type` and `relation` are unconstrained strings here, unlike
/// `BINDINGS_SCHEMA`; callers presumably validate them.
const DEEP_RESEARCH_SYNTH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": { "type": "array", "items": { "type": "object", "properties": { "name": { "type": "string" }, "entity_type": { "type": "string" } }, "required": ["name", "entity_type"], "additionalProperties": false } },
    "relationships": { "type": "array", "items": { "type": "object", "properties": { "source": { "type": "string" }, "target": { "type": "string" }, "relation": { "type": "string" }, "strength": { "type": "number" } }, "required": ["source", "target", "relation", "strength"], "additionalProperties": false } },
    "summary": { "type": "string" }
  },
  "required": ["entities", "relationships", "summary"],
  "additionalProperties": false
}"#;
256
/// System prompt for body extraction: restructure an unstructured memory body into
/// clean markdown while preserving all factual content.
const BODY_EXTRACT_PROMPT: &str = "You are a structured data extractor. Given this memory body (which may be unstructured text, raw notes, or a transcript), extract and restructure the content into a clean, well-organized markdown body.\n\n\
Preserve all factual content. Remove noise, fix formatting, add section headers where appropriate.\n\
Respond with the restructured body and a brief summary of changes.";

/// JSON Schema for the body-extract response: `restructured_body` and
/// `changes_summary`, both required, no extra keys.
const BODY_EXTRACT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "restructured_body": { "type": "string" },
    "changes_summary": { "type": "string" }
  },
  "required": ["restructured_body", "changes_summary"],
  "additionalProperties": false
}"#;
271
/// System prompt for memory-bindings extraction: entity naming rules, the allowed
/// relation list (matching the `relation` enum in `BINDINGS_SCHEMA`), and the strength scale.
const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. Given a memory body, extract:\n\
1. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
2. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones";

/// Prompt prefix for entity descriptions; it ends with "Entity name: ".
/// NOTE(review): the caller presumably appends the entity name (and type) — confirm.
const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. Given an entity name and type, write a concise one-sentence description (10-20 words) that explains what this entity IS and WHY it matters in the context of software/system design.\n\nEntity name: ";

/// Prompt prefix for body enrichment; it ends with "Memory body to enrich:" and a blank line.
/// NOTE(review): the caller presumably appends the memory body; the target length
/// mentioned here is supplied separately ("in the system context").
const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. Given a short or sparse memory body, expand it into a richer, more complete and useful description. Preserve all existing facts. Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\nConstraints:\n- Output only the enriched body text (no metadata, no headers)\n- Preserve the original meaning exactly\n- Target length is provided in the system context\n\nMemory body to enrich:\n\n";
290
// Enrichment operation selected with `--operation` / `-o`.
//
// Plain `//` comments are used on purpose: `///` doc comments on a
// `clap::ValueEnum` become user-visible `--help` text.
// Variants serialize in kebab-case (`MemoryBindings` -> "memory-bindings").
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum EnrichOperation {
    // Add missing entity bindings to unbound memories (long-help example).
    MemoryBindings,
    // Fill missing entity descriptions (long-help example).
    EntityDescriptions,
    // Expand short memory bodies (long-help example, "GAP-18").
    BodyEnrich,
    // Rebuild only missing memory embeddings without rewriting bodies (long-help example).
    ReEmbed,
    // NOTE(review): presumably driven by WEIGHT_CALIBRATE_PROMPT / _SCHEMA.
    WeightCalibrate,
    // NOTE(review): presumably driven by RELATION_RECLASSIFY_PROMPT / _SCHEMA.
    RelationReclassify,
    // NOTE(review): presumably driven by ENTITY_CONNECT_PROMPT / _SCHEMA.
    EntityConnect,
    // NOTE(review): presumably driven by ENTITY_TYPE_VALIDATE_PROMPT / _SCHEMA.
    EntityTypeValidate,
    // NOTE(review): presumably driven by DESCRIPTION_ENRICH_PROMPT / _SCHEMA.
    DescriptionEnrich,
    // NOTE(review): no matching prompt/schema constant is visible in this chunk.
    CrossDomainBridges,
    // NOTE(review): presumably driven by DOMAIN_CLASSIFY_PROMPT / _SCHEMA.
    DomainClassify,
    // NOTE(review): presumably driven by GRAPH_AUDIT_PROMPT / _SCHEMA.
    GraphAudit,
    // NOTE(review): presumably driven by DEEP_RESEARCH_SYNTH_PROMPT / _SCHEMA.
    DeepResearchSynth,
    // NOTE(review): presumably driven by BODY_EXTRACT_PROMPT / _SCHEMA.
    BodyExtract,
}
328
// LLM backend used by `enrich` (`--mode`, and also `--fallback-mode`).
// `//` comments are used so that the generated `--help` text is unchanged.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
pub enum EnrichMode {
    // Claude Code CLI ("claude-code"); the default `--mode`.
    ClaudeCode,
    // OpenAI Codex CLI ("codex").
    Codex,
}
337
338impl std::fmt::Display for EnrichMode {
339 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
340 match self {
341 EnrichMode::ClaudeCode => write!(f, "claude-code"),
342 EnrichMode::Codex => write!(f, "codex"),
343 }
344 }
345}
346
347#[derive(clap::Args)]
349#[command(
350 about = "Enrich graph memories and entities using an LLM provider",
351 after_long_help = "EXAMPLES:\n \
352 # Add missing entity bindings to all unbound memories\n \
353 sqlite-graphrag enrich --operation memory-bindings --mode claude-code\n\n \
354 # Fill entity descriptions (dry-run preview, no tokens spent)\n \
355 sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n \
356 # Expand short memory bodies (GAP-18)\n \
357 sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n \
358 # Rebuild only missing memory embeddings without rewriting bodies\n \
359 sqlite-graphrag enrich --operation re-embed --limit 100\n\n \
360 # Resume an interrupted body-enrich run\n \
361 sqlite-graphrag enrich --operation body-enrich --resume --json\n\n \
362 # Retry only failed items from a previous run\n \
363 sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n\
364 EXIT CODES:\n \
365 0 success\n \
366 1 validation error (bad args, binary not found)\n \
367 14 I/O error"
368)]
369pub struct EnrichArgs {
370 #[arg(long, short = 'o', value_enum, value_name = "OPERATION")]
372 pub operation: EnrichOperation,
373
374 #[arg(long, value_enum, default_value = "claude-code")]
376 pub mode: EnrichMode,
377
378 #[arg(long, value_name = "N")]
380 pub limit: Option<usize>,
381
382 #[arg(long)]
384 pub dry_run: bool,
385
386 #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
388 pub namespace: Option<String>,
389
390 #[arg(long, value_name = "PATH")]
393 pub claude_binary: Option<PathBuf>,
394
395 #[arg(long, value_name = "MODEL")]
397 pub claude_model: Option<String>,
398
399 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
401 pub claude_timeout: u64,
402
403 #[arg(long, value_name = "PATH")]
406 pub codex_binary: Option<PathBuf>,
407
408 #[arg(long, value_name = "MODEL")]
410 pub codex_model: Option<String>,
411
412 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
414 pub codex_timeout: u64,
415
416 #[arg(long, value_name = "USD")]
419 pub max_cost_usd: Option<f64>,
420
421 #[arg(long)]
424 pub resume: bool,
425
426 #[arg(long)]
428 pub retry_failed: bool,
429
430 #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
433 pub min_output_chars: usize,
434
435 #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
437 pub max_output_chars: usize,
438
439 #[arg(long, default_value_t = true)]
441 pub preserve_check: bool,
442
443 #[arg(long, value_name = "PATH")]
445 pub prompt_template: Option<PathBuf>,
446
447 #[arg(long, default_value_t = 1, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=32))]
451 pub llm_parallelism: u32,
452
453 #[arg(long)]
456 pub json: bool,
457
458 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
460 pub db: Option<String>,
461
462 #[arg(long, value_name = "SECONDS")]
465 pub wait_job_singleton: Option<u64>,
466
467 #[arg(long, default_value_t = false)]
471 pub force_job_singleton: bool,
472
473 #[arg(long, value_name = "NAMES", value_delimiter = ',')]
477 pub names: Vec<String>,
478
479 #[arg(long, value_name = "PATH")]
483 pub names_file: Option<PathBuf>,
484
485 #[arg(long, default_value_t = false)]
489 pub preflight_check: bool,
490
491 #[arg(long, value_enum)]
495 pub fallback_mode: Option<EnrichMode>,
496
497 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
500 pub rate_limit_buffer: u64,
501
502 #[arg(long, default_value_t = true)]
506 pub max_load_check: bool,
507
508 #[arg(long, value_name = "N", default_value_t = 5)]
511 pub circuit_breaker_threshold: u32,
512
513 #[arg(long, value_name = "FLOAT", default_value_t = 0.7)]
520 pub preserve_threshold: f64,
521
522 #[arg(long, default_value_t = true)]
527 pub codex_model_validate: bool,
528
529 #[arg(long, value_name = "MODEL")]
534 pub codex_model_fallback: Option<String>,
535}
536
/// A serialized progress record describing one phase of an enrich run.
///
/// Each `Option` field is omitted from the JSON when `None`.
/// NOTE(review): presumably emitted as a JSON line via `emit_json` in `--json` mode — confirm.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Phase label.
    phase: &'a str,
    /// Resolved backend binary path, when relevant to the phase.
    #[serde(skip_serializing_if = "Option::is_none")]
    binary_path: Option<&'a str>,
    /// Backend version string, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Total number of items in scope.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_total: Option<usize>,
    /// Number of items still pending.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_pending: Option<usize>,
    /// Configured `--llm-parallelism`, when reported.
    #[serde(skip_serializing_if = "Option::is_none")]
    llm_parallelism: Option<u32>,
}
560
/// A serialized progress record for a single processed item.
///
/// Each `Option` field is omitted from the JSON when `None`; `index` and `total`
/// are always present.
#[derive(Debug, Serialize)]
struct ItemEvent<'a> {
    /// Item identifier (NOTE(review): presumably the queue `item_key` — confirm).
    item: &'a str,
    /// Processing status label.
    status: &'a str,
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    entity_id: Option<i64>,
    /// Number of entities produced for the item.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Number of relationships produced for the item.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Body length before / after rewriting operations.
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_before: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_after: Option<usize>,
    /// Reported LLM cost for this item.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Error message when the item failed.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Position of this item within the run (NOTE(review): 0- or 1-based — confirm).
    index: usize,
    /// Number of items in the run.
    total: usize,
}
587
/// Final serialized summary record of an enrich run.
#[derive(Debug, Serialize)]
struct EnrichSummary {
    /// Marker field (NOTE(review): presumably always `true` so consumers can tell the
    /// summary line apart from item/phase events — confirm).
    summary: bool,
    /// Operation name.
    operation: String,
    items_total: usize,
    completed: usize,
    failed: usize,
    skipped: usize,
    /// Accumulated LLM cost in USD.
    cost_usd: f64,
    elapsed_ms: u64,
    /// Backend that actually ran, if any; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    backend_invoked: Option<&'static str>,
}
605
606use crate::output::emit_json_line as emit_json;
607
608fn open_queue_db(path: &str) -> Result<Connection, AppError> {
623 let conn = Connection::open(path)?;
624 conn.pragma_update(None, "journal_mode", "wal")?;
625 conn.execute_batch(
626 "CREATE TABLE IF NOT EXISTS queue (
627 id INTEGER PRIMARY KEY AUTOINCREMENT,
628 item_key TEXT NOT NULL UNIQUE,
629 item_type TEXT NOT NULL DEFAULT 'memory',
630 status TEXT NOT NULL DEFAULT 'pending',
631 memory_id INTEGER,
632 entity_id INTEGER,
633 entities INTEGER DEFAULT 0,
634 rels INTEGER DEFAULT 0,
635 error TEXT,
636 cost_usd REAL DEFAULT 0.0,
637 attempt INTEGER DEFAULT 0,
638 elapsed_ms INTEGER,
639 created_at TEXT DEFAULT (datetime('now')),
640 done_at TEXT
641 );
642 CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
643 )?;
644 Ok(conn)
645}
646
647fn call_claude(
655 binary: &Path,
656 prompt: &str,
657 json_schema: &str,
658 input_text: &str,
659 model: Option<&str>,
660 timeout_secs: u64,
661) -> Result<(serde_json::Value, f64, bool), AppError> {
662 let result = crate::commands::claude_runner::run_claude(
663 binary,
664 prompt,
665 json_schema,
666 input_text,
667 model,
668 timeout_secs,
669 7,
670 )?;
671 Ok((result.value, result.cost_usd, result.is_oauth))
672}
673
/// Result of the `run_preflight_probe` backend health check.
enum PreflightOutcome {
    /// The probe process exited successfully.
    Healthy,
    /// The probe failed and its stderr contained a recognized rate-limit marker.
    RateLimited {
        /// Trimmed stderr of the failed probe.
        reason: String,
        /// Fixed advice for the user on how to proceed.
        suggestion: &'static str,
    },
    /// Any other failure: binary lookup, spawn, timeout, or a non-rate-limit exit.
    Error(AppError),
}
692
693fn run_preflight_probe(args: &EnrichArgs) -> PreflightOutcome {
701 let timeout = std::time::Duration::from_secs(args.rate_limit_buffer.max(60));
702
703 match args.mode {
704 EnrichMode::ClaudeCode => {
705 let bin = match find_claude_binary(args.claude_binary.as_deref()) {
706 Ok(b) => b,
707 Err(e) => return PreflightOutcome::Error(e),
708 };
709 let mcp_config_path = match crate::spawn::preflight::write_empty_mcp_config_tempfile() {
714 Ok(p) => p,
715 Err(e) => {
716 return PreflightOutcome::Error(AppError::Io(e));
717 }
718 };
719 let mut cmd = std::process::Command::new(&bin);
720 cmd.env_clear();
721 for var in &["PATH", "HOME", "USER"] {
722 if let Ok(val) = std::env::var(var) {
723 cmd.env(var, val);
724 }
725 }
726 cmd.arg("-p")
727 .arg("ping")
728 .arg("--max-turns")
729 .arg("1")
730 .arg("--strict-mcp-config")
731 .arg("--mcp-config")
732 .arg(mcp_config_path.as_os_str())
733 .arg("--dangerously-skip-permissions")
734 .arg("--settings")
735 .arg("{\"hooks\":{}}")
736 .arg("--output-format")
737 .arg("json")
738 .stdin(std::process::Stdio::null())
739 .stdout(std::process::Stdio::piped())
740 .stderr(std::process::Stdio::piped());
741
742 let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
743 Ok(c) => c,
744 Err(e) => {
745 return PreflightOutcome::Error(AppError::Io(e));
746 }
747 };
748 let output = match wait_with_timeout(child, timeout) {
749 Ok(out) => out,
750 Err(e) => return PreflightOutcome::Error(e),
751 };
752 if !output.status.success() {
753 let stderr = String::from_utf8_lossy(&output.stderr);
754 if stderr.contains("hit your session limit")
755 || stderr.contains("rate_limit")
756 || stderr.contains("429")
757 {
758 return PreflightOutcome::RateLimited {
759 reason: stderr.trim().to_string(),
760 suggestion:
761 "wait for the OAuth window to reset or use --fallback-mode codex",
762 };
763 }
764 return PreflightOutcome::Error(AppError::Validation(format!(
765 "preflight probe failed: {stderr}",
766 stderr = stderr.trim()
767 )));
768 }
769 PreflightOutcome::Healthy
770 }
771 EnrichMode::Codex => {
772 let bin = match find_codex_binary(args.codex_binary.as_deref()) {
773 Ok(b) => b,
774 Err(e) => return PreflightOutcome::Error(e),
775 };
776 super::codex_spawn::validate_codex_model(args.codex_model.as_deref())
777 .map_err(PreflightOutcome::Error)
778 .ok();
779 let schema = "{}";
780 let schema_path = match super::codex_spawn::trusted_schema_path() {
781 Ok(p) => p,
782 Err(e) => return PreflightOutcome::Error(e),
783 };
784 let spawn_args = super::codex_spawn::CodexSpawnArgs {
785 binary: &bin,
786 prompt: "ping",
787 json_schema: schema,
788 input_text: "",
789 model: args.codex_model.as_deref(),
790 timeout_secs: args.rate_limit_buffer.max(60),
791 schema_path: schema_path.clone(),
792 };
793 let mut cmd = match super::codex_spawn::build_codex_command(&spawn_args) {
794 Ok(c) => c,
795 Err(e) => return PreflightOutcome::Error(e),
796 };
797 let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
798 Ok(c) => c,
799 Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
800 };
801 let output = match wait_with_timeout(child, timeout) {
802 Ok(out) => out,
803 Err(e) => return PreflightOutcome::Error(e),
804 };
805 let _ = std::fs::remove_file(&schema_path);
806 if !output.status.success() {
807 let stderr = String::from_utf8_lossy(&output.stderr);
808 if stderr.contains("rate_limit")
809 || stderr.contains("429")
810 || stderr.contains("Too Many Requests")
811 {
812 return PreflightOutcome::RateLimited {
813 reason: stderr.trim().to_string(),
814 suggestion: "wait for the rate-limit window to reset",
815 };
816 }
817 return PreflightOutcome::Error(AppError::Validation(format!(
818 "preflight probe failed: {stderr}",
819 stderr = stderr.trim()
820 )));
821 }
822 PreflightOutcome::Healthy
823 }
824 }
825}
826
827fn wait_with_timeout(
829 mut child: std::process::Child,
830 timeout: std::time::Duration,
831) -> Result<std::process::Output, AppError> {
832 use wait_timeout::ChildExt;
833 let start = std::time::Instant::now();
834 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
835 if status.is_none() {
836 let _ = child.kill();
837 let _ = child.wait();
838 return Err(AppError::Validation(format!(
839 "preflight probe timed out after {}s",
840 start.elapsed().as_secs()
841 )));
842 }
843 let mut stdout = Vec::new();
844 if let Some(mut out) = child.stdout.take() {
845 std::io::Read::read_to_end(&mut out, &mut stdout).map_err(AppError::Io)?;
846 }
847 let mut stderr = Vec::new();
848 if let Some(mut err) = child.stderr.take() {
849 std::io::Read::read_to_end(&mut err, &mut stderr).map_err(AppError::Io)?;
850 }
851 let exit = status.unwrap();
852 Ok(std::process::Output {
853 status: exit,
854 stdout,
855 stderr,
856 })
857}
858
859fn scan_unbound_memories(
870 conn: &Connection,
871 namespace: &str,
872 limit: Option<usize>,
873 name_filter: &[String],
874) -> Result<Vec<(i64, String, String)>, AppError> {
875 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
876
877 if name_filter.is_empty() {
878 let sql = format!(
879 "SELECT m.id, m.name, m.body
880 FROM memories m
881 WHERE m.namespace = ?1
882 AND m.deleted_at IS NULL
883 AND NOT EXISTS (
884 SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
885 )
886 ORDER BY m.id
887 {limit_clause}"
888 );
889 let mut stmt = conn.prepare(&sql)?;
890 let rows = stmt
891 .query_map(rusqlite::params![namespace], |r| {
892 Ok((
893 r.get::<_, i64>(0)?,
894 r.get::<_, String>(1)?,
895 r.get::<_, String>(2)?,
896 ))
897 })?
898 .collect::<Result<Vec<_>, _>>()?;
899 Ok(rows)
900 } else {
901 let placeholders: Vec<String> = (2..=name_filter.len() + 1)
903 .map(|i| format!("?{i}"))
904 .collect();
905 let in_clause = placeholders.join(", ");
906 let sql = format!(
907 "SELECT m.id, m.name, m.body
908 FROM memories m
909 WHERE m.namespace = ?1
910 AND m.deleted_at IS NULL
911 AND m.name IN ({in_clause})
912 AND NOT EXISTS (
913 SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
914 )
915 ORDER BY m.id
916 {limit_clause}"
917 );
918 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
919 params_vec.push(&namespace);
920 for n in name_filter {
921 params_vec.push(n);
922 }
923 let mut stmt = conn.prepare(&sql)?;
924 let rows = stmt
925 .query_map(
926 rusqlite::params_from_iter(params_vec.iter().copied()),
927 |r| {
928 Ok((
929 r.get::<_, i64>(0)?,
930 r.get::<_, String>(1)?,
931 r.get::<_, String>(2)?,
932 ))
933 },
934 )?
935 .collect::<Result<Vec<_>, _>>()?;
936 Ok(rows)
937 }
938}
939
940fn read_names_file(path: &Path) -> Result<Vec<String>, AppError> {
945 let content = std::fs::read_to_string(path).map_err(|e| {
946 AppError::Validation(format!("failed to read names file {}: {e}", path.display()))
947 })?;
948 let mut seen = std::collections::HashSet::new();
949 let mut out = Vec::new();
950 for line in content.lines() {
951 let trimmed = line.trim();
952 if trimmed.is_empty() || trimmed.starts_with('#') {
953 continue;
954 }
955 if seen.insert(trimmed.to_string()) {
956 out.push(trimmed.to_string());
957 }
958 }
959 Ok(out)
960}
961
962fn resolve_name_filter(args: &EnrichArgs) -> Result<Vec<String>, AppError> {
964 let mut combined: Vec<String> = args.names.clone();
965 if let Some(p) = &args.names_file {
966 let from_file = read_names_file(p)?;
967 for n in from_file {
968 if !combined.contains(&n) {
969 combined.push(n);
970 }
971 }
972 }
973 Ok(combined)
974}
975
976fn scan_entities_without_description(
980 conn: &Connection,
981 namespace: &str,
982 limit: Option<usize>,
983) -> Result<Vec<(i64, String, String)>, AppError> {
984 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
985 let sql = format!(
986 "SELECT id, name, type
987 FROM entities
988 WHERE namespace = ?1
989 AND (description IS NULL OR description = '')
990 ORDER BY id
991 {limit_clause}"
992 );
993 let mut stmt = conn.prepare(&sql)?;
994 let rows = stmt
995 .query_map(rusqlite::params![namespace], |r| {
996 Ok((
997 r.get::<_, i64>(0)?,
998 r.get::<_, String>(1)?,
999 r.get::<_, String>(2)?,
1000 ))
1001 })?
1002 .collect::<Result<Vec<_>, _>>()?;
1003 Ok(rows)
1004}
1005
1006fn scan_short_body_memories(
1010 conn: &Connection,
1011 namespace: &str,
1012 min_chars: usize,
1013 limit: Option<usize>,
1014) -> Result<Vec<(i64, String, String)>, AppError> {
1015 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1016 let sql = format!(
1017 "SELECT m.id, m.name, m.body
1018 FROM memories m
1019 WHERE m.namespace = ?1
1020 AND m.deleted_at IS NULL
1021 AND LENGTH(COALESCE(m.body,'')) < ?2
1022 ORDER BY m.id
1023 {limit_clause}"
1024 );
1025 let mut stmt = conn.prepare(&sql)?;
1026 let rows = stmt
1027 .query_map(rusqlite::params![namespace, min_chars as i64], |r| {
1028 Ok((
1029 r.get::<_, i64>(0)?,
1030 r.get::<_, String>(1)?,
1031 r.get::<_, String>(2)?,
1032 ))
1033 })?
1034 .collect::<Result<Vec<_>, _>>()?;
1035 Ok(rows)
1036}
1037
1038fn scan_memories_without_embeddings(
1042 conn: &Connection,
1043 namespace: &str,
1044 limit: Option<usize>,
1045 name_filter: &[String],
1046) -> Result<Vec<(i64, String, String)>, AppError> {
1047 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1048
1049 if name_filter.is_empty() {
1050 let sql = format!(
1051 "SELECT m.id, m.name, COALESCE(m.body,'')
1052 FROM memories m
1053 LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1054 WHERE m.namespace = ?1
1055 AND m.deleted_at IS NULL
1056 AND me.memory_id IS NULL
1057 ORDER BY m.id
1058 {limit_clause}"
1059 );
1060 let mut stmt = conn.prepare(&sql)?;
1061 let rows = stmt
1062 .query_map(rusqlite::params![namespace], |r| {
1063 Ok((
1064 r.get::<_, i64>(0)?,
1065 r.get::<_, String>(1)?,
1066 r.get::<_, String>(2)?,
1067 ))
1068 })?
1069 .collect::<Result<Vec<_>, _>>()?;
1070 Ok(rows)
1071 } else {
1072 let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1073 .map(|i| format!("?{i}"))
1074 .collect();
1075 let in_clause = placeholders.join(", ");
1076 let sql = format!(
1077 "SELECT m.id, m.name, COALESCE(m.body,'')
1078 FROM memories m
1079 LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1080 WHERE m.namespace = ?1
1081 AND m.deleted_at IS NULL
1082 AND m.name IN ({in_clause})
1083 AND me.memory_id IS NULL
1084 ORDER BY m.id
1085 {limit_clause}"
1086 );
1087 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1088 params_vec.push(&namespace);
1089 for n in name_filter {
1090 params_vec.push(n);
1091 }
1092 let mut stmt = conn.prepare(&sql)?;
1093 let rows = stmt
1094 .query_map(
1095 rusqlite::params_from_iter(params_vec.iter().copied()),
1096 |r| {
1097 Ok((
1098 r.get::<_, i64>(0)?,
1099 r.get::<_, String>(1)?,
1100 r.get::<_, String>(2)?,
1101 ))
1102 },
1103 )?
1104 .collect::<Result<Vec<_>, _>>()?;
1105 Ok(rows)
1106 }
1107}
1108
1109#[allow(clippy::type_complexity)]
1111fn scan_weight_candidates(
1112 conn: &Connection,
1113 namespace: &str,
1114 limit: Option<usize>,
1115) -> Result<Vec<(i64, String, String, String, f64)>, AppError> {
1116 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1117 let sql = format!(
1118 "SELECT r.id, e1.name, e2.name, r.relation, r.weight \
1119 FROM relationships r \
1120 JOIN entities e1 ON e1.id = r.source_id \
1121 JOIN entities e2 ON e2.id = r.target_id \
1122 WHERE r.weight >= 0.7 AND e1.namespace = ?1 \
1123 ORDER BY r.weight DESC {limit_clause}"
1124 );
1125 let mut stmt = conn.prepare(&sql)?;
1126 let rows = stmt
1127 .query_map(rusqlite::params![namespace], |r| {
1128 Ok((
1129 r.get::<_, i64>(0)?,
1130 r.get::<_, String>(1)?,
1131 r.get::<_, String>(2)?,
1132 r.get::<_, String>(3)?,
1133 r.get::<_, f64>(4)?,
1134 ))
1135 })?
1136 .collect::<Result<Vec<_>, _>>()?;
1137 Ok(rows)
1138}
1139
1140fn scan_generic_relations(
1142 conn: &Connection,
1143 namespace: &str,
1144 limit: Option<usize>,
1145) -> Result<Vec<(i64, String, String, String)>, AppError> {
1146 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1147 let sql = format!(
1148 "SELECT r.id, e1.name, e2.name, r.relation \
1149 FROM relationships r \
1150 JOIN entities e1 ON e1.id = r.source_id \
1151 JOIN entities e2 ON e2.id = r.target_id \
1152 WHERE r.relation = 'applies_to' AND e1.namespace = ?1 \
1153 ORDER BY r.id {limit_clause}"
1154 );
1155 let mut stmt = conn.prepare(&sql)?;
1156 let rows = stmt
1157 .query_map(rusqlite::params![namespace], |r| {
1158 Ok((
1159 r.get::<_, i64>(0)?,
1160 r.get::<_, String>(1)?,
1161 r.get::<_, String>(2)?,
1162 r.get::<_, String>(3)?,
1163 ))
1164 })?
1165 .collect::<Result<Vec<_>, _>>()?;
1166 Ok(rows)
1167}
1168
1169fn persist_memory_bindings(
1178 conn: &Connection,
1179 namespace: &str,
1180 memory_id: i64,
1181 entities_json: &serde_json::Value,
1182 rels_json: &serde_json::Value,
1183) -> Result<(usize, usize), AppError> {
1184 #[derive(Deserialize)]
1185 struct EntityItem {
1186 name: String,
1187 entity_type: String,
1188 }
1189 #[derive(Deserialize)]
1190 struct RelItem {
1191 source: String,
1192 target: String,
1193 relation: String,
1194 strength: f64,
1195 }
1196
1197 let extracted_entities: Vec<EntityItem> = serde_json::from_value(entities_json.clone())
1198 .map_err(|e| AppError::Validation(format!("failed to parse entities array: {e}")))?;
1199
1200 let extracted_rels: Vec<RelItem> = serde_json::from_value(rels_json.clone())
1201 .map_err(|e| AppError::Validation(format!("failed to parse relationships array: {e}")))?;
1202
1203 let mut ent_count = 0usize;
1204 let mut rel_count = 0usize;
1205
1206 for item in &extracted_entities {
1207 let entity_type = match item.entity_type.parse::<EntityType>() {
1208 Ok(et) => et,
1209 Err(_) => {
1210 tracing::warn!(
1211 target: "enrich",
1212 entity = %item.name,
1213 entity_type = %item.entity_type,
1214 "entity type not recognized, skipping"
1215 );
1216 continue;
1217 }
1218 };
1219 match entities::upsert_entity(
1220 conn,
1221 namespace,
1222 &NewEntity {
1223 name: item.name.clone(),
1224 entity_type,
1225 description: None,
1226 },
1227 ) {
1228 Ok(eid) => {
1229 let _ = entities::link_memory_entity(conn, memory_id, eid);
1230 ent_count += 1;
1231 }
1232 Err(e) => {
1233 tracing::warn!(
1234 target: "enrich",
1235 entity = %item.name,
1236 error = %e,
1237 "entity upsert skipped"
1238 );
1239 }
1240 }
1241 }
1242
1243 for rel in &extracted_rels {
1244 let normalized = crate::parsers::normalize_relation(&rel.relation);
1245 crate::parsers::warn_if_non_canonical(&normalized);
1246
1247 let src_name = crate::parsers::normalize_entity_name(&rel.source);
1250 let tgt_name = crate::parsers::normalize_entity_name(&rel.target);
1251 let src_id = entities::find_entity_id(conn, namespace, &src_name);
1252 let tgt_id = entities::find_entity_id(conn, namespace, &tgt_name);
1253 if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1254 let new_rel = NewRelationship {
1255 source: rel.source.clone(),
1256 target: rel.target.clone(),
1257 relation: normalized,
1258 strength: rel.strength,
1259 description: None,
1260 };
1261 if entities::upsert_relationship(conn, namespace, sid, tid, &new_rel).is_ok() {
1262 rel_count += 1;
1263 }
1264 }
1265 }
1266
1267 Ok((ent_count, rel_count))
1268}
1269
1270fn persist_entity_description(
1272 conn: &Connection,
1273 entity_id: i64,
1274 description: &str,
1275) -> Result<(), AppError> {
1276 conn.execute(
1277 "UPDATE entities SET description = ?1, updated_at = unixepoch() WHERE id = ?2",
1278 rusqlite::params![description, entity_id],
1279 )?;
1280 Ok(())
1281}
1282
1283#[allow(clippy::too_many_arguments)]
1289fn reembed_memory_vector(
1290 conn: &Connection,
1291 namespace: &str,
1292 memory_id: i64,
1293 memory_name: &str,
1294 memory_type: &str,
1295 body: &str,
1296 paths: &crate::paths::AppPaths,
1297 llm_backend: crate::cli::LlmBackendChoice,
1298) -> Result<(), AppError> {
1299 let snippet: String = body.chars().take(200).collect();
1300 let (embedding, backend_kind) =
1305 crate::embedder::embed_passage_with_choice(&paths.models, body, Some(llm_backend))?;
1306 record_enrich_backend(backend_kind.as_str());
1307 memories::upsert_vec(
1308 conn,
1309 memory_id,
1310 namespace,
1311 memory_type,
1312 &embedding,
1313 memory_name,
1314 &snippet,
1315 )?;
1316 Ok(())
1317}
1318
/// Most recent embedding backend name reported during enrichment, if any.
///
/// Written by `record_enrich_backend` and drained by `take_enrich_backend`.
static ENRICH_LAST_BACKEND: std::sync::Mutex<Option<&'static str>> = std::sync::Mutex::new(None);

/// Stores `backend` as the latest backend, replacing any earlier value.
///
/// If the lock is poisoned the value is dropped without error.
fn record_enrich_backend(backend: &'static str) {
    if let Ok(mut slot) = ENRICH_LAST_BACKEND.lock() {
        *slot = Some(backend);
    }
}

/// Returns the latest recorded backend and clears it, so a second call yields `None`.
///
/// A poisoned lock is reported as `None`.
fn take_enrich_backend() -> Option<&'static str> {
    match ENRICH_LAST_BACKEND.lock() {
        Ok(mut slot) => slot.take(),
        Err(_) => None,
    }
}
1335
1336fn persist_enriched_body(
1341 conn: &Connection,
1342 namespace: &str,
1343 memory_id: i64,
1344 memory_name: &str,
1345 new_body: &str,
1346 paths: &crate::paths::AppPaths,
1347 llm_backend: crate::cli::LlmBackendChoice,
1348) -> Result<(), AppError> {
1349 let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
1351 "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
1352 rusqlite::params![memory_id],
1353 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1354 )?;
1355
1356 let memory_type: String = conn.query_row(
1357 "SELECT type FROM memories WHERE id=?1",
1358 rusqlite::params![memory_id],
1359 |r| r.get(0),
1360 )?;
1361
1362 let description: String = conn.query_row(
1363 "SELECT COALESCE(description,'') FROM memories WHERE id=?1",
1364 rusqlite::params![memory_id],
1365 |r| r.get(0),
1366 )?;
1367
1368 let body_hash = blake3::hash(new_body.as_bytes()).to_hex().to_string();
1369
1370 let new_memory = memories::NewMemory {
1371 namespace: namespace.to_string(),
1372 name: memory_name.to_string(),
1373 memory_type: memory_type.clone(),
1374 description: description.clone(),
1375 body: new_body.to_string(),
1376 body_hash,
1377 session_id: None,
1378 source: "agent".to_string(),
1379 metadata: serde_json::json!({
1380 "operation": "body-enrich",
1381 "orig_chars": old_body.chars().count(),
1382 "new_chars": new_body.chars().count(),
1383 }),
1384 };
1385
1386 let next_version = crate::storage::versions::next_version(conn, memory_id)?;
1390 let version_metadata = serde_json::json!({
1391 "operation": "body-enrich",
1392 "orig_chars": old_body.chars().count(),
1393 "new_chars": new_body.chars().count(),
1394 })
1395 .to_string();
1396 crate::storage::versions::insert_version(
1397 conn,
1398 memory_id,
1399 next_version,
1400 memory_name,
1401 &memory_type,
1402 &description,
1403 new_body,
1404 &version_metadata,
1405 Some("enrich"),
1406 "edit",
1407 )?;
1408
1409 memories::update(conn, memory_id, &new_memory, None)?;
1410 memories::sync_fts_after_update(
1411 conn,
1412 memory_id,
1413 &old_name,
1414 &old_desc,
1415 &old_body,
1416 &new_memory.name,
1417 &new_memory.description,
1418 &new_memory.body,
1419 )?;
1420
1421 if let Err(e) = reembed_memory_vector(
1423 conn,
1424 namespace,
1425 memory_id,
1426 memory_name,
1427 &memory_type,
1428 new_body,
1429 paths,
1430 llm_backend,
1431 ) {
1432 tracing::warn!(target: "enrich", memory = %memory_name, error = %e, "vec upsert failed after body-enrich");
1433 }
1434
1435 Ok(())
1436}
1437
/// Returns `true` when `value` compares equal to `default`.
///
/// Used by flag validation to tell whether a CLI option was left at its default.
/// Both arguments are taken by value.
fn is_at_default<T>(value: T, default: T) -> bool
where
    T: PartialEq,
{
    PartialEq::eq(&value, &default)
}
1452
1453fn validate_mode_conditional_flags_enrich(args: &EnrichArgs) -> Result<(), AppError> {
1468 const DEFAULT_TIMEOUT: u64 = 300;
1469
1470 let mut conflicts: Vec<String> = Vec::new();
1471
1472 match args.mode {
1473 EnrichMode::ClaudeCode => {
1474 if args.codex_binary.is_some() {
1475 conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1476 }
1477 if args.codex_model.is_some() {
1478 conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1479 }
1480 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1481 conflicts.push(format!(
1482 "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1483 args.codex_timeout
1484 ));
1485 }
1486 }
1487 EnrichMode::Codex => {
1488 if args.claude_binary.is_some() {
1489 conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1490 }
1491 if args.claude_model.is_some() {
1492 conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1493 }
1494 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1495 conflicts.push(format!(
1496 "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1497 args.claude_timeout
1498 ));
1499 }
1500 if args.max_cost_usd.is_some() {
1501 conflicts.push(
1502 "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription, not the call)"
1503 .to_string(),
1504 );
1505 }
1506 }
1507 }
1508
1509 if !conflicts.is_empty() {
1510 return Err(AppError::Validation(format!(
1511 "G20: mode-conditional flag conflicts detected for --mode={}:\n - {}",
1512 args.mode,
1513 conflicts.join("\n - ")
1514 )));
1515 }
1516
1517 Ok(())
1518}
1519
1520pub fn run(args: &EnrichArgs, llm_backend: crate::cli::LlmBackendChoice) -> Result<(), AppError> {
1524 validate_mode_conditional_flags_enrich(args)?;
1527 let started = Instant::now();
1528
1529 let paths = AppPaths::resolve(args.db.as_deref())?;
1530 ensure_db_ready(&paths)?;
1531 let conn = open_rw(&paths.db)?;
1532 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1533
1534 let wait_secs = args.wait_job_singleton;
1540 let force_flag = args.force_job_singleton;
1541 let _singleton = crate::lock::acquire_job_singleton(
1542 crate::lock::JobType::Enrich,
1543 &namespace,
1544 &paths.db,
1545 wait_secs,
1546 force_flag,
1547 )?;
1548
1549 let provider_binary = if matches!(args.operation, EnrichOperation::ReEmbed) {
1551 None
1552 } else {
1553 Some(match args.mode {
1554 EnrichMode::ClaudeCode => {
1555 let bin = find_claude_binary(args.claude_binary.as_deref())?;
1556 let version = super::claude_runner::validate_claude_version(&bin)?;
1557 tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
1558 emit_json(&PhaseEvent {
1559 phase: "validate",
1560 binary_path: bin.to_str(),
1561 version: Some(&version),
1562 items_total: None,
1563 items_pending: None,
1564 llm_parallelism: None,
1565 });
1566 bin
1567 }
1568 EnrichMode::Codex => {
1569 let bin = find_codex_binary(args.codex_binary.as_deref())?;
1570 emit_json(&PhaseEvent {
1571 phase: "validate",
1572 binary_path: bin.to_str(),
1573 version: None,
1574 items_total: None,
1575 items_pending: None,
1576 llm_parallelism: None,
1577 });
1578 bin
1579 }
1580 })
1581 };
1582
1583 if args.max_load_check && !args.dry_run && crate::system_load::is_system_saturated() {
1587 let load = crate::system_load::load_average_one();
1588 let n = crate::system_load::ncpus();
1589 return Err(AppError::Validation(format!(
1590 "system load average {load:.2} exceeds 2x ncpus ({n}); \
1591 pass --no-max-load-check to override (not recommended)"
1592 )));
1593 }
1594
1595 if args.preflight_check && !args.dry_run && !matches!(args.operation, EnrichOperation::ReEmbed)
1602 {
1603 let preflight_result = run_preflight_probe(args);
1604 match preflight_result {
1605 PreflightOutcome::Healthy => {
1606 tracing::info!(target: "enrich", mode = ?args.mode, "preflight probe healthy");
1607 }
1608 PreflightOutcome::RateLimited { reason, suggestion } => {
1609 if let Some(fallback) = args.fallback_mode.clone() {
1610 if fallback != args.mode {
1611 return Err(AppError::Validation(format!(
1621 "preflight detected rate limit on {mode:?}: {reason}; \
1622 re-invoke with `--mode {fallback:?}` to use the fallback provider",
1623 mode = args.mode
1624 )));
1625 }
1626 return Err(AppError::Validation(format!(
1627 "preflight detected rate limit on {mode:?}: {reason}; \
1628 --fallback-mode matches --mode, no recovery possible",
1629 mode = args.mode
1630 )));
1631 }
1632 return Err(AppError::Validation(format!(
1633 "preflight detected rate limit on {mode:?}: {reason}; \
1634 {suggestion}; pass --fallback-mode codex to recover",
1635 mode = args.mode
1636 )));
1637 }
1638 PreflightOutcome::Error(e) => {
1639 return Err(e);
1640 }
1641 }
1642 }
1643
1644 let scan_result = scan_operation(&conn, &namespace, args)?;
1646 let total = scan_result.len();
1647
1648 emit_json(&PhaseEvent {
1649 phase: "scan",
1650 binary_path: None,
1651 version: None,
1652 items_total: Some(total),
1653 items_pending: Some(total),
1654 llm_parallelism: Some(args.llm_parallelism),
1655 });
1656
1657 if args.dry_run {
1659 for (idx, key) in scan_result.iter().enumerate() {
1660 emit_json(&ItemEvent {
1661 item: key,
1662 status: "preview",
1663 memory_id: None,
1664 entity_id: None,
1665 entities: None,
1666 rels: None,
1667 chars_before: None,
1668 chars_after: None,
1669 cost_usd: None,
1670 elapsed_ms: None,
1671 error: None,
1672 index: idx,
1673 total,
1674 });
1675 }
1676 emit_json(&EnrichSummary {
1677 summary: true,
1678 operation: format!("{:?}", args.operation),
1679 items_total: total,
1680 completed: 0,
1681 failed: 0,
1682 skipped: 0,
1683 cost_usd: 0.0,
1684 elapsed_ms: started.elapsed().as_millis() as u64,
1685 backend_invoked: take_enrich_backend(),
1686 });
1687 return Ok(());
1688 }
1689
1690 let queue_conn = open_queue_db(DEFAULT_QUEUE_DB)?;
1694
1695 if args.resume {
1696 let reset = queue_conn
1697 .execute(
1698 "UPDATE queue SET status='pending' WHERE status='processing'",
1699 [],
1700 )
1701 .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
1702 if reset > 0 {
1703 tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
1704 }
1705 }
1706
1707 if args.retry_failed {
1708 let count = queue_conn
1709 .execute(
1710 "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
1711 [],
1712 )
1713 .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
1714 tracing::info!(target: "enrich", count, "retrying failed items");
1715 }
1716
1717 if !args.resume && !args.retry_failed {
1718 queue_conn
1719 .execute("DELETE FROM queue", [])
1720 .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
1721 }
1722
1723 for (idx, key) in scan_result.iter().enumerate() {
1725 let item_type = match args.operation {
1726 EnrichOperation::EntityDescriptions => "entity",
1727 _ => "memory",
1728 };
1729 if let Err(e) = queue_conn.execute(
1730 "INSERT OR IGNORE INTO queue (item_key, item_type, status) VALUES (?1, ?2, 'pending')",
1731 rusqlite::params![key, item_type],
1732 ) {
1733 tracing::warn!(target: "enrich", error = %e, "queue insert failed");
1734 }
1735 let _ = idx; }
1737
1738 let parallelism = args.llm_parallelism.clamp(1, 32) as usize;
1741 if parallelism > 1 {
1742 tracing::info!(
1743 target: "enrich",
1744 llm_parallelism = parallelism,
1745 "parallel LLM processing with bounded thread pool"
1746 );
1747 }
1748 if parallelism > 4 {
1752 match args.mode {
1753 EnrichMode::ClaudeCode => {
1754 tracing::warn!(
1755 target: "enrich",
1756 llm_parallelism = parallelism,
1757 recommended_max = 4,
1758 mode = "claude-code",
1759 "llm_parallelism above 4 multiplies Claude Code subprocess fan-out; \
1760 consider combining with SQLITE_GRAPHRAG_CLAUDE_EMPTY_CONFIG_DIR \
1761 to cut MCP children (G28-A)"
1762 );
1763 }
1764 EnrichMode::Codex if parallelism > 16 => {
1765 tracing::warn!(
1766 target: "enrich",
1767 llm_parallelism = parallelism,
1768 recommended_max = 16,
1769 mode = "codex",
1770 "llm_parallelism above 16 risks OAuth rate-limit on Codex; \
1771 consider --llm-parallelism 8 for safer concurrency"
1772 );
1773 }
1774 EnrichMode::Codex => {
1775 }
1779 }
1780 }
1781
1782 let mut completed = 0usize;
1783 let mut failed = 0usize;
1784 let mut skipped = 0usize;
1785 let mut cost_total = 0.0f64;
1786 let mut oauth_detected = false;
1787 let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1788 let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1789 let enrich_started = std::time::Instant::now();
1790
1791 let provider_timeout = match args.mode {
1792 EnrichMode::ClaudeCode => args.claude_timeout,
1793 EnrichMode::Codex => args.codex_timeout,
1794 };
1795
1796 let provider_model: Option<&str> = match args.mode {
1797 EnrichMode::ClaudeCode => args.claude_model.as_deref(),
1798 EnrichMode::Codex => args.codex_model.as_deref(),
1799 };
1800
1801 if parallelism > 1 {
1805 let stdout_mu = parking_lot::Mutex::new(());
1806 let budget = args.max_cost_usd;
1807 let operation = args.operation.clone();
1808 let mode = args.mode.clone();
1809 let min_oc = args.min_output_chars;
1810 let max_oc = args.max_output_chars;
1811 let prompt_tpl = args.prompt_template.as_deref().map(|p| p.to_path_buf());
1812
1813 struct WorkerResult {
1814 completed: usize,
1815 failed: usize,
1816 skipped: usize,
1817 cost: f64,
1818 oauth: bool,
1819 }
1820
1821 let results: Vec<WorkerResult> = std::thread::scope(|s| {
1822 let handles: Vec<_> = (0..parallelism)
1823 .map(|worker_id| {
1824 let stdout_mu = &stdout_mu;
1825 let paths = &paths;
1826 let namespace = &namespace;
1827 let provider_binary = provider_binary.as_deref();
1828 let operation = &operation;
1829 let mode = &mode;
1830 let prompt_tpl = prompt_tpl.as_deref();
1831 s.spawn(move || {
1832 let w_conn = match open_rw(&paths.db) {
1833 Ok(c) => c,
1834 Err(e) => {
1835 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open DB");
1836 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1837 }
1838 };
1839 let w_queue = match open_queue_db(DEFAULT_QUEUE_DB) {
1840 Ok(c) => c,
1841 Err(e) => {
1842 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open queue DB");
1843 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1844 }
1845 };
1846 let mut w_completed = 0usize;
1847 let mut w_failed = 0usize;
1848 let mut w_skipped = 0usize;
1849 let mut w_cost = 0.0f64;
1850 let mut w_oauth = false;
1851 let mut w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1852 let w_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1853 let mut w_breaker = crate::retry::CircuitBreaker::new(
1859 args.circuit_breaker_threshold.max(1),
1860 std::time::Duration::from_secs(60),
1861 );
1862
1863 loop {
1864 if crate::shutdown_requested() {
1865 tracing::info!(target: "enrich", "shutdown requested, worker stopping");
1866 break;
1867 }
1868 if let Some(b) = budget {
1869 if !w_oauth && w_cost >= b {
1870 break;
1871 }
1872 }
1873 let pending: Option<(i64, String, String)> = w_queue
1874 .query_row(
1875 "UPDATE queue SET status='processing', attempt=attempt+1 \
1876 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
1877 RETURNING id, item_key, item_type",
1878 [],
1879 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1880 )
1881 .ok();
1882 let (queue_id, item_key, _item_type) = match pending {
1883 Some(p) => p,
1884 None => break,
1885 };
1886 let item_started = Instant::now();
1887 let current_index = w_completed + w_failed + w_skipped;
1888
1889 let call_result = match operation {
1890 EnrichOperation::MemoryBindings => call_memory_bindings(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1891 EnrichOperation::EntityDescriptions => call_entity_description(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1892 EnrichOperation::BodyEnrich => call_body_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode, min_oc, max_oc, prompt_tpl, args.preserve_threshold, paths, llm_backend),
1893 EnrichOperation::ReEmbed => call_reembed(&w_conn, namespace, &item_key, paths, llm_backend),
1894 EnrichOperation::WeightCalibrate => call_weight_calibrate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1895 EnrichOperation::RelationReclassify => call_relation_reclassify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1896 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => call_entity_connect(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1897 EnrichOperation::EntityTypeValidate => call_entity_type_validate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1898 EnrichOperation::DescriptionEnrich => call_description_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1899 EnrichOperation::DomainClassify => call_domain_classify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1900 EnrichOperation::GraphAudit => call_graph_audit(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1901 EnrichOperation::DeepResearchSynth => call_deep_research_synth(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1902 EnrichOperation::BodyExtract => call_body_extract(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1903 };
1904
1905 match call_result {
1906 Ok(EnrichItemResult::Done { cost, is_oauth, memory_id, entity_id, entities, rels, chars_before, chars_after }) => {
1907 if is_oauth { w_oauth = true; }
1908 w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1909 let _ = w_queue.execute(
1910 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1911 rusqlite::params![memory_id, entity_id, entities as i64, rels as i64, cost, item_started.elapsed().as_millis() as i64, queue_id],
1912 );
1913 w_completed += 1;
1914 if !is_oauth { w_cost += cost; }
1915 let _ = w_breaker
1917 .record(crate::retry::AttemptOutcome::Success);
1918 let _guard = stdout_mu.lock();
1919 emit_json(&ItemEvent { item: &item_key, status: "done", memory_id, entity_id, entities: Some(entities), rels: Some(rels), chars_before, chars_after, cost_usd: if is_oauth { None } else { Some(cost) }, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
1920 }
1921 Ok(EnrichItemResult::Skipped { reason }) => {
1922 w_skipped += 1;
1923 let _ = w_queue.execute("UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![reason, queue_id]);
1924 let _guard = stdout_mu.lock();
1925 emit_json(&ItemEvent { item: &item_key, status: "skipped", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
1926 }
1927 Ok(EnrichItemResult::PreservationFailed { score, threshold, chars_before, chars_after }) => {
1928 w_skipped += 1;
1934 let reason = format!(
1935 "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
1936 );
1937 let _ = w_queue.execute(
1938 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
1939 rusqlite::params![reason, queue_id],
1940 );
1941 let _guard = stdout_mu.lock();
1942 emit_json(&ItemEvent {
1943 item: &item_key,
1944 status: "preservation_failed",
1945 memory_id: None,
1946 entity_id: None,
1947 entities: None,
1948 rels: None,
1949 chars_before: Some(chars_before),
1950 chars_after: Some(chars_after),
1951 cost_usd: None,
1952 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1953 error: Some(reason),
1954 index: current_index,
1955 total,
1956 });
1957 }
1958 Err(e) => {
1959 let err_str = format!("{e}");
1960 if matches!(e, AppError::RateLimited { .. }) {
1961 if crate::retry::is_kill_switch_active() {
1962 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
1963 } else if std::time::Instant::now() >= w_deadline {
1964 tracing::error!(target: "enrich", "rate-limit retry deadline (1h) exhausted in worker");
1965 } else {
1966 let half = w_backoff / 2;
1967 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
1968 let actual_wait = half + jitter;
1969 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited in worker, backing off");
1970 let _ = w_queue.execute("UPDATE queue SET status='pending' WHERE id=?1", rusqlite::params![queue_id]);
1971 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
1972 w_backoff = (w_backoff * 2).min(900);
1973 continue;
1974 }
1975 }
1976 w_failed += 1;
1977 let _ = w_queue.execute("UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![err_str, queue_id]);
1978 let _guard = stdout_mu.lock();
1979 emit_json(&ItemEvent { item: &item_key, status: "failed", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: Some(err_str), index: current_index, total });
1980 let breaker_opened = w_breaker
1982 .record(crate::retry::AttemptOutcome::HardFailure);
1983 if breaker_opened {
1984 tracing::error!(target: "enrich",
1985 consecutive_failures = w_breaker.consecutive_failures(),
1986 "circuit breaker opened — aborting worker"
1987 );
1988 break;
1989 }
1990 }
1991 }
1992 }
1993 WorkerResult { completed: w_completed, failed: w_failed, skipped: w_skipped, cost: w_cost, oauth: w_oauth }
1994 })
1995 })
1996 .collect();
1997 handles
1998 .into_iter()
1999 .map(|h| {
2000 h.join().unwrap_or(WorkerResult {
2001 completed: 0,
2002 failed: 0,
2003 skipped: 0,
2004 cost: 0.0,
2005 oauth: false,
2006 })
2007 })
2008 .collect()
2009 });
2010
2011 for r in &results {
2012 completed += r.completed;
2013 failed += r.failed;
2014 skipped += r.skipped;
2015 cost_total += r.cost;
2016 if r.oauth && !oauth_detected {
2017 oauth_detected = true;
2018 }
2019 }
2020 } else {
2021 loop {
2023 if crate::shutdown_requested() {
2024 tracing::info!(target: "enrich", "shutdown requested, stopping enrichment");
2025 break;
2026 }
2027
2028 if let Some(budget) = args.max_cost_usd {
2030 if !oauth_detected && cost_total >= budget {
2031 tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
2032 break;
2033 }
2034 }
2035
2036 let pending: Option<(i64, String, String)> = queue_conn
2038 .query_row(
2039 "UPDATE queue SET status='processing', attempt=attempt+1 \
2040 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
2041 RETURNING id, item_key, item_type",
2042 [],
2043 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2044 )
2045 .ok();
2046
2047 let (queue_id, item_key, item_type) = match pending {
2048 Some(p) => p,
2049 None => break,
2050 };
2051
2052 let item_started = Instant::now();
2053 let current_index = completed + failed + skipped;
2054
2055 let call_result = match args.operation {
2056 EnrichOperation::MemoryBindings => call_memory_bindings(
2057 &conn,
2058 &namespace,
2059 &item_key,
2060 provider_binary
2061 .as_deref()
2062 .expect("provider binary required"),
2063 provider_model,
2064 provider_timeout,
2065 &args.mode,
2066 ),
2067 EnrichOperation::EntityDescriptions => call_entity_description(
2068 &conn,
2069 &namespace,
2070 &item_key,
2071 provider_binary
2072 .as_deref()
2073 .expect("provider binary required"),
2074 provider_model,
2075 provider_timeout,
2076 &args.mode,
2077 ),
2078 EnrichOperation::BodyEnrich => call_body_enrich(
2079 &conn,
2080 &namespace,
2081 &item_key,
2082 provider_binary
2083 .as_deref()
2084 .expect("provider binary required"),
2085 provider_model,
2086 provider_timeout,
2087 &args.mode,
2088 args.min_output_chars,
2089 args.max_output_chars,
2090 args.prompt_template.as_deref(),
2091 args.preserve_threshold,
2092 &paths,
2093 llm_backend,
2094 ),
2095 EnrichOperation::ReEmbed => {
2096 call_reembed(&conn, &namespace, &item_key, &paths, llm_backend)
2097 }
2098 EnrichOperation::WeightCalibrate => call_weight_calibrate(
2099 &conn,
2100 &namespace,
2101 &item_key,
2102 provider_binary
2103 .as_deref()
2104 .expect("provider binary required"),
2105 provider_model,
2106 provider_timeout,
2107 &args.mode,
2108 ),
2109 EnrichOperation::RelationReclassify => call_relation_reclassify(
2110 &conn,
2111 &namespace,
2112 &item_key,
2113 provider_binary
2114 .as_deref()
2115 .expect("provider binary required"),
2116 provider_model,
2117 provider_timeout,
2118 &args.mode,
2119 ),
2120 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2121 call_entity_connect(
2122 &conn,
2123 &namespace,
2124 &item_key,
2125 provider_binary
2126 .as_deref()
2127 .expect("provider binary required"),
2128 provider_model,
2129 provider_timeout,
2130 &args.mode,
2131 )
2132 }
2133 EnrichOperation::EntityTypeValidate => call_entity_type_validate(
2134 &conn,
2135 &namespace,
2136 &item_key,
2137 provider_binary
2138 .as_deref()
2139 .expect("provider binary required"),
2140 provider_model,
2141 provider_timeout,
2142 &args.mode,
2143 ),
2144 EnrichOperation::DescriptionEnrich => call_description_enrich(
2145 &conn,
2146 &namespace,
2147 &item_key,
2148 provider_binary
2149 .as_deref()
2150 .expect("provider binary required"),
2151 provider_model,
2152 provider_timeout,
2153 &args.mode,
2154 ),
2155 EnrichOperation::DomainClassify => call_domain_classify(
2156 &conn,
2157 &namespace,
2158 &item_key,
2159 provider_binary
2160 .as_deref()
2161 .expect("provider binary required"),
2162 provider_model,
2163 provider_timeout,
2164 &args.mode,
2165 ),
2166 EnrichOperation::GraphAudit => call_graph_audit(
2167 &conn,
2168 &namespace,
2169 &item_key,
2170 provider_binary
2171 .as_deref()
2172 .expect("provider binary required"),
2173 provider_model,
2174 provider_timeout,
2175 &args.mode,
2176 ),
2177 EnrichOperation::DeepResearchSynth => call_deep_research_synth(
2178 &conn,
2179 &namespace,
2180 &item_key,
2181 provider_binary
2182 .as_deref()
2183 .expect("provider binary required"),
2184 provider_model,
2185 provider_timeout,
2186 &args.mode,
2187 ),
2188 EnrichOperation::BodyExtract => call_body_extract(
2189 &conn,
2190 &namespace,
2191 &item_key,
2192 provider_binary
2193 .as_deref()
2194 .expect("provider binary required"),
2195 provider_model,
2196 provider_timeout,
2197 &args.mode,
2198 ),
2199 };
2200
2201 match call_result {
2202 Ok(EnrichItemResult::Done {
2203 memory_id,
2204 entity_id,
2205 entities,
2206 rels,
2207 chars_before,
2208 chars_after,
2209 cost,
2210 is_oauth,
2211 }) => {
2212 if is_oauth && !oauth_detected {
2213 oauth_detected = true;
2214 tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
2215 }
2216 backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
2217
2218 let persist_err: Option<String> = match args.operation {
2220 EnrichOperation::MemoryBindings => {
2221 None
2223 }
2224 EnrichOperation::EntityDescriptions => {
2225 None
2227 }
2228 EnrichOperation::BodyEnrich => {
2229 None
2231 }
2232 _ => {
2233 None
2235 }
2236 };
2237
2238 if let Err(e) = queue_conn.execute(
2239 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2240 rusqlite::params![
2241 memory_id,
2242 entity_id,
2243 entities as i64,
2244 rels as i64,
2245 cost,
2246 item_started.elapsed().as_millis() as i64,
2247 queue_id
2248 ],
2249 ) {
2250 tracing::warn!(target: "enrich", error = %e, "queue done update failed");
2251 }
2252
2253 if persist_err.is_none() {
2254 completed += 1;
2255 if !is_oauth {
2256 cost_total += cost;
2257 }
2258 emit_json(&ItemEvent {
2259 item: &item_key,
2260 status: "done",
2261 memory_id,
2262 entity_id,
2263 entities: Some(entities),
2264 rels: Some(rels),
2265 chars_before,
2266 chars_after,
2267 cost_usd: if is_oauth { None } else { Some(cost) },
2268 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2269 error: None,
2270 index: current_index,
2271 total,
2272 });
2273 } else {
2274 failed += 1;
2275 emit_json(&ItemEvent {
2276 item: &item_key,
2277 status: "failed",
2278 memory_id: None,
2279 entity_id: None,
2280 entities: None,
2281 rels: None,
2282 chars_before: None,
2283 chars_after: None,
2284 cost_usd: None,
2285 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2286 error: persist_err,
2287 index: current_index,
2288 total,
2289 });
2290 }
2291 }
2292 Ok(EnrichItemResult::Skipped { reason }) => {
2293 skipped += 1;
2294 if let Err(e) = queue_conn.execute(
2295 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2296 rusqlite::params![reason, queue_id],
2297 ) {
2298 tracing::warn!(target: "enrich", error = %e, "queue skipped update failed");
2299 }
2300 emit_json(&ItemEvent {
2301 item: &item_key,
2302 status: "skipped",
2303 memory_id: None,
2304 entity_id: None,
2305 entities: None,
2306 rels: None,
2307 chars_before: None,
2308 chars_after: None,
2309 cost_usd: None,
2310 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2311 error: None,
2312 index: current_index,
2313 total,
2314 });
2315 }
2316 Ok(EnrichItemResult::PreservationFailed {
2317 score,
2318 threshold,
2319 chars_before,
2320 chars_after,
2321 }) => {
2322 skipped += 1;
2329 let reason = format!(
2330 "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2331 );
2332 if let Err(qe) = queue_conn.execute(
2333 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2334 rusqlite::params![reason, queue_id],
2335 ) {
2336 tracing::warn!(target: "enrich", error = %qe, "queue preservation_failed update failed");
2337 }
2338 emit_json(&ItemEvent {
2339 item: &item_key,
2340 status: "preservation_failed",
2341 memory_id: None,
2342 entity_id: None,
2343 entities: None,
2344 rels: None,
2345 chars_before: Some(chars_before),
2346 chars_after: Some(chars_after),
2347 cost_usd: None,
2348 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2349 error: Some(reason),
2350 index: current_index,
2351 total,
2352 });
2353 }
2354 Err(e) => {
2355 let err_str = format!("{e}");
2356 if matches!(e, AppError::RateLimited { .. }) {
2357 if crate::retry::is_kill_switch_active() {
2358 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2359 } else if std::time::Instant::now() >= rate_limit_deadline {
2360 tracing::error!(target: "enrich", total_elapsed_secs = enrich_started.elapsed().as_secs(), "rate-limit retry deadline (1h) exhausted");
2361 } else {
2362 let half = backoff_secs / 2;
2363 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2364 let actual_wait = half + jitter;
2365 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
2366 if let Err(qe) = queue_conn.execute(
2367 "UPDATE queue SET status='pending' WHERE id=?1",
2368 rusqlite::params![queue_id],
2369 ) {
2370 tracing::warn!(target: "enrich", error = %qe, "queue pending update failed");
2371 }
2372 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2373 backoff_secs = (backoff_secs * 2).min(900);
2374 continue;
2375 }
2376 }
2377
2378 failed += 1;
2379 if let Err(qe) = queue_conn.execute(
2380 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
2381 rusqlite::params![err_str, queue_id],
2382 ) {
2383 tracing::warn!(target: "enrich", error = %qe, "queue failed update failed");
2384 }
2385 emit_json(&ItemEvent {
2386 item: &item_key,
2387 status: "failed",
2388 memory_id: None,
2389 entity_id: None,
2390 entities: None,
2391 rels: None,
2392 chars_before: None,
2393 chars_after: None,
2394 cost_usd: None,
2395 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2396 error: Some(err_str),
2397 index: current_index,
2398 total,
2399 });
2400 }
2401 }
2402
2403 let _ = item_type; }
2405 } let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2408 let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2409
2410 emit_json(&EnrichSummary {
2411 summary: true,
2412 operation: format!("{:?}", args.operation),
2413 items_total: total,
2414 completed,
2415 failed,
2416 skipped,
2417 cost_usd: cost_total,
2418 elapsed_ms: started.elapsed().as_millis() as u64,
2419 backend_invoked: take_enrich_backend(),
2420 });
2421
2422 if failed == 0 {
2423 let _ = std::fs::remove_file(DEFAULT_QUEUE_DB);
2424 }
2425
2426 Ok(())
2427}
2428
/// Outcome of enriching a single queue item.
///
/// The run loop matches on this to update the queue row, bump the
/// completed/skipped counters, and emit the per-item JSON event.
enum EnrichItemResult {
    /// The item was processed and its changes were written.
    ///
    /// The run loop marks the queue row `done` with these values and emits a
    /// `"done"` event.
    Done {
        /// Memory the item refers to, for memory-scoped operations.
        memory_id: Option<i64>,
        /// Entity the item refers to, for entity-scoped operations.
        entity_id: Option<i64>,
        /// Number of entities produced or touched by this item.
        entities: usize,
        /// Number of relationships touched by this item. Some operations reuse
        /// this slot for another count (graph audit stores its issue count here).
        rels: usize,
        /// Size of the original text, for operations that rewrite text.
        chars_before: Option<usize>,
        /// Size of the rewritten text, for operations that rewrite text.
        chars_after: Option<usize>,
        /// Cost in USD attributed to the item (0.0 for local-only work such as
        /// re-embedding).
        cost: f64,
        /// When true, the run loop omits `cost_usd` from the item event and
        /// leaves `cost` out of the running total. The first occurrence also
        /// logs a one-time "OAuth subscription detected" message.
        is_oauth: bool,
    },
    /// The item was intentionally not processed.
    ///
    /// `reason` is stored in the queue row's `error` column, and the item
    /// counts as skipped.
    Skipped {
        reason: String,
    },
    /// A body rewrite was produced but rejected by the content-preservation check.
    ///
    /// Counted as skipped and reported with status `preservation_failed`.
    PreservationFailed {
        /// Preservation score of the rejected rewrite (reported as `jaccard=`).
        score: f64,
        /// Minimum score the rewrite needed to be accepted.
        threshold: f64,
        /// Character count of the original body.
        chars_before: usize,
        /// Character count of the rejected rewrite.
        chars_after: usize,
    },
}
2458
2459fn call_memory_bindings(
2464 conn: &Connection,
2465 namespace: &str,
2466 memory_name: &str,
2467 binary: &Path,
2468 model: Option<&str>,
2469 timeout: u64,
2470 mode: &EnrichMode,
2471) -> Result<EnrichItemResult, AppError> {
2472 let (memory_id, body): (i64, String) = conn.query_row(
2474 "SELECT id, COALESCE(body,'') FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2475 rusqlite::params![namespace, memory_name],
2476 |r| Ok((r.get(0)?, r.get(1)?)),
2477 ).map_err(|e| match e {
2478 rusqlite::Error::QueryReturnedNoRows => AppError::NotFound(format!("memory '{memory_name}' not found")),
2479 other => AppError::Database(other),
2480 })?;
2481
2482 if body.trim().is_empty() {
2483 return Ok(EnrichItemResult::Skipped {
2484 reason: "body is empty".to_string(),
2485 });
2486 }
2487
2488 let (value, cost, is_oauth) = match mode {
2489 EnrichMode::ClaudeCode => call_claude(
2490 binary,
2491 BINDINGS_PROMPT,
2492 BINDINGS_SCHEMA,
2493 &body,
2494 model,
2495 timeout,
2496 )?,
2497 EnrichMode::Codex => call_codex(
2498 binary,
2499 BINDINGS_PROMPT,
2500 BINDINGS_SCHEMA,
2501 &body,
2502 model,
2503 timeout,
2504 )?,
2505 };
2506
2507 let empty_arr = serde_json::Value::Array(vec![]);
2508 let entities_val = value.get("entities").unwrap_or(&empty_arr);
2509 let rels_val = value.get("relationships").unwrap_or(&empty_arr);
2510
2511 let (ent_count, rel_count) =
2512 persist_memory_bindings(conn, namespace, memory_id, entities_val, rels_val)?;
2513
2514 Ok(EnrichItemResult::Done {
2515 memory_id: Some(memory_id),
2516 entity_id: None,
2517 entities: ent_count,
2518 rels: rel_count,
2519 chars_before: None,
2520 chars_after: None,
2521 cost,
2522 is_oauth,
2523 })
2524}
2525
2526fn call_entity_description(
2527 conn: &Connection,
2528 namespace: &str,
2529 entity_name: &str,
2530 binary: &Path,
2531 model: Option<&str>,
2532 timeout: u64,
2533 mode: &EnrichMode,
2534) -> Result<EnrichItemResult, AppError> {
2535 let (entity_id, entity_type): (i64, String) = conn
2536 .query_row(
2537 "SELECT id, type FROM entities WHERE namespace=?1 AND name=?2",
2538 rusqlite::params![namespace, entity_name],
2539 |r| Ok((r.get(0)?, r.get(1)?)),
2540 )
2541 .map_err(|e| match e {
2542 rusqlite::Error::QueryReturnedNoRows => {
2543 AppError::NotFound(format!("entity '{entity_name}' not found"))
2544 }
2545 other => AppError::Database(other),
2546 })?;
2547
2548 let prompt = format!(
2549 "{ENTITY_DESCRIPTION_PROMPT_PREFIX}{entity_name}\nEntity type: {entity_type}\n\nGenerate a description:"
2550 );
2551
2552 let (value, cost, is_oauth) = match mode {
2553 EnrichMode::ClaudeCode => call_claude(
2554 binary,
2555 &prompt,
2556 ENTITY_DESCRIPTION_SCHEMA,
2557 "",
2558 model,
2559 timeout,
2560 )?,
2561 EnrichMode::Codex => call_codex(
2562 binary,
2563 &prompt,
2564 ENTITY_DESCRIPTION_SCHEMA,
2565 "",
2566 model,
2567 timeout,
2568 )?,
2569 };
2570
2571 let description = value
2572 .get("description")
2573 .and_then(|v| v.as_str())
2574 .ok_or_else(|| AppError::Validation("LLM result missing 'description' field".into()))?;
2575
2576 persist_entity_description(conn, entity_id, description)?;
2577
2578 Ok(EnrichItemResult::Done {
2579 memory_id: None,
2580 entity_id: Some(entity_id),
2581 entities: 0,
2582 rels: 0,
2583 chars_before: None,
2584 chars_after: None,
2585 cost,
2586 is_oauth,
2587 })
2588}
2589
2590#[allow(clippy::too_many_arguments)]
2591fn call_body_enrich(
2592 conn: &Connection,
2593 namespace: &str,
2594 memory_name: &str,
2595 binary: &Path,
2596 model: Option<&str>,
2597 timeout: u64,
2598 mode: &EnrichMode,
2599 min_output_chars: usize,
2600 max_output_chars: usize,
2601 prompt_template: Option<&Path>,
2602 preserve_threshold: f64,
2603 paths: &crate::paths::AppPaths,
2604 llm_backend: crate::cli::LlmBackendChoice,
2605) -> Result<EnrichItemResult, AppError> {
2606 let (memory_id, body, description, memory_type): (i64, String, String, String) = conn
2607 .query_row(
2608 "SELECT id, COALESCE(body,''), COALESCE(description,''), COALESCE(type,'note') \
2609 FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2610 rusqlite::params![namespace, memory_name],
2611 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2612 )
2613 .map_err(|e| match e {
2614 rusqlite::Error::QueryReturnedNoRows => {
2615 AppError::NotFound(format!("memory '{memory_name}' not found"))
2616 }
2617 other => AppError::Database(other),
2618 })?;
2619
2620 let chars_before = body.chars().count();
2621
2622 let linked_entities: Vec<String> = {
2624 let mut stmt = conn.prepare_cached(
2625 "SELECT e.name FROM memory_entities me \
2626 JOIN entities e ON e.id = me.entity_id \
2627 WHERE me.memory_id = ?1 LIMIT 10",
2628 )?;
2629 let result: Vec<String> = stmt
2630 .query_map(rusqlite::params![memory_id], |r| r.get::<_, String>(0))?
2631 .filter_map(|r| r.ok())
2632 .collect();
2633 drop(stmt);
2634 result
2635 };
2636
2637 let prompt_prefix = if let Some(tmpl_path) = prompt_template {
2639 let file_size = std::fs::metadata(tmpl_path)
2640 .map_err(|e| {
2641 AppError::Io(std::io::Error::new(
2642 e.kind(),
2643 format!("failed to stat prompt template: {e}"),
2644 ))
2645 })?
2646 .len();
2647 if file_size > MAX_MEMORY_BODY_LEN as u64 {
2648 return Err(AppError::LimitExceeded(
2649 crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
2650 ));
2651 }
2652 std::fs::read_to_string(tmpl_path).map_err(|e| {
2653 AppError::Io(std::io::Error::new(
2654 e.kind(),
2655 format!("failed to read prompt template: {e}"),
2656 ))
2657 })?
2658 } else {
2659 BODY_ENRICH_PROMPT_PREFIX.to_string()
2660 };
2661
2662 let context_section = if !linked_entities.is_empty() || !description.is_empty() {
2664 let mut ctx = String::new();
2665 ctx.push_str(&format!(
2666 "\nContext:\n- Memory name: {memory_name}\n- Type: {memory_type}\n"
2667 ));
2668 if !description.is_empty() {
2669 ctx.push_str(&format!("- Description: {description}\n"));
2670 }
2671 ctx.push_str(&format!("- Domain: {namespace}\n"));
2672 if !linked_entities.is_empty() {
2673 ctx.push_str(&format!(
2674 "- Linked entities: {}\n",
2675 linked_entities.join(", ")
2676 ));
2677 }
2678 ctx
2679 } else {
2680 String::new()
2681 };
2682
2683 let prompt = format!(
2684 "{prompt_prefix}{context_section}\nTarget minimum length: {min_output_chars} characters. Maximum: {max_output_chars} characters."
2685 );
2686
2687 let (value, cost, is_oauth) = match mode {
2689 EnrichMode::ClaudeCode => {
2690 call_claude(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2691 }
2692 EnrichMode::Codex => {
2693 call_codex(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2694 }
2695 };
2696
2697 let enriched_body = value
2698 .get("enriched_body")
2699 .and_then(|v| v.as_str())
2700 .ok_or_else(|| AppError::Validation("LLM result missing 'enriched_body' field".into()))?;
2701
2702 let chars_after = enriched_body.chars().count();
2703
2704 let threshold = preserve_threshold;
2711 let verdict =
2712 crate::preservation::PreservationVerdict::evaluate(&body, enriched_body, threshold);
2713 if !verdict.is_accepted() {
2714 return Ok(EnrichItemResult::PreservationFailed {
2715 score: match verdict {
2716 crate::preservation::PreservationVerdict::Preserved { score, .. } => score,
2717 crate::preservation::PreservationVerdict::Rejected { score, .. } => score,
2718 crate::preservation::PreservationVerdict::Unchanged { .. } => 1.0,
2719 },
2720 threshold,
2721 chars_before,
2722 chars_after,
2723 });
2724 }
2725
2726 let old_hash = blake3::hash(body.as_bytes()).to_hex().to_string();
2732 let new_hash = blake3::hash(enriched_body.as_bytes()).to_hex().to_string();
2733 if old_hash == new_hash {
2734 return Ok(EnrichItemResult::Skipped {
2735 reason: format!(
2736 "enriched body hash matches original (blake3:{old_hash}); idempotency skip"
2737 ),
2738 });
2739 }
2740
2741 if chars_after <= chars_before {
2743 return Ok(EnrichItemResult::Skipped {
2744 reason: format!(
2745 "enriched body ({chars_after} chars) not longer than original ({chars_before} chars)"
2746 ),
2747 });
2748 }
2749
2750 persist_enriched_body(
2751 conn,
2752 namespace,
2753 memory_id,
2754 memory_name,
2755 enriched_body,
2756 paths,
2757 llm_backend,
2758 )?;
2759
2760 Ok(EnrichItemResult::Done {
2761 memory_id: Some(memory_id),
2762 entity_id: None,
2763 entities: 0,
2764 rels: 0,
2765 chars_before: Some(chars_before),
2766 chars_after: Some(chars_after),
2767 cost,
2768 is_oauth,
2769 })
2770}
2771
2772fn call_reembed(
2773 conn: &Connection,
2774 namespace: &str,
2775 memory_name: &str,
2776 paths: &crate::paths::AppPaths,
2777 llm_backend: crate::cli::LlmBackendChoice,
2778) -> Result<EnrichItemResult, AppError> {
2779 let (memory_id, body, memory_type): (i64, String, String) = conn
2780 .query_row(
2781 "SELECT id, COALESCE(body,''), COALESCE(type,'note')
2782 FROM memories
2783 WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2784 rusqlite::params![namespace, memory_name],
2785 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2786 )
2787 .map_err(|e| match e {
2788 rusqlite::Error::QueryReturnedNoRows => {
2789 AppError::NotFound(format!("memory '{memory_name}' not found"))
2790 }
2791 other => AppError::Database(other),
2792 })?;
2793
2794 if body.trim().is_empty() {
2795 return Ok(EnrichItemResult::Skipped {
2796 reason: "body is empty".to_string(),
2797 });
2798 }
2799
2800 reembed_memory_vector(
2801 conn,
2802 namespace,
2803 memory_id,
2804 memory_name,
2805 &memory_type,
2806 &body,
2807 paths,
2808 llm_backend,
2809 )?;
2810
2811 Ok(EnrichItemResult::Done {
2812 memory_id: Some(memory_id),
2813 entity_id: None,
2814 entities: 0,
2815 rels: 0,
2816 chars_before: Some(body.chars().count()),
2817 chars_after: Some(body.chars().count()),
2818 cost: 0.0,
2819 is_oauth: true,
2820 })
2821}
2822
2823fn scan_operation(
2828 conn: &Connection,
2829 namespace: &str,
2830 args: &EnrichArgs,
2831) -> Result<Vec<String>, AppError> {
2832 let name_filter = resolve_name_filter(args)?;
2834 match args.operation {
2835 EnrichOperation::MemoryBindings => {
2836 let rows = scan_unbound_memories(conn, namespace, args.limit, &name_filter)?;
2837 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2838 }
2839 EnrichOperation::EntityDescriptions => {
2840 let rows = scan_entities_without_description(conn, namespace, args.limit)?;
2841 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2842 }
2843 EnrichOperation::BodyEnrich => {
2844 let rows =
2845 scan_short_body_memories(conn, namespace, args.min_output_chars, args.limit)?;
2846 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2847 }
2848 EnrichOperation::ReEmbed => {
2849 let rows = scan_memories_without_embeddings(conn, namespace, args.limit, &name_filter)?;
2850 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2851 }
2852 EnrichOperation::WeightCalibrate => {
2853 let rows = scan_weight_candidates(conn, namespace, args.limit)?;
2854 Ok(rows
2855 .into_iter()
2856 .map(|(id, _, _, _, _)| id.to_string())
2857 .collect())
2858 }
2859 EnrichOperation::RelationReclassify => {
2860 let rows = scan_generic_relations(conn, namespace, args.limit)?;
2861 Ok(rows
2862 .into_iter()
2863 .map(|(id, _, _, _)| id.to_string())
2864 .collect())
2865 }
2866 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2867 let pairs = scan_isolated_entity_pairs(conn, namespace, args.limit)?;
2868 Ok(pairs.into_iter().map(|(_, name, _, _)| name).collect())
2869 }
2870 EnrichOperation::EntityTypeValidate => {
2871 let rows = scan_entities_for_type_validation(conn, namespace, args.limit)?;
2872 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2873 }
2874 EnrichOperation::DescriptionEnrich => {
2875 let rows = scan_generic_descriptions(conn, namespace, args.limit)?;
2876 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2877 }
2878 EnrichOperation::DomainClassify
2879 | EnrichOperation::GraphAudit
2880 | EnrichOperation::DeepResearchSynth
2881 | EnrichOperation::BodyExtract => {
2882 let limit_clause = args.limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
2883 let sql = format!(
2884 "SELECT name FROM memories WHERE namespace=?1 AND deleted_at IS NULL ORDER BY id {limit_clause}"
2885 );
2886 let mut stmt = conn.prepare(&sql)?;
2887 let names = stmt
2888 .query_map(rusqlite::params![namespace], |r| r.get::<_, String>(0))?
2889 .collect::<Result<Vec<_>, _>>()?;
2890 Ok(names)
2891 }
2892 }
2893}
2894
2895fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
2901 if let Some(p) = explicit {
2902 if p.exists() {
2903 return Ok(p.to_path_buf());
2904 }
2905 return Err(AppError::Validation(format!(
2906 "Codex binary not found at explicit path: {}",
2907 p.display()
2908 )));
2909 }
2910
2911 if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
2912 let p = PathBuf::from(&env_path);
2913 if p.exists() {
2914 return Ok(p);
2915 }
2916 }
2917
2918 let name = if cfg!(windows) { "codex.exe" } else { "codex" };
2919 if let Some(path_var) = std::env::var_os("PATH") {
2920 for dir in std::env::split_paths(&path_var) {
2921 let candidate = dir.join(name);
2922 if candidate.exists() {
2923 return Ok(crate::extract::llm_embedding::resolve_real_binary(
2924 &candidate,
2925 ));
2926 }
2927 }
2928 }
2929
2930 Err(AppError::Validation(
2931 "Codex CLI binary not found in PATH. Install it or specify --codex-binary".to_string(),
2932 ))
2933}
2934
2935fn call_weight_calibrate(
2937 conn: &Connection,
2938 _namespace: &str,
2939 item_key: &str,
2940 binary: &Path,
2941 model: Option<&str>,
2942 timeout: u64,
2943 mode: &EnrichMode,
2944) -> Result<EnrichItemResult, AppError> {
2945 let rel_id: i64 = item_key
2946 .parse()
2947 .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
2948 let (source_name, target_name, relation, current_weight): (String, String, String, f64) = conn
2949 .query_row(
2950 "SELECT e1.name, e2.name, r.relation, r.weight \
2951 FROM relationships r \
2952 JOIN entities e1 ON e1.id = r.source_id \
2953 JOIN entities e2 ON e2.id = r.target_id \
2954 WHERE r.id = ?1",
2955 rusqlite::params![rel_id],
2956 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2957 )
2958 .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
2959
2960 let input_text = format!(
2961 "Source: {source_name}\nTarget: {target_name}\nRelation: {relation}\nCurrent weight: {current_weight}"
2962 );
2963 let (value, cost, is_oauth) = match mode {
2964 EnrichMode::ClaudeCode => call_claude(
2965 binary,
2966 WEIGHT_CALIBRATE_PROMPT,
2967 WEIGHT_CALIBRATE_SCHEMA,
2968 &input_text,
2969 model,
2970 timeout,
2971 )?,
2972 EnrichMode::Codex => call_codex(
2973 binary,
2974 WEIGHT_CALIBRATE_PROMPT,
2975 WEIGHT_CALIBRATE_SCHEMA,
2976 &input_text,
2977 model,
2978 timeout,
2979 )?,
2980 };
2981
2982 let calibrated = value
2983 .get("calibrated_weight")
2984 .and_then(|v| v.as_f64())
2985 .ok_or_else(|| AppError::Validation("LLM result missing 'calibrated_weight'".into()))?;
2986
2987 conn.execute(
2988 "UPDATE relationships SET weight = ?1 WHERE id = ?2",
2989 rusqlite::params![calibrated, rel_id],
2990 )?;
2991
2992 Ok(EnrichItemResult::Done {
2993 memory_id: None,
2994 entity_id: None,
2995 entities: 0,
2996 rels: 1,
2997 chars_before: None,
2998 chars_after: None,
2999 cost,
3000 is_oauth,
3001 })
3002}
3003
3004fn call_relation_reclassify(
3006 conn: &Connection,
3007 _namespace: &str,
3008 item_key: &str,
3009 binary: &Path,
3010 model: Option<&str>,
3011 timeout: u64,
3012 mode: &EnrichMode,
3013) -> Result<EnrichItemResult, AppError> {
3014 let rel_id: i64 = item_key
3015 .parse()
3016 .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
3017 let (source_name, target_name, current_relation): (String, String, String) = conn
3018 .query_row(
3019 "SELECT e1.name, e2.name, r.relation \
3020 FROM relationships r \
3021 JOIN entities e1 ON e1.id = r.source_id \
3022 JOIN entities e2 ON e2.id = r.target_id \
3023 WHERE r.id = ?1",
3024 rusqlite::params![rel_id],
3025 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3026 )
3027 .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
3028
3029 let input_text = format!(
3030 "Source entity: {source_name}\nTarget entity: {target_name}\nCurrent relation: {current_relation}"
3031 );
3032 let (value, cost, is_oauth) = match mode {
3033 EnrichMode::ClaudeCode => call_claude(
3034 binary,
3035 RELATION_RECLASSIFY_PROMPT,
3036 RELATION_RECLASSIFY_SCHEMA,
3037 &input_text,
3038 model,
3039 timeout,
3040 )?,
3041 EnrichMode::Codex => call_codex(
3042 binary,
3043 RELATION_RECLASSIFY_PROMPT,
3044 RELATION_RECLASSIFY_SCHEMA,
3045 &input_text,
3046 model,
3047 timeout,
3048 )?,
3049 };
3050
3051 let new_relation = value
3052 .get("relation")
3053 .and_then(|v| v.as_str())
3054 .ok_or_else(|| AppError::Validation("LLM result missing 'relation'".into()))?;
3055 let new_strength = value
3056 .get("strength")
3057 .and_then(|v| v.as_f64())
3058 .unwrap_or(0.5);
3059
3060 conn.execute(
3061 "UPDATE relationships SET relation = ?1, weight = ?2 WHERE id = ?3",
3062 rusqlite::params![new_relation, new_strength, rel_id],
3063 )?;
3064
3065 Ok(EnrichItemResult::Done {
3066 memory_id: None,
3067 entity_id: None,
3068 entities: 0,
3069 rels: 1,
3070 chars_before: None,
3071 chars_after: None,
3072 cost,
3073 is_oauth,
3074 })
3075}
3076
3077fn call_entity_connect(
3079 conn: &Connection,
3080 namespace: &str,
3081 item_key: &str,
3082 binary: &Path,
3083 model: Option<&str>,
3084 timeout: u64,
3085 mode: &EnrichMode,
3086) -> Result<EnrichItemResult, AppError> {
3087 let pairs = scan_isolated_entity_pairs(conn, namespace, Some(1))?;
3088 let (e1_id, e1_name, e2_id, e2_name) =
3089 match pairs.into_iter().find(|(_, n, _, _)| n == item_key) {
3090 Some(p) => p,
3091 None => {
3092 return Ok(EnrichItemResult::Skipped {
3093 reason: "pair no longer isolated".into(),
3094 })
3095 }
3096 };
3097 let input_text = format!("Entity A: {e1_name}\nEntity B: {e2_name}");
3098 let (value, cost, is_oauth) = match mode {
3099 EnrichMode::ClaudeCode => call_claude(
3100 binary,
3101 ENTITY_CONNECT_PROMPT,
3102 ENTITY_CONNECT_SCHEMA,
3103 &input_text,
3104 model,
3105 timeout,
3106 )?,
3107 EnrichMode::Codex => call_codex(
3108 binary,
3109 ENTITY_CONNECT_PROMPT,
3110 ENTITY_CONNECT_SCHEMA,
3111 &input_text,
3112 model,
3113 timeout,
3114 )?,
3115 };
3116 let relation = value
3117 .get("relation")
3118 .and_then(|v| v.as_str())
3119 .unwrap_or("none");
3120 if relation == "none" {
3121 return Ok(EnrichItemResult::Skipped {
3122 reason: "LLM determined no relationship".into(),
3123 });
3124 }
3125 let strength = value
3126 .get("strength")
3127 .and_then(|v| v.as_f64())
3128 .unwrap_or(0.5);
3129 conn.execute(
3130 "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
3131 rusqlite::params![namespace, e1_id, e2_id, relation, strength],
3132 )?;
3133 Ok(EnrichItemResult::Done {
3134 memory_id: None,
3135 entity_id: None,
3136 entities: 0,
3137 rels: 1,
3138 chars_before: None,
3139 chars_after: None,
3140 cost,
3141 is_oauth,
3142 })
3143}
3144
3145fn call_entity_type_validate(
3147 conn: &Connection,
3148 _namespace: &str,
3149 item_key: &str,
3150 binary: &Path,
3151 model: Option<&str>,
3152 timeout: u64,
3153 mode: &EnrichMode,
3154) -> Result<EnrichItemResult, AppError> {
3155 let (ent_id, ent_name, ent_type): (i64, String, String) = conn
3156 .query_row(
3157 "SELECT id, name, type FROM entities WHERE name = ?1",
3158 rusqlite::params![item_key],
3159 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3160 )
3161 .map_err(|_| AppError::NotFound(format!("entity '{item_key}' not found")))?;
3162 let input_text = format!("Entity: {ent_name}\nCurrent type: {ent_type}");
3163 let (value, cost, is_oauth) = match mode {
3164 EnrichMode::ClaudeCode => call_claude(
3165 binary,
3166 ENTITY_TYPE_VALIDATE_PROMPT,
3167 ENTITY_TYPE_VALIDATE_SCHEMA,
3168 &input_text,
3169 model,
3170 timeout,
3171 )?,
3172 EnrichMode::Codex => call_codex(
3173 binary,
3174 ENTITY_TYPE_VALIDATE_PROMPT,
3175 ENTITY_TYPE_VALIDATE_SCHEMA,
3176 &input_text,
3177 model,
3178 timeout,
3179 )?,
3180 };
3181 let validated_type = value
3182 .get("validated_type")
3183 .and_then(|v| v.as_str())
3184 .unwrap_or(&ent_type);
3185 let was_correct = value
3186 .get("was_correct")
3187 .and_then(|v| v.as_bool())
3188 .unwrap_or(true);
3189 if !was_correct {
3190 conn.execute(
3191 "UPDATE entities SET type = ?1 WHERE id = ?2",
3192 rusqlite::params![validated_type, ent_id],
3193 )?;
3194 }
3195 Ok(EnrichItemResult::Done {
3196 memory_id: None,
3197 entity_id: Some(ent_id),
3198 entities: 1,
3199 rels: 0,
3200 chars_before: None,
3201 chars_after: None,
3202 cost,
3203 is_oauth,
3204 })
3205}
3206
3207fn call_description_enrich(
3209 conn: &Connection,
3210 _namespace: &str,
3211 item_key: &str,
3212 binary: &Path,
3213 model: Option<&str>,
3214 timeout: u64,
3215 mode: &EnrichMode,
3216) -> Result<EnrichItemResult, AppError> {
3217 let (mem_id, body, old_desc): (i64, String, String) = conn
3218 .query_row(
3219 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3220 rusqlite::params![item_key],
3221 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3222 )
3223 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3224 let snippet: String = body.chars().take(500).collect();
3225 let input_text = format!(
3226 "Memory name: {item_key}\nCurrent description: {old_desc}\nBody preview: {snippet}"
3227 );
3228 let (value, cost, is_oauth) = match mode {
3229 EnrichMode::ClaudeCode => call_claude(
3230 binary,
3231 DESCRIPTION_ENRICH_PROMPT,
3232 DESCRIPTION_ENRICH_SCHEMA,
3233 &input_text,
3234 model,
3235 timeout,
3236 )?,
3237 EnrichMode::Codex => call_codex(
3238 binary,
3239 DESCRIPTION_ENRICH_PROMPT,
3240 DESCRIPTION_ENRICH_SCHEMA,
3241 &input_text,
3242 model,
3243 timeout,
3244 )?,
3245 };
3246 let new_desc = value
3247 .get("description")
3248 .and_then(|v| v.as_str())
3249 .unwrap_or(&old_desc);
3250 conn.execute(
3251 "UPDATE memories SET description = ?1 WHERE id = ?2",
3252 rusqlite::params![new_desc, mem_id],
3253 )?;
3254 Ok(EnrichItemResult::Done {
3255 memory_id: Some(mem_id),
3256 entity_id: None,
3257 entities: 0,
3258 rels: 0,
3259 chars_before: Some(old_desc.len()),
3260 chars_after: Some(new_desc.len()),
3261 cost,
3262 is_oauth,
3263 })
3264}
3265
3266fn call_domain_classify(
3268 conn: &Connection,
3269 _namespace: &str,
3270 item_key: &str,
3271 binary: &Path,
3272 model: Option<&str>,
3273 timeout: u64,
3274 mode: &EnrichMode,
3275) -> Result<EnrichItemResult, AppError> {
3276 let (mem_id, body, desc): (i64, String, String) = conn
3277 .query_row(
3278 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3279 rusqlite::params![item_key],
3280 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3281 )
3282 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3283 let snippet: String = body.chars().take(500).collect();
3284 let input_text = format!("Memory: {item_key}\nDescription: {desc}\nBody preview: {snippet}");
3285 let (value, cost, is_oauth) = match mode {
3286 EnrichMode::ClaudeCode => call_claude(
3287 binary,
3288 DOMAIN_CLASSIFY_PROMPT,
3289 DOMAIN_CLASSIFY_SCHEMA,
3290 &input_text,
3291 model,
3292 timeout,
3293 )?,
3294 EnrichMode::Codex => call_codex(
3295 binary,
3296 DOMAIN_CLASSIFY_PROMPT,
3297 DOMAIN_CLASSIFY_SCHEMA,
3298 &input_text,
3299 model,
3300 timeout,
3301 )?,
3302 };
3303 let domain = value
3304 .get("domain")
3305 .and_then(|v| v.as_str())
3306 .unwrap_or("uncategorized");
3307 let metadata = format!(r#"{{"domain":"{}"}}"#, domain.replace('"', "\\\""));
3308 conn.execute(
3309 "UPDATE memories SET metadata = ?1 WHERE id = ?2",
3310 rusqlite::params![metadata, mem_id],
3311 )?;
3312 Ok(EnrichItemResult::Done {
3313 memory_id: Some(mem_id),
3314 entity_id: None,
3315 entities: 0,
3316 rels: 0,
3317 chars_before: None,
3318 chars_after: None,
3319 cost,
3320 is_oauth,
3321 })
3322}
3323
3324fn call_graph_audit(
3326 conn: &Connection,
3327 _namespace: &str,
3328 item_key: &str,
3329 binary: &Path,
3330 model: Option<&str>,
3331 timeout: u64,
3332 mode: &EnrichMode,
3333) -> Result<EnrichItemResult, AppError> {
3334 let (mem_id, body, desc): (i64, String, String) = conn
3335 .query_row(
3336 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3337 rusqlite::params![item_key],
3338 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3339 )
3340 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3341 let snippet: String = body.chars().take(500).collect();
3342 let ent_count: i64 = conn
3343 .query_row(
3344 "SELECT COUNT(*) FROM memory_entities WHERE memory_id = ?1",
3345 rusqlite::params![mem_id],
3346 |r| r.get(0),
3347 )
3348 .unwrap_or(0);
3349 let input_text = format!("Memory: {item_key}\nDescription: {desc}\nEntity bindings: {ent_count}\nBody preview: {snippet}");
3350 let (value, cost, is_oauth) = match mode {
3351 EnrichMode::ClaudeCode => call_claude(
3352 binary,
3353 GRAPH_AUDIT_PROMPT,
3354 GRAPH_AUDIT_SCHEMA,
3355 &input_text,
3356 model,
3357 timeout,
3358 )?,
3359 EnrichMode::Codex => call_codex(
3360 binary,
3361 GRAPH_AUDIT_PROMPT,
3362 GRAPH_AUDIT_SCHEMA,
3363 &input_text,
3364 model,
3365 timeout,
3366 )?,
3367 };
3368 let issues = value
3369 .get("issues")
3370 .and_then(|v| v.as_array())
3371 .map(|a| a.len())
3372 .unwrap_or(0);
3373 Ok(EnrichItemResult::Done {
3374 memory_id: Some(mem_id),
3375 entity_id: None,
3376 entities: 0,
3377 rels: issues,
3378 chars_before: None,
3379 chars_after: None,
3380 cost,
3381 is_oauth,
3382 })
3383}
3384
3385fn call_deep_research_synth(
3387 conn: &Connection,
3388 namespace: &str,
3389 item_key: &str,
3390 binary: &Path,
3391 model: Option<&str>,
3392 timeout: u64,
3393 mode: &EnrichMode,
3394) -> Result<EnrichItemResult, AppError> {
3395 let (mem_id, body): (i64, String) = conn
3396 .query_row(
3397 "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3398 rusqlite::params![item_key],
3399 |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
3400 )
3401 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3402 let snippet: String = body.chars().take(2000).collect();
3403 let input_text = format!("Memory: {item_key}\nBody:\n{snippet}");
3404 let (value, cost, is_oauth) = match mode {
3405 EnrichMode::ClaudeCode => call_claude(
3406 binary,
3407 DEEP_RESEARCH_SYNTH_PROMPT,
3408 DEEP_RESEARCH_SYNTH_SCHEMA,
3409 &input_text,
3410 model,
3411 timeout,
3412 )?,
3413 EnrichMode::Codex => call_codex(
3414 binary,
3415 DEEP_RESEARCH_SYNTH_PROMPT,
3416 DEEP_RESEARCH_SYNTH_SCHEMA,
3417 &input_text,
3418 model,
3419 timeout,
3420 )?,
3421 };
3422 let mut ent_count = 0usize;
3423 let mut rel_count = 0usize;
3424 if let Some(ents) = value.get("entities").and_then(|v| v.as_array()) {
3425 for e in ents {
3426 let name = e.get("name").and_then(|v| v.as_str()).unwrap_or_default();
3427 let etype_str = e
3428 .get("entity_type")
3429 .and_then(|v| v.as_str())
3430 .unwrap_or("concept");
3431 let etype: EntityType = etype_str.parse().unwrap_or(EntityType::Concept);
3432 if name.len() >= 2 {
3433 let ne = NewEntity {
3434 name: name.to_string(),
3435 entity_type: etype,
3436 description: None,
3437 };
3438 let _ = entities::upsert_entity(conn, namespace, &ne);
3439 ent_count += 1;
3440 }
3441 }
3442 }
3443 if let Some(rels) = value.get("relationships").and_then(|v| v.as_array()) {
3444 for r in rels {
3445 let src = r.get("source").and_then(|v| v.as_str()).unwrap_or_default();
3446 let tgt = r.get("target").and_then(|v| v.as_str()).unwrap_or_default();
3447 if src.is_empty() || tgt.is_empty() {
3448 continue;
3449 }
3450 let rel = r
3451 .get("relation")
3452 .and_then(|v| v.as_str())
3453 .unwrap_or("related");
3454 let str_ = r.get("strength").and_then(|v| v.as_f64()).unwrap_or(0.5);
3455 if let (Some(sid), Some(tid)) = (
3456 entities::find_entity_id(conn, namespace, src)?,
3457 entities::find_entity_id(conn, namespace, tgt)?,
3458 ) {
3459 let _ = entities::create_or_fetch_relationship(
3460 conn, namespace, sid, tid, rel, str_, None,
3461 );
3462 rel_count += 1;
3463 }
3464 }
3465 }
3466 Ok(EnrichItemResult::Done {
3467 memory_id: Some(mem_id),
3468 entity_id: None,
3469 entities: ent_count,
3470 rels: rel_count,
3471 chars_before: None,
3472 chars_after: None,
3473 cost,
3474 is_oauth,
3475 })
3476}
3477
3478fn call_body_extract(
3480 conn: &Connection,
3481 _namespace: &str,
3482 item_key: &str,
3483 binary: &Path,
3484 model: Option<&str>,
3485 timeout: u64,
3486 mode: &EnrichMode,
3487) -> Result<EnrichItemResult, AppError> {
3488 let (mem_id, body): (i64, String) = conn
3489 .query_row(
3490 "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3491 rusqlite::params![item_key],
3492 |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
3493 )
3494 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3495 let input_text = format!("Memory: {item_key}\nBody:\n{body}");
3496 let (value, cost, is_oauth) = match mode {
3497 EnrichMode::ClaudeCode => call_claude(
3498 binary,
3499 BODY_EXTRACT_PROMPT,
3500 BODY_EXTRACT_SCHEMA,
3501 &input_text,
3502 model,
3503 timeout,
3504 )?,
3505 EnrichMode::Codex => call_codex(
3506 binary,
3507 BODY_EXTRACT_PROMPT,
3508 BODY_EXTRACT_SCHEMA,
3509 &input_text,
3510 model,
3511 timeout,
3512 )?,
3513 };
3514 let restructured = value
3515 .get("restructured_body")
3516 .and_then(|v| v.as_str())
3517 .unwrap_or(&body);
3518 let chars_before = body.len();
3519 let chars_after = restructured.len();
3520 let new_hash = blake3::hash(restructured.as_bytes()).to_hex().to_string();
3521 conn.execute(
3522 "UPDATE memories SET body = ?1, body_hash = ?2, updated_at = unixepoch() WHERE id = ?3",
3523 rusqlite::params![restructured, new_hash, mem_id],
3524 )?;
3525 Ok(EnrichItemResult::Done {
3526 memory_id: Some(mem_id),
3527 entity_id: None,
3528 entities: 0,
3529 rels: 0,
3530 chars_before: Some(chars_before),
3531 chars_after: Some(chars_after),
3532 cost,
3533 is_oauth,
3534 })
3535}
3536
3537#[allow(clippy::type_complexity)]
3539fn scan_isolated_entity_pairs(
3540 conn: &Connection,
3541 namespace: &str,
3542 limit: Option<usize>,
3543) -> Result<Vec<(i64, String, i64, String)>, AppError> {
3544 let limit_val = limit.unwrap_or(50) as i64;
3545 let mut stmt = conn.prepare_cached(
3546 "SELECT e1.id, e1.name, e2.id, e2.name FROM entities e1, entities e2 \
3547 WHERE e1.namespace = ?1 AND e2.namespace = ?1 AND e1.id < e2.id \
3548 AND NOT EXISTS (SELECT 1 FROM relationships r WHERE \
3549 (r.source_id = e1.id AND r.target_id = e2.id) OR \
3550 (r.source_id = e2.id AND r.target_id = e1.id)) \
3551 LIMIT ?2",
3552 )?;
3553 let rows = stmt
3554 .query_map(rusqlite::params![namespace, limit_val], |r| {
3555 Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?))
3556 })?
3557 .collect::<Result<Vec<_>, _>>()?;
3558 Ok(rows)
3559}
3560
3561fn scan_entities_for_type_validation(
3563 conn: &Connection,
3564 namespace: &str,
3565 limit: Option<usize>,
3566) -> Result<Vec<(i64, String, String)>, AppError> {
3567 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3568 let sql = format!(
3569 "SELECT id, name, type FROM entities WHERE namespace = ?1 ORDER BY id {limit_clause}"
3570 );
3571 let mut stmt = conn.prepare(&sql)?;
3572 let rows = stmt
3573 .query_map(rusqlite::params![namespace], |r| {
3574 Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3575 })?
3576 .collect::<Result<Vec<_>, _>>()?;
3577 Ok(rows)
3578}
3579
3580fn scan_generic_descriptions(
3582 conn: &Connection,
3583 namespace: &str,
3584 limit: Option<usize>,
3585) -> Result<Vec<(i64, String, String)>, AppError> {
3586 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3587 let sql = format!(
3588 "SELECT id, name, description FROM memories WHERE namespace = ?1 AND deleted_at IS NULL \
3589 AND (description LIKE '%ingested%' OR description LIKE '%imported%' OR description LIKE '%added%' OR length(description) < 30) \
3590 ORDER BY id {limit_clause}"
3591 );
3592 let mut stmt = conn.prepare(&sql)?;
3593 let rows = stmt
3594 .query_map(rusqlite::params![namespace], |r| {
3595 Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3596 })?
3597 .collect::<Result<Vec<_>, _>>()?;
3598 Ok(rows)
3599}
3600
3601fn call_codex(
3605 binary: &Path,
3606 prompt: &str,
3607 json_schema: &str,
3608 input_text: &str,
3609 model: Option<&str>,
3610 timeout_secs: u64,
3611) -> Result<(serde_json::Value, f64, bool), AppError> {
3612 use wait_timeout::ChildExt;
3613
3614 super::codex_spawn::validate_codex_model(model)?;
3619 let schema_file = super::codex_spawn::trusted_schema_path()?;
3620
3621 let args = super::codex_spawn::CodexSpawnArgs {
3622 binary,
3623 prompt,
3624 json_schema,
3625 input_text,
3626 model,
3627 timeout_secs,
3628 schema_path: schema_file.clone(),
3629 };
3630 let mut cmd = super::codex_spawn::build_codex_command(&args)?;
3631
3632 let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
3633 AppError::Io(std::io::Error::new(
3634 e.kind(),
3635 format!("failed to spawn codex: {e}"),
3636 ))
3637 })?;
3638
3639 let full_prompt = format!("{prompt}\n\n{input_text}");
3640 let stdin_bytes = full_prompt.into_bytes();
3641 let mut child_stdin = child
3642 .stdin
3643 .take()
3644 .ok_or_else(|| AppError::Validation("failed to open codex stdin".into()))?;
3645 let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
3646 child_stdin.write_all(&stdin_bytes)?;
3647 drop(child_stdin);
3648 Ok(())
3649 });
3650
3651 let start = std::time::Instant::now();
3652 let timeout = std::time::Duration::from_secs(timeout_secs);
3653 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
3654 let _ = std::fs::remove_file(&schema_file);
3655
3656 match status {
3657 Some(exit_status) => {
3658 stdin_thread
3659 .join()
3660 .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
3661 .map_err(AppError::Io)?;
3662
3663 tracing::debug!(
3664 target: "process",
3665 exit_code = ?exit_status.code(),
3666 elapsed_ms = start.elapsed().as_millis() as u64,
3667 "external process completed"
3668 );
3669
3670 let mut stdout_buf = Vec::new();
3671 if let Some(mut out) = child.stdout.take() {
3672 std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
3673 }
3674 if !exit_status.success() {
3675 let mut stderr_buf = Vec::new();
3676 if let Some(mut err) = child.stderr.take() {
3677 std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
3678 }
3679 let stderr_str = String::from_utf8_lossy(&stderr_buf);
3680 tracing::warn!(
3681 target: "enrich",
3682 exit_code = ?exit_status.code(),
3683 stderr = %stderr_str.trim(),
3684 "codex process failed"
3685 );
3686 return Err(AppError::Validation(format!(
3687 "codex exited with code {:?}: {}",
3688 exit_status.code(),
3689 stderr_str.trim()
3690 )));
3691 }
3692 let stdout_str = String::from_utf8(stdout_buf)
3693 .map_err(|_| AppError::Validation("codex stdout is not valid UTF-8".into()))?;
3694 let result = super::codex_spawn::parse_codex_jsonl(&stdout_str)?;
3697 let value: serde_json::Value =
3703 serde_json::from_str(&result.last_agent_text).map_err(|e| {
3704 AppError::Validation(format!(
3705 "codex agent_message is not valid JSON: {e}; raw={}",
3706 result.last_agent_text
3707 ))
3708 })?;
3709 Ok((value, 0.0, false))
3710 }
3711 None => {
3712 let _ = child.kill();
3713 let _ = child.wait();
3714 let _ = stdin_thread.join();
3715 Err(AppError::Validation(format!(
3716 "codex timed out after {timeout_secs} seconds"
3717 )))
3718 }
3719 }
3720}
3721
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::Connection;
    #[cfg(unix)]
    use std::os::unix::fs::PermissionsExt;

    /// Opens an in-memory database holding the tables the enrich scans and
    /// writers touch: memories, entities, memory_entities, relationships and
    /// memory_embeddings.
    fn open_test_db() -> Connection {
        let conn = Connection::open_in_memory().expect("in-memory db");
        conn.execute_batch(
            "CREATE TABLE memories (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                name TEXT NOT NULL,
                type TEXT NOT NULL DEFAULT 'note',
                description TEXT NOT NULL DEFAULT '',
                body TEXT NOT NULL DEFAULT '',
                body_hash TEXT NOT NULL DEFAULT '',
                session_id TEXT,
                source TEXT NOT NULL DEFAULT 'agent',
                metadata TEXT NOT NULL DEFAULT '{}',
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                deleted_at INTEGER,
                UNIQUE(namespace, name)
            );
            CREATE TABLE entities (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                name TEXT NOT NULL,
                type TEXT NOT NULL DEFAULT 'concept',
                description TEXT,
                degree INTEGER NOT NULL DEFAULT 0,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                UNIQUE(namespace, name)
            );
            CREATE TABLE memory_entities (
                memory_id INTEGER NOT NULL,
                entity_id INTEGER NOT NULL,
                PRIMARY KEY (memory_id, entity_id)
            );
            CREATE TABLE relationships (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                source_id INTEGER NOT NULL,
                target_id INTEGER NOT NULL,
                relation TEXT NOT NULL,
                weight REAL NOT NULL DEFAULT 0.5,
                description TEXT,
                UNIQUE(source_id, target_id, relation)
            );
            CREATE TABLE memory_embeddings (
                memory_id INTEGER PRIMARY KEY,
                namespace TEXT NOT NULL,
                embedding BLOB NOT NULL,
                source TEXT NOT NULL,
                model TEXT NOT NULL DEFAULT '',
                dim INTEGER NOT NULL DEFAULT 384,
                created_at INTEGER NOT NULL DEFAULT (unixepoch())
            );",
        )
        .expect("schema creation must succeed");
        conn
    }

    /// Inserts an entity named `name` into `namespace` and returns its row id.
    fn insert_entity(conn: &Connection, namespace: &str, name: &str) -> i64 {
        conn.execute(
            "INSERT INTO entities (namespace, name) VALUES (?1, ?2)",
            rusqlite::params![namespace, name],
        )
        .unwrap();
        conn.last_insert_rowid()
    }

    #[test]
    fn scan_unbound_memories_finds_memories_without_bindings() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'test-mem', 'some body content')",
            [],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "test-mem");
    }

    #[test]
    fn scan_unbound_memories_excludes_bound_memories() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'bound-mem', 'body')",
            [],
        )
        .unwrap();
        let mem_id: i64 = conn
            .query_row("SELECT id FROM memories WHERE name='bound-mem'", [], |r| {
                r.get(0)
            })
            .unwrap();
        conn.execute(
            "INSERT INTO entities (namespace, name) VALUES ('global', 'some-entity')",
            [],
        )
        .unwrap();
        let ent_id: i64 = conn
            .query_row(
                "SELECT id FROM entities WHERE name='some-entity'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        conn.execute(
            "INSERT INTO memory_entities (memory_id, entity_id) VALUES (?1, ?2)",
            rusqlite::params![mem_id, ent_id],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
        assert!(results.is_empty(), "bound memory must not appear in scan");
    }

    #[test]
    fn scan_entities_without_description_finds_null_description() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'my-tool', 'tool', NULL)",
            [],
        )
        .unwrap();

        let results = scan_entities_without_description(&conn, "global", None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "my-tool");
    }

    #[test]
    fn scan_entities_without_description_excludes_entities_with_description() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'described-tool', 'tool', 'Has a description already')",
            [],
        )
        .unwrap();

        let results = scan_entities_without_description(&conn, "global", None).unwrap();
        assert!(
            results.is_empty(),
            "entity with description must not appear"
        );
    }

    #[test]
    fn scan_short_body_memories_finds_short_bodies() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'short-mem', 'hi')",
            [],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "short-mem");
    }

    #[test]
    fn scan_short_body_memories_excludes_long_bodies() {
        let conn = open_test_db();
        let long_body = "a".repeat(1000);
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'long-mem', ?1)",
            rusqlite::params![long_body],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
        assert!(results.is_empty(), "long memory must not appear in scan");
    }

    #[test]
    fn scan_respects_limit() {
        let conn = open_test_db();
        for i in 0..5 {
            conn.execute(
                &format!("INSERT INTO memories (namespace, name, body) VALUES ('global', 'mem-{i}', 'short')"),
                [],
            )
            .unwrap();
        }

        let results = scan_short_body_memories(&conn, "global", 1000, Some(3)).unwrap();
        assert_eq!(results.len(), 3, "limit must be respected");
    }

    #[test]
    fn scan_memories_without_embeddings_finds_only_missing_rows() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'missing-vec', 'body one')",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'has-vec', 'body two')",
            [],
        )
        .unwrap();
        let memory_id: i64 = conn
            .query_row(
                "SELECT id FROM memories WHERE namespace='global' AND name='has-vec'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        let embedding = vec![0.0_f32; crate::constants::embedding_dim()];
        memories::upsert_vec(
            &conn, memory_id, "global", "note", &embedding, "has-vec", "body two",
        )
        .unwrap();

        let results = scan_memories_without_embeddings(&conn, "global", None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "missing-vec");
    }

    #[test]
    fn scan_memories_without_embeddings_respects_name_filter() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'match-me', 'body one')",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'skip-me', 'body two')",
            [],
        )
        .unwrap();

        let results =
            scan_memories_without_embeddings(&conn, "global", None, &["match-me".to_string()])
                .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "match-me");
    }

    #[test]
    fn scan_isolated_entity_pairs_skips_pairs_related_in_either_direction() {
        let conn = open_test_db();
        let alpha = insert_entity(&conn, "global", "alpha");
        let beta = insert_entity(&conn, "global", "beta");
        insert_entity(&conn, "global", "gamma");
        insert_entity(&conn, "other", "outsider");
        // Stored as beta -> alpha; the scan must still treat the pair as related.
        conn.execute(
            "INSERT INTO relationships (source_id, target_id, relation) VALUES (?1, ?2, 'related')",
            rusqlite::params![beta, alpha],
        )
        .unwrap();

        let mut pairs: Vec<(String, String)> = scan_isolated_entity_pairs(&conn, "global", None)
            .unwrap()
            .into_iter()
            .map(|(_, a, _, b)| (a, b))
            .collect();
        pairs.sort();
        assert_eq!(
            pairs,
            vec![
                ("alpha".to_string(), "gamma".to_string()),
                ("beta".to_string(), "gamma".to_string()),
            ]
        );

        let limited = scan_isolated_entity_pairs(&conn, "global", Some(1)).unwrap();
        assert_eq!(limited.len(), 1, "limit must be respected");
    }

    #[test]
    fn scan_entities_for_type_validation_orders_by_id_and_limits() {
        let conn = open_test_db();
        for name in ["first", "second", "third"] {
            insert_entity(&conn, "global", name);
        }
        insert_entity(&conn, "other", "elsewhere");

        let all = scan_entities_for_type_validation(&conn, "global", None).unwrap();
        let names: Vec<&str> = all.iter().map(|(_, n, _)| n.as_str()).collect();
        assert_eq!(names, ["first", "second", "third"]);
        assert_eq!(all[0].2, "concept");

        let limited = scan_entities_for_type_validation(&conn, "global", Some(2)).unwrap();
        assert_eq!(limited.len(), 2, "limit must be respected");
    }

    #[test]
    fn scan_generic_descriptions_flags_boilerplate_and_short_descriptions() {
        let conn = open_test_db();
        let rows = [
            ("boiler", "Memory ingested from the legacy export pipeline"),
            ("tiny", "short"),
            ("good", "Explains the retry policy for the upstream billing API"),
        ];
        for (name, desc) in rows {
            conn.execute(
                "INSERT INTO memories (namespace, name, description) VALUES ('global', ?1, ?2)",
                rusqlite::params![name, desc],
            )
            .unwrap();
        }
        conn.execute(
            "INSERT INTO memories (namespace, name, description, deleted_at) VALUES ('global', 'gone', 'imported', 1)",
            [],
        )
        .unwrap();

        let names: Vec<String> = scan_generic_descriptions(&conn, "global", None)
            .unwrap()
            .into_iter()
            .map(|(_, n, _)| n)
            .collect();
        assert_eq!(names, ["boiler", "tiny"]);

        let limited = scan_generic_descriptions(&conn, "global", Some(1)).unwrap();
        assert_eq!(limited.len(), 1, "limit must be respected");
    }

    #[test]
    fn queue_db_schema_creates_correctly() {
        // A per-test temp dir keeps this portable (no hard-coded /tmp) and
        // removes the queue file and any SQLite sidecar files even on panic.
        let tmp = tempfile::tempdir().expect("tempdir");
        let tmp_path = tmp
            .path()
            .join("test-enrich-queue.sqlite")
            .to_string_lossy()
            .into_owned();
        let conn = open_queue_db(&tmp_path).expect("queue db must open");
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 0);
    }

    #[test]
    fn parse_claude_output_valid_bindings() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.01,
             "structured_output":{"entities":[{"name":"rust-lang","entity_type":"tool"}],"relationships":[]}}
        ]"#;
        let result = crate::commands::claude_runner::parse_claude_output(output)
            .expect("must parse successfully");
        assert!(result.value.get("entities").is_some());
        assert!((result.cost_usd - 0.01).abs() < f64::EPSILON);
        assert!(!result.is_oauth);
    }

    #[test]
    fn parse_claude_output_detects_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"none"},
            {"type":"result","is_error":false,"total_cost_usd":0.0,
             "structured_output":{"entities":[],"relationships":[]}}
        ]"#;
        let result = crate::commands::claude_runner::parse_claude_output(output).unwrap();
        assert!(result.is_oauth);
    }

    #[test]
    fn parse_claude_output_rate_limit_returns_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;
        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
        assert!(matches!(err, AppError::RateLimited { .. }));
    }

    #[test]
    fn parse_claude_output_auth_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;
        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("authentication failed"));
    }

    #[cfg(unix)]
    #[test]
    fn call_codex_returns_raw_json_for_body_enrich_schema() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let binary = tmp.path().join("codex-mock");
        std::fs::write(
            &binary,
            r#"#!/usr/bin/env bash
set -euo pipefail
cat <<'JSONL'
{"type":"thread.started","thread_id":"mock-thread-0"}
{"type":"item.completed","item":{"type":"agent_message","text":"{\"enriched_body\":\"expanded body\"}"}}
{"type":"turn.completed","usage":{"input_tokens":1,"output_tokens":1}}
JSONL
"#,
        )
        .expect("mock codex write");
        let mut perms = std::fs::metadata(&binary).expect("metadata").permissions();
        perms.set_mode(0o755);
        std::fs::set_permissions(&binary, perms).expect("chmod");

        let (value, cost, is_oauth) =
            call_codex(&binary, "prompt", BODY_ENRICH_SCHEMA, "body", None, 5)
                .expect("call_codex must accept body-enrich payload");

        assert_eq!(value["enriched_body"], "expanded body");
        assert_eq!(cost, 0.0);
        assert!(!is_oauth);
    }

    // Only the candidate scan that feeds the dry-run preview is exercised here;
    // the preview output itself is not asserted.
    #[test]
    fn scan_short_body_memories_surfaces_dry_run_candidate() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'dry-mem', 'tiny')",
            [],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 1000, None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "dry-mem");
    }

    #[test]
    fn persist_entity_description_updates_db() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type) VALUES ('global', 'tokio-runtime', 'tool')",
            [],
        )
        .unwrap();
        let eid: i64 = conn
            .query_row(
                "SELECT id FROM entities WHERE name='tokio-runtime'",
                [],
                |r| r.get(0),
            )
            .unwrap();

        persist_entity_description(&conn, eid, "Async runtime for Rust applications").unwrap();

        let desc: String = conn
            .query_row(
                "SELECT description FROM entities WHERE id=?1",
                rusqlite::params![eid],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(desc, "Async runtime for Rust applications");
    }

    #[test]
    fn bindings_schema_is_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(BINDINGS_SCHEMA).expect("BINDINGS_SCHEMA must be valid JSON");
    }

    #[test]
    fn entity_description_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(ENTITY_DESCRIPTION_SCHEMA)
            .expect("ENTITY_DESCRIPTION_SCHEMA must be valid JSON");
    }

    #[test]
    fn body_enrich_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(BODY_ENRICH_SCHEMA)
            .expect("BODY_ENRICH_SCHEMA must be valid JSON");
    }
}