1use crate::commands::ingest_claude::find_claude_binary;
28use crate::constants::MAX_MEMORY_BODY_LEN;
29use crate::entity_type::EntityType;
30use crate::errors::AppError;
31use crate::paths::AppPaths;
32use crate::storage::connection::{ensure_db_ready, open_rw};
33use crate::storage::entities::{self, NewEntity, NewRelationship};
34use crate::storage::memories;
35
36use rusqlite::Connection;
37use serde::{Deserialize, Serialize};
38use std::io::Write;
39use std::path::{Path, PathBuf};
40use std::time::Instant;
41
/// Default file name of the SQLite queue database used by enrich runs.
// NOTE(review): how this name is resolved against a directory is not visible here.
const DEFAULT_QUEUE_DB: &str = ".enrich-queue.sqlite";
/// Default rate-limit wait.
// NOTE(review): presumably seconds — confirm at the use site.
const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
/// Default value of `EnrichArgs::min_output_chars` (`--min-output-chars`).
const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;
/// Default value of `EnrichArgs::max_output_chars` (`--max-output-chars`).
const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2000;
50
/// JSON Schema for the entity/relationship extraction response.
///
/// The response must be an object with exactly two properties:
/// - `entities`: array of `{ name, entity_type }`, where `entity_type` is one
///   of the ten enumerated type names;
/// - `relationships`: array of `{ source, target, relation, strength }`, where
///   `relation` is one of the eleven enumerated relation names and `strength`
///   is a number in `[0, 1]`.
///
/// Every object level is closed with `"additionalProperties": false`.
const BINDINGS_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "entities": {
 "type": "array",
 "items": {
 "type": "object",
 "properties": {
 "name": { "type": "string" },
 "entity_type": {
 "type": "string",
 "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
 }
 },
 "required": ["name", "entity_type"],
 "additionalProperties": false
 }
 },
 "relationships": {
 "type": "array",
 "items": {
 "type": "object",
 "properties": {
 "source": { "type": "string" },
 "target": { "type": "string" },
 "relation": {
 "type": "string",
 "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
 },
 "strength": { "type": "number", "minimum": 0, "maximum": 1 }
 },
 "required": ["source","target","relation","strength"],
 "additionalProperties": false
 }
 }
 },
 "required": ["entities","relationships"],
 "additionalProperties": false
}"#;
94
/// JSON Schema for an entity-description response: a closed object with a
/// single required string property `description`.
const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "description": { "type": "string" }
 },
 "required": ["description"],
 "additionalProperties": false
}"#;

/// JSON Schema for a body-enrichment response: a closed object with a single
/// required string property `enriched_body`.
const BODY_ENRICH_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "enriched_body": { "type": "string" }
 },
 "required": ["enriched_body"],
 "additionalProperties": false
}"#;
112
/// Prompt asking the model to judge whether a relationship weight is well
/// calibrated against the 0.9 / 0.7 / 0.5 / 0.3 scale spelled out in the text.
const WEIGHT_CALIBRATE_PROMPT: &str = "You are a knowledge graph quality auditor. Evaluate whether this relationship weight is correctly calibrated.\n\n\
Scale:\n\
- 0.9 = vital hard dependency (A cannot function without B)\n\
- 0.7 = important design relationship (A strongly supports/enables B)\n\
- 0.5 = useful contextual link (A and B share relevant context)\n\
- 0.3 = weak reference (A mentions B without strong coupling)\n\n\
Respond with the calibrated weight and brief reasoning.";

/// JSON Schema for the weight-calibration response: required
/// `calibrated_weight` (number in `[0.0, 1.0]`) and `reasoning` (string).
const WEIGHT_CALIBRATE_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "calibrated_weight": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
 "reasoning": { "type": "string" }
 },
 "required": ["calibrated_weight", "reasoning"],
 "additionalProperties": false
}"#;
131
/// Prompt asking the model to replace a generic relation with exactly one of
/// the eleven canonical relations listed (with definitions) in the text.
const RELATION_RECLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. The relationship between these entities uses a generic type. Determine the REAL semantic relationship.\n\n\
Valid canonical relations (pick exactly one):\n\
- depends-on: A cannot function without B\n\
- uses: A utilizes B but could substitute it\n\
- supports: A reinforces or enables B\n\
- causes: A triggers or produces B\n\
- fixes: A resolves a problem in B\n\
- contradicts: A conflicts with or invalidates B\n\
- applies-to: A is relevant to or scoped within B\n\
- follows: A comes after B in sequence\n\
- replaces: A substitutes B\n\
- tracked-in: A is monitored in B\n\
- related: A and B share context (use sparingly)\n\n\
Respond with the correct relation, strength, and reasoning.";

/// JSON Schema for the reclassification response: required `relation`
/// (string), `strength` (number in `[0.0, 1.0]`) and `reasoning` (string).
// NOTE(review): unlike BINDINGS_SCHEMA, `relation` is a free string here — the
// schema itself does not restrict it to the canonical list from the prompt.
const RELATION_RECLASSIFY_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "relation": { "type": "string" },
 "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
 "reasoning": { "type": "string" }
 },
 "required": ["relation", "strength", "reasoning"],
 "additionalProperties": false
}"#;
158
/// Prompt asking the model whether two currently unconnected entities should
/// be linked; the model answers with a canonical relation or the literal
/// `"none"`.
const ENTITY_CONNECT_PROMPT: &str = "You are a knowledge graph quality auditor. Two entities exist in the same graph but have no relationship between them. Determine if a meaningful relationship exists.\n\n\
Valid canonical relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, replaces, tracked-in, related.\n\n\
If NO meaningful relationship exists, set relation to \"none\".\n\
Respond with the relation (or \"none\"), strength, and reasoning.";

/// JSON Schema for the entity-connect response: required `relation` (string),
/// `strength` (number in `[0.0, 1.0]`) and `reasoning` (string).
const ENTITY_CONNECT_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "relation": { "type": "string" },
 "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
 "reasoning": { "type": "string" }
 },
 "required": ["relation", "strength", "reasoning"],
 "additionalProperties": false
}"#;
175
/// Prompt asking the model to confirm or correct an entity's type, choosing
/// from the ten type names listed in the text.
const ENTITY_TYPE_VALIDATE_PROMPT: &str = "You are a knowledge graph quality auditor. Verify whether this entity's type is correct.\n\n\
Valid entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\n\
If the current type is correct, keep it. If wrong, suggest the correct type.\n\
Respond with the validated type and reasoning.";

/// JSON Schema for the type-validation response: required `validated_type`
/// (string), `was_correct` (boolean) and `reasoning` (string).
const ENTITY_TYPE_VALIDATE_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "validated_type": { "type": "string" },
 "was_correct": { "type": "boolean" },
 "reasoning": { "type": "string" }
 },
 "required": ["validated_type", "was_correct", "reasoning"],
 "additionalProperties": false
}"#;
192
/// Prompt asking the model to replace a generic or auto-generated memory
/// description with a concise 10-20 word semantic one; includes one bad and
/// one good example.
const DESCRIPTION_ENRICH_PROMPT: &str = "You are a knowledge graph quality auditor. This memory has a generic or auto-generated description. Write a concise, semantic description (10-20 words) that captures WHAT this memory is about and WHY it matters.\n\n\
BAD: 'ingested from docs/auth.md'\n\
GOOD: 'JWT token rotation strategy with 15-min expiry and refresh flow'\n\n\
Respond with the improved description and reasoning.";

/// JSON Schema for the description-enrichment response: required
/// `description` and `reasoning` strings.
const DESCRIPTION_ENRICH_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "description": { "type": "string" },
 "reasoning": { "type": "string" }
 },
 "required": ["description", "reasoning"],
 "additionalProperties": false
}"#;
208
/// Prompt asking the model to classify a memory into a primary domain,
/// expressed as a kebab-case name of 2-4 words.
const DOMAIN_CLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. Classify this memory into its primary domain category.\n\n\
Respond with the domain name (kebab-case, 2-4 words) and reasoning.";

/// JSON Schema for the domain-classification response: required `domain`
/// (string), `confidence` (number in `[0.0, 1.0]`) and `reasoning` (string).
const DOMAIN_CLASSIFY_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "domain": { "type": "string" },
 "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
 "reasoning": { "type": "string" }
 },
 "required": ["domain", "confidence", "reasoning"],
 "additionalProperties": false
}"#;
223
/// Prompt asking the model to audit a memory and its entity bindings for the
/// quality problems enumerated in the text, returning a list of issues and an
/// overall quality score.
const GRAPH_AUDIT_PROMPT: &str = "You are a knowledge graph quality auditor. Analyze this memory and its entity bindings for quality issues.\n\n\
Check for: missing entities, wrong entity types, redundant relationships, orphaned entities, generic descriptions, low-signal relationships.\n\n\
Respond with a list of issues found (or empty if none) and an overall quality score.";
228
/// JSON Schema for the graph-audit response.
///
/// Required properties: `quality_score` (number in `[0.0, 1.0]`), `issues`
/// (array of `{ kind, detail }` string pairs) and `reasoning` (string).
///
/// The nested `issues` item object is closed with
/// `"additionalProperties": false`, matching the nested objects in
/// `BINDINGS_SCHEMA`. Strict structured-output backends require every object
/// level to be closed, so the previously open item object could be rejected
/// or could admit undeclared keys.
const GRAPH_AUDIT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "quality_score": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "issues": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "kind": { "type": "string" },
          "detail": { "type": "string" }
        },
        "required": ["kind", "detail"],
        "additionalProperties": false
      }
    },
    "reasoning": { "type": "string" }
  },
  "required": ["quality_score", "issues", "reasoning"],
  "additionalProperties": false
}"#;
239
/// Prompt asking the model to synthesize a memory body into entities
/// (lowercase kebab-case names), relationships (from the listed relation
/// names) and a summary.
const DEEP_RESEARCH_SYNTH_PROMPT: &str = "You are a knowledge graph synthesizer. Given this memory body, extract key findings and synthesize them into structured entities and relationships.\n\n\
Entity names: lowercase kebab-case, domain-specific.\n\
Relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, related, replaces, tracked-in.\n\n\
Respond with extracted entities, relationships, and a synthesis summary.";
245
/// JSON Schema for the deep-research synthesis response.
///
/// Required properties: `entities` (array of `{ name, entity_type }`),
/// `relationships` (array of `{ source, target, relation, strength }`) and
/// `summary` (string).
///
/// Both nested item objects are closed with `"additionalProperties": false`,
/// matching the nested objects in `BINDINGS_SCHEMA`. Strict structured-output
/// backends require every object level to be closed, so the previously open
/// item objects could be rejected or could admit undeclared keys.
const DEEP_RESEARCH_SYNTH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": { "type": "string" }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": { "type": "string" },
          "strength": { "type": "number" }
        },
        "required": ["source", "target", "relation", "strength"],
        "additionalProperties": false
      }
    },
    "summary": { "type": "string" }
  },
  "required": ["entities", "relationships", "summary"],
  "additionalProperties": false
}"#;
256
/// Prompt asking the model to restructure an unstructured memory body into
/// clean markdown while preserving all factual content.
const BODY_EXTRACT_PROMPT: &str = "You are a structured data extractor. Given this memory body (which may be unstructured text, raw notes, or a transcript), extract and restructure the content into a clean, well-organized markdown body.\n\n\
Preserve all factual content. Remove noise, fix formatting, add section headers where appropriate.\n\
Respond with the restructured body and a brief summary of changes.";

/// JSON Schema for the body-extraction response: required
/// `restructured_body` and `changes_summary` strings.
const BODY_EXTRACT_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "restructured_body": { "type": "string" },
 "changes_summary": { "type": "string" }
 },
 "required": ["restructured_body", "changes_summary"],
 "additionalProperties": false
}"#;
271
/// Prompt for entity/relationship extraction from a memory body.
///
/// States the naming rules (lowercase kebab-case, 2+ characters), the allowed
/// relationship types (the same eleven names enumerated in `BINDINGS_SCHEMA`),
/// the explicit ban on `mentions`, and the 0.9 / 0.7 / 0.5 / 0.3 strength
/// scale.
const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. Given a memory body, extract:\n\
1. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
2. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones";
286
/// Prompt prefix for entity descriptions. The text ends with `Entity name: `,
/// so the caller is expected to append the entity name after it.
// NOTE(review): how the entity type is appended is not visible in this chunk.
const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. Given an entity name and type, write a concise one-sentence description (10-20 words) that explains what this entity IS and WHY it matters in the context of software/system design.\n\nEntity name: ";
288
/// Prompt prefix for body enrichment. The text ends with
/// `Memory body to enrich:` followed by a blank line, so the caller is
/// expected to append the memory body after it.
const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. Given a short or sparse memory body, expand it into a richer, more complete and useful description. Preserve all existing facts. Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\nConstraints:\n- Output only the enriched body text (no metadata, no headers)\n- Preserve the original meaning exactly\n- Target length is provided in the system context\n\nMemory body to enrich:\n\n";
290
// Enrichment operation selected with `--operation` / `-o`.
//
// Serde (de)serializes the variants in kebab-case (`rename_all`); the CLI
// examples on `EnrichArgs` use the same spelling (`memory-bindings`,
// `entity-descriptions`, `body-enrich`, `re-embed`).
//
// Plain `//` comments are used here on purpose: with `clap::ValueEnum`, `///`
// doc comments on variants become user-visible help text.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum EnrichOperation {
    MemoryBindings,
    EntityDescriptions,
    BodyEnrich,
    ReEmbed,
    WeightCalibrate,
    RelationReclassify,
    EntityConnect,
    EntityTypeValidate,
    DescriptionEnrich,
    CrossDomainBridges,
    DomainClassify,
    GraphAudit,
    DeepResearchSynth,
    BodyExtract,
}
328
// LLM backend selected with `--mode` (and optionally `--fallback-mode`).
//
// Plain `//` comments are used on purpose: with `clap::ValueEnum`, `///` doc
// comments on variants become user-visible help text.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
pub enum EnrichMode {
    // Rendered as "claude-code" by the `Display` impl.
    ClaudeCode,
    // Rendered as "codex" by the `Display` impl.
    Codex,
}
337
338impl std::fmt::Display for EnrichMode {
339 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
340 match self {
341 EnrichMode::ClaudeCode => write!(f, "claude-code"),
342 EnrichMode::Codex => write!(f, "codex"),
343 }
344 }
345}
346
// Command-line arguments of the `enrich` subcommand.
//
// Only plain `//` comments are added in this struct: clap's derive macros turn
// `///` doc comments into help text, which would change the CLI output.
#[derive(clap::Args)]
#[command(
    about = "Enrich graph memories and entities using an LLM provider",
    after_long_help = "EXAMPLES:\n \
        # Add missing entity bindings to all unbound memories\n \
        sqlite-graphrag enrich --operation memory-bindings --mode claude-code\n\n \
        # Fill entity descriptions (dry-run preview, no tokens spent)\n \
        sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n \
        # Expand short memory bodies (GAP-18)\n \
        sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n \
        # Rebuild only missing memory embeddings without rewriting bodies\n \
        sqlite-graphrag enrich --operation re-embed --limit 100\n\n \
        # Resume an interrupted body-enrich run\n \
        sqlite-graphrag enrich --operation body-enrich --resume --json\n\n \
        # Retry only failed items from a previous run\n \
        sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n\
        EXIT CODES:\n \
        0 success\n \
        1 validation error (bad args, binary not found)\n \
        14 I/O error"
)]
pub struct EnrichArgs {
    // `--operation` / `-o`: which enrichment operation to run. No default.
    #[arg(long, short = 'o', value_enum, value_name = "OPERATION")]
    pub operation: EnrichOperation,

    // `--mode`: LLM backend; defaults to `claude-code`.
    #[arg(long, value_enum, default_value = "claude-code")]
    pub mode: EnrichMode,

    // `--limit N`: optional cap on the number of items.
    // NOTE(review): presumably forwarded to the `scan_*` queries as LIMIT — confirm at the call site.
    #[arg(long, value_name = "N")]
    pub limit: Option<usize>,

    // `--dry-run`: preview flag (the help examples describe it as spending no tokens).
    #[arg(long)]
    pub dry_run: bool,

    // `--namespace`, also read from the SQLITE_GRAPHRAG_NAMESPACE environment variable.
    #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
    pub namespace: Option<String>,

    // `--claude-binary PATH`: explicit Claude binary; handed to `find_claude_binary`
    // by the preflight probe.
    #[arg(long, value_name = "PATH")]
    pub claude_binary: Option<PathBuf>,

    // `--claude-model MODEL`: optional model override for the Claude backend.
    #[arg(long, value_name = "MODEL")]
    pub claude_model: Option<String>,

    // `--claude-timeout SECONDS`: defaults to 300.
    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
    pub claude_timeout: u64,

    // `--codex-binary PATH`: explicit Codex binary; handed to `find_codex_binary`
    // by the preflight probe.
    #[arg(long, value_name = "PATH")]
    pub codex_binary: Option<PathBuf>,

    // `--codex-model MODEL`: optional model override for the Codex backend.
    #[arg(long, value_name = "MODEL")]
    pub codex_model: Option<String>,

    // `--codex-timeout SECONDS`: defaults to 300.
    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
    pub codex_timeout: u64,

    // `--max-cost-usd USD`: optional cost ceiling.
    // NOTE(review): enforcement is not visible in this chunk.
    #[arg(long, value_name = "USD")]
    pub max_cost_usd: Option<f64>,

    // `--resume`: resume an interrupted run (see the help examples).
    #[arg(long)]
    pub resume: bool,

    // `--retry-failed`: retry only failed items from a previous run (see the help examples).
    #[arg(long)]
    pub retry_failed: bool,

    // `--min-output-chars CHARS`: defaults to DEFAULT_BODY_ENRICH_MIN_CHARS.
    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
    pub min_output_chars: usize,

    // `--max-output-chars CHARS`: defaults to DEFAULT_BODY_ENRICH_MAX_CHARS.
    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
    pub max_output_chars: usize,

    // `--preserve-check`: defaults to true.
    // NOTE(review): presumably pairs with `preserve_threshold` — confirm at the use site.
    #[arg(long, default_value_t = true)]
    pub preserve_check: bool,

    // `--prompt-template PATH`: optional prompt template file.
    #[arg(long, value_name = "PATH")]
    pub prompt_template: Option<PathBuf>,

    // `--llm-parallelism N`: defaults to 1; clap rejects values outside 1..=32.
    #[arg(long, default_value_t = 1, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=32))]
    pub llm_parallelism: u32,

    // `--json`: JSON output flag.
    #[arg(long)]
    pub json: bool,

    // `--db`, also read from the SQLITE_GRAPHRAG_DB_PATH environment variable.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,

    // `--wait-job-singleton SECONDS`.
    // NOTE(review): semantics not visible in this chunk.
    #[arg(long, value_name = "SECONDS")]
    pub wait_job_singleton: Option<u64>,

    // `--force-job-singleton`: defaults to false.
    // NOTE(review): semantics not visible in this chunk.
    #[arg(long, default_value_t = false)]
    pub force_job_singleton: bool,

    // `--names a,b,c`: comma-delimited memory names; merged with `names_file`
    // by `resolve_name_filter`.
    #[arg(long, value_name = "NAMES", value_delimiter = ',')]
    pub names: Vec<String>,

    // `--names-file PATH`: file with one name per line; parsed by `read_names_file`
    // (blank lines and lines starting with `#` are skipped).
    #[arg(long, value_name = "PATH")]
    pub names_file: Option<PathBuf>,

    // `--preflight-check`: defaults to false.
    // NOTE(review): presumably gates `run_preflight_probe` — confirm at the call site.
    #[arg(long, default_value_t = false)]
    pub preflight_check: bool,

    // `--fallback-mode`: optional alternate backend.
    #[arg(long, value_enum)]
    pub fallback_mode: Option<EnrichMode>,

    // `--rate-limit-buffer SECONDS`: defaults to 300. The preflight probe uses
    // `max(rate_limit_buffer, 60)` as its timeout.
    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
    pub rate_limit_buffer: u64,

    // `--max-load-check`: defaults to true.
    // NOTE(review): semantics not visible in this chunk.
    #[arg(long, default_value_t = true)]
    pub max_load_check: bool,

    // `--circuit-breaker-threshold N`: defaults to 5.
    // NOTE(review): semantics not visible in this chunk.
    #[arg(long, value_name = "N", default_value_t = 5)]
    pub circuit_breaker_threshold: u32,

    // `--preserve-threshold FLOAT`: defaults to 0.7.
    #[arg(long, value_name = "FLOAT", default_value_t = 0.7)]
    pub preserve_threshold: f64,

    // `--codex-model-validate`: defaults to true.
    #[arg(long, default_value_t = true)]
    pub codex_model_validate: bool,

    // `--codex-model-fallback MODEL`: optional fallback Codex model.
    #[arg(long, value_name = "MODEL")]
    pub codex_model_fallback: Option<String>,
}
536
/// Serializable event describing a phase of an enrich run.
///
/// Only `phase` is always serialized; every `Option` field is omitted from the
/// output when it is `None`.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Phase label.
    phase: &'a str,
    /// Path of the backend binary, when relevant to the phase.
    #[serde(skip_serializing_if = "Option::is_none")]
    binary_path: Option<&'a str>,
    /// Version string, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Total number of items, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_total: Option<usize>,
    /// Number of items still pending, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_pending: Option<usize>,
    /// LLM parallelism, when reported.
    #[serde(skip_serializing_if = "Option::is_none")]
    llm_parallelism: Option<u32>,
}
560
/// Serializable per-item progress event.
///
/// `item`, `status`, `index` and `total` are always serialized; every
/// `Option` field is omitted from the output when it is `None`.
#[derive(Debug, Serialize)]
struct ItemEvent<'a> {
    /// Item identifier.
    item: &'a str,
    /// Status label for this item.
    status: &'a str,
    /// Memory row id, when the item is a memory.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Entity row id, when the item is an entity.
    #[serde(skip_serializing_if = "Option::is_none")]
    entity_id: Option<i64>,
    /// Entity count for this item, when reported.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Relationship count for this item, when reported.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Character count before the operation, when reported.
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_before: Option<usize>,
    /// Character count after the operation, when reported.
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_after: Option<usize>,
    /// Cost in USD for this item, when reported.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    /// Elapsed time in milliseconds, when reported.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Error message, when the item failed.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Position of this item within the run.
    // NOTE(review): whether this is 0- or 1-based is not visible here.
    index: usize,
    /// Total number of items in the run.
    total: usize,
}
587
/// Serializable end-of-run summary.
#[derive(Debug, Serialize)]
struct EnrichSummary {
    /// Marker field serialized as `summary`.
    // NOTE(review): presumably lets consumers tell this record apart from item events.
    summary: bool,
    /// Name of the operation that ran.
    operation: String,
    /// Total number of items considered.
    items_total: usize,
    /// Number of items completed.
    completed: usize,
    /// Number of items that failed.
    failed: usize,
    /// Number of items skipped.
    skipped: usize,
    /// Accumulated cost in USD.
    cost_usd: f64,
    /// Elapsed time in milliseconds.
    elapsed_ms: u64,
    /// Backend label; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    backend_invoked: Option<&'static str>,
}
605
606use crate::output::emit_json_line as emit_json;
607
608fn open_queue_db(path: &str) -> Result<Connection, AppError> {
623 let conn = Connection::open(path)?;
624 conn.pragma_update(None, "journal_mode", "wal")?;
625 conn.execute_batch(
626 "CREATE TABLE IF NOT EXISTS queue (
627 id INTEGER PRIMARY KEY AUTOINCREMENT,
628 item_key TEXT NOT NULL UNIQUE,
629 item_type TEXT NOT NULL DEFAULT 'memory',
630 status TEXT NOT NULL DEFAULT 'pending',
631 memory_id INTEGER,
632 entity_id INTEGER,
633 entities INTEGER DEFAULT 0,
634 rels INTEGER DEFAULT 0,
635 error TEXT,
636 cost_usd REAL DEFAULT 0.0,
637 attempt INTEGER DEFAULT 0,
638 elapsed_ms INTEGER,
639 created_at TEXT DEFAULT (datetime('now')),
640 done_at TEXT
641 );
642 CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
643 )?;
644 Ok(conn)
645}
646
647fn call_claude(
655 binary: &Path,
656 prompt: &str,
657 json_schema: &str,
658 input_text: &str,
659 model: Option<&str>,
660 timeout_secs: u64,
661) -> Result<(serde_json::Value, f64, bool), AppError> {
662 let result = crate::commands::claude_runner::run_claude(
663 binary,
664 prompt,
665 json_schema,
666 input_text,
667 model,
668 timeout_secs,
669 7,
670 )?;
671 Ok((result.value, result.cost_usd, result.is_oauth))
672}
673
/// Result of the backend preflight probe (`run_preflight_probe`).
enum PreflightOutcome {
    /// The probe process exited successfully.
    Healthy,
    /// The probe failed and its stderr contained a rate-limit marker.
    RateLimited {
        /// Trimmed stderr of the probe process.
        reason: String,
        /// Static hint on how to proceed.
        suggestion: &'static str,
    },
    /// The probe could not run, timed out, or failed for another reason.
    Error(AppError),
}
692
693fn run_preflight_probe(args: &EnrichArgs) -> PreflightOutcome {
701 let timeout = std::time::Duration::from_secs(args.rate_limit_buffer.max(60));
702
703 match args.mode {
704 EnrichMode::ClaudeCode => {
705 let bin = match find_claude_binary(args.claude_binary.as_deref()) {
706 Ok(b) => b,
707 Err(e) => return PreflightOutcome::Error(e),
708 };
709 let mcp_config_path = match crate::spawn::preflight::write_empty_mcp_config_tempfile() {
714 Ok(p) => p,
715 Err(e) => {
716 return PreflightOutcome::Error(AppError::Io(e));
717 }
718 };
719 let mut cmd = std::process::Command::new(&bin);
720 cmd.env_clear();
721 for var in &["PATH", "HOME", "USER"] {
722 if let Ok(val) = std::env::var(var) {
723 cmd.env(var, val);
724 }
725 }
726 cmd.arg("-p")
727 .arg("ping")
728 .arg("--max-turns")
729 .arg("1")
730 .arg("--strict-mcp-config")
731 .arg("--mcp-config")
732 .arg(mcp_config_path.as_os_str())
733 .arg("--dangerously-skip-permissions")
734 .arg("--settings")
735 .arg("{\"hooks\":{}}")
736 .arg("--output-format")
737 .arg("json")
738 .stdin(std::process::Stdio::null())
739 .stdout(std::process::Stdio::piped())
740 .stderr(std::process::Stdio::piped());
741
742 let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
743 Ok(c) => c,
744 Err(e) => {
745 return PreflightOutcome::Error(AppError::Io(e));
746 }
747 };
748 let output = match wait_with_timeout(child, timeout) {
749 Ok(out) => out,
750 Err(e) => return PreflightOutcome::Error(e),
751 };
752 if !output.status.success() {
753 let stderr = String::from_utf8_lossy(&output.stderr);
754 if stderr.contains("hit your session limit")
755 || stderr.contains("rate_limit")
756 || stderr.contains("429")
757 {
758 return PreflightOutcome::RateLimited {
759 reason: stderr.trim().to_string(),
760 suggestion:
761 "wait for the OAuth window to reset or use --fallback-mode codex",
762 };
763 }
764 return PreflightOutcome::Error(AppError::Validation(format!(
765 "preflight probe failed: {stderr}",
766 stderr = stderr.trim()
767 )));
768 }
769 PreflightOutcome::Healthy
770 }
771 EnrichMode::Codex => {
772 let bin = match find_codex_binary(args.codex_binary.as_deref()) {
773 Ok(b) => b,
774 Err(e) => return PreflightOutcome::Error(e),
775 };
776 super::codex_spawn::validate_codex_model(args.codex_model.as_deref())
777 .map_err(PreflightOutcome::Error)
778 .ok();
779 let schema = "{}";
780 let schema_path = match super::codex_spawn::trusted_schema_path() {
781 Ok(p) => p,
782 Err(e) => return PreflightOutcome::Error(e),
783 };
784 let spawn_args = super::codex_spawn::CodexSpawnArgs {
785 binary: &bin,
786 prompt: "ping",
787 json_schema: schema,
788 input_text: "",
789 model: args.codex_model.as_deref(),
790 timeout_secs: args.rate_limit_buffer.max(60),
791 schema_path: schema_path.clone(),
792 };
793 let mut cmd = match super::codex_spawn::build_codex_command(&spawn_args) {
794 Ok(c) => c,
795 Err(e) => return PreflightOutcome::Error(e),
796 };
797 let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
798 Ok(c) => c,
799 Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
800 };
801 let output = match wait_with_timeout(child, timeout) {
802 Ok(out) => out,
803 Err(e) => return PreflightOutcome::Error(e),
804 };
805 let _ = std::fs::remove_file(&schema_path);
806 if !output.status.success() {
807 let stderr = String::from_utf8_lossy(&output.stderr);
808 if stderr.contains("rate_limit")
809 || stderr.contains("429")
810 || stderr.contains("Too Many Requests")
811 {
812 return PreflightOutcome::RateLimited {
813 reason: stderr.trim().to_string(),
814 suggestion: "wait for the rate-limit window to reset",
815 };
816 }
817 return PreflightOutcome::Error(AppError::Validation(format!(
818 "preflight probe failed: {stderr}",
819 stderr = stderr.trim()
820 )));
821 }
822 PreflightOutcome::Healthy
823 }
824 }
825}
826
827fn wait_with_timeout(
829 mut child: std::process::Child,
830 timeout: std::time::Duration,
831) -> Result<std::process::Output, AppError> {
832 use wait_timeout::ChildExt;
833 let start = std::time::Instant::now();
834 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
835 if status.is_none() {
836 let _ = child.kill();
837 let _ = child.wait();
838 return Err(AppError::Validation(format!(
839 "preflight probe timed out after {}s",
840 start.elapsed().as_secs()
841 )));
842 }
843 let mut stdout = Vec::new();
844 if let Some(mut out) = child.stdout.take() {
845 std::io::Read::read_to_end(&mut out, &mut stdout).map_err(AppError::Io)?;
846 }
847 let mut stderr = Vec::new();
848 if let Some(mut err) = child.stderr.take() {
849 std::io::Read::read_to_end(&mut err, &mut stderr).map_err(AppError::Io)?;
850 }
851 let exit = status.unwrap();
852 Ok(std::process::Output {
853 status: exit,
854 stdout,
855 stderr,
856 })
857}
858
859fn scan_unbound_memories(
870 conn: &Connection,
871 namespace: &str,
872 limit: Option<usize>,
873 name_filter: &[String],
874) -> Result<Vec<(i64, String, String)>, AppError> {
875 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
876
877 if name_filter.is_empty() {
878 let sql = format!(
879 "SELECT m.id, m.name, m.body
880 FROM memories m
881 WHERE m.namespace = ?1
882 AND m.deleted_at IS NULL
883 AND NOT EXISTS (
884 SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
885 )
886 ORDER BY m.id
887 {limit_clause}"
888 );
889 let mut stmt = conn.prepare(&sql)?;
890 let rows = stmt
891 .query_map(rusqlite::params![namespace], |r| {
892 Ok((
893 r.get::<_, i64>(0)?,
894 r.get::<_, String>(1)?,
895 r.get::<_, String>(2)?,
896 ))
897 })?
898 .collect::<Result<Vec<_>, _>>()?;
899 Ok(rows)
900 } else {
901 let placeholders: Vec<String> = (2..=name_filter.len() + 1)
903 .map(|i| format!("?{i}"))
904 .collect();
905 let in_clause = placeholders.join(", ");
906 let sql = format!(
907 "SELECT m.id, m.name, m.body
908 FROM memories m
909 WHERE m.namespace = ?1
910 AND m.deleted_at IS NULL
911 AND m.name IN ({in_clause})
912 AND NOT EXISTS (
913 SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
914 )
915 ORDER BY m.id
916 {limit_clause}"
917 );
918 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
919 params_vec.push(&namespace);
920 for n in name_filter {
921 params_vec.push(n);
922 }
923 let mut stmt = conn.prepare(&sql)?;
924 let rows = stmt
925 .query_map(
926 rusqlite::params_from_iter(params_vec.iter().copied()),
927 |r| {
928 Ok((
929 r.get::<_, i64>(0)?,
930 r.get::<_, String>(1)?,
931 r.get::<_, String>(2)?,
932 ))
933 },
934 )?
935 .collect::<Result<Vec<_>, _>>()?;
936 Ok(rows)
937 }
938}
939
940fn read_names_file(path: &Path) -> Result<Vec<String>, AppError> {
945 let content = std::fs::read_to_string(path).map_err(|e| {
946 AppError::Validation(format!("failed to read names file {}: {e}", path.display()))
947 })?;
948 let mut seen = std::collections::HashSet::new();
949 let mut out = Vec::new();
950 for line in content.lines() {
951 let trimmed = line.trim();
952 if trimmed.is_empty() || trimmed.starts_with('#') {
953 continue;
954 }
955 if seen.insert(trimmed.to_string()) {
956 out.push(trimmed.to_string());
957 }
958 }
959 Ok(out)
960}
961
962fn resolve_name_filter(args: &EnrichArgs) -> Result<Vec<String>, AppError> {
964 let mut combined: Vec<String> = args.names.clone();
965 if let Some(p) = &args.names_file {
966 let from_file = read_names_file(p)?;
967 for n in from_file {
968 if !combined.contains(&n) {
969 combined.push(n);
970 }
971 }
972 }
973 Ok(combined)
974}
975
976fn scan_entities_without_description(
980 conn: &Connection,
981 namespace: &str,
982 limit: Option<usize>,
983) -> Result<Vec<(i64, String, String)>, AppError> {
984 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
985 let sql = format!(
986 "SELECT id, name, type
987 FROM entities
988 WHERE namespace = ?1
989 AND (description IS NULL OR description = '')
990 ORDER BY id
991 {limit_clause}"
992 );
993 let mut stmt = conn.prepare(&sql)?;
994 let rows = stmt
995 .query_map(rusqlite::params![namespace], |r| {
996 Ok((
997 r.get::<_, i64>(0)?,
998 r.get::<_, String>(1)?,
999 r.get::<_, String>(2)?,
1000 ))
1001 })?
1002 .collect::<Result<Vec<_>, _>>()?;
1003 Ok(rows)
1004}
1005
1006fn scan_short_body_memories(
1010 conn: &Connection,
1011 namespace: &str,
1012 min_chars: usize,
1013 limit: Option<usize>,
1014) -> Result<Vec<(i64, String, String)>, AppError> {
1015 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1016 let sql = format!(
1017 "SELECT m.id, m.name, m.body
1018 FROM memories m
1019 WHERE m.namespace = ?1
1020 AND m.deleted_at IS NULL
1021 AND LENGTH(COALESCE(m.body,'')) < ?2
1022 ORDER BY m.id
1023 {limit_clause}"
1024 );
1025 let mut stmt = conn.prepare(&sql)?;
1026 let rows = stmt
1027 .query_map(rusqlite::params![namespace, min_chars as i64], |r| {
1028 Ok((
1029 r.get::<_, i64>(0)?,
1030 r.get::<_, String>(1)?,
1031 r.get::<_, String>(2)?,
1032 ))
1033 })?
1034 .collect::<Result<Vec<_>, _>>()?;
1035 Ok(rows)
1036}
1037
1038fn scan_memories_without_embeddings(
1042 conn: &Connection,
1043 namespace: &str,
1044 limit: Option<usize>,
1045 name_filter: &[String],
1046) -> Result<Vec<(i64, String, String)>, AppError> {
1047 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1048
1049 if name_filter.is_empty() {
1050 let sql = format!(
1051 "SELECT m.id, m.name, COALESCE(m.body,'')
1052 FROM memories m
1053 LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1054 WHERE m.namespace = ?1
1055 AND m.deleted_at IS NULL
1056 AND me.memory_id IS NULL
1057 ORDER BY m.id
1058 {limit_clause}"
1059 );
1060 let mut stmt = conn.prepare(&sql)?;
1061 let rows = stmt
1062 .query_map(rusqlite::params![namespace], |r| {
1063 Ok((
1064 r.get::<_, i64>(0)?,
1065 r.get::<_, String>(1)?,
1066 r.get::<_, String>(2)?,
1067 ))
1068 })?
1069 .collect::<Result<Vec<_>, _>>()?;
1070 Ok(rows)
1071 } else {
1072 let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1073 .map(|i| format!("?{i}"))
1074 .collect();
1075 let in_clause = placeholders.join(", ");
1076 let sql = format!(
1077 "SELECT m.id, m.name, COALESCE(m.body,'')
1078 FROM memories m
1079 LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1080 WHERE m.namespace = ?1
1081 AND m.deleted_at IS NULL
1082 AND m.name IN ({in_clause})
1083 AND me.memory_id IS NULL
1084 ORDER BY m.id
1085 {limit_clause}"
1086 );
1087 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1088 params_vec.push(&namespace);
1089 for n in name_filter {
1090 params_vec.push(n);
1091 }
1092 let mut stmt = conn.prepare(&sql)?;
1093 let rows = stmt
1094 .query_map(
1095 rusqlite::params_from_iter(params_vec.iter().copied()),
1096 |r| {
1097 Ok((
1098 r.get::<_, i64>(0)?,
1099 r.get::<_, String>(1)?,
1100 r.get::<_, String>(2)?,
1101 ))
1102 },
1103 )?
1104 .collect::<Result<Vec<_>, _>>()?;
1105 Ok(rows)
1106 }
1107}
1108
1109#[allow(clippy::type_complexity)]
1111fn scan_weight_candidates(
1112 conn: &Connection,
1113 namespace: &str,
1114 limit: Option<usize>,
1115) -> Result<Vec<(i64, String, String, String, f64)>, AppError> {
1116 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1117 let sql = format!(
1118 "SELECT r.id, e1.name, e2.name, r.relation, r.weight \
1119 FROM relationships r \
1120 JOIN entities e1 ON e1.id = r.source_id \
1121 JOIN entities e2 ON e2.id = r.target_id \
1122 WHERE r.weight >= 0.7 AND e1.namespace = ?1 \
1123 ORDER BY r.weight DESC {limit_clause}"
1124 );
1125 let mut stmt = conn.prepare(&sql)?;
1126 let rows = stmt
1127 .query_map(rusqlite::params![namespace], |r| {
1128 Ok((
1129 r.get::<_, i64>(0)?,
1130 r.get::<_, String>(1)?,
1131 r.get::<_, String>(2)?,
1132 r.get::<_, String>(3)?,
1133 r.get::<_, f64>(4)?,
1134 ))
1135 })?
1136 .collect::<Result<Vec<_>, _>>()?;
1137 Ok(rows)
1138}
1139
1140fn scan_generic_relations(
1142 conn: &Connection,
1143 namespace: &str,
1144 limit: Option<usize>,
1145) -> Result<Vec<(i64, String, String, String)>, AppError> {
1146 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1147 let sql = format!(
1148 "SELECT r.id, e1.name, e2.name, r.relation \
1149 FROM relationships r \
1150 JOIN entities e1 ON e1.id = r.source_id \
1151 JOIN entities e2 ON e2.id = r.target_id \
1152 WHERE r.relation = 'applies_to' AND e1.namespace = ?1 \
1153 ORDER BY r.id {limit_clause}"
1154 );
1155 let mut stmt = conn.prepare(&sql)?;
1156 let rows = stmt
1157 .query_map(rusqlite::params![namespace], |r| {
1158 Ok((
1159 r.get::<_, i64>(0)?,
1160 r.get::<_, String>(1)?,
1161 r.get::<_, String>(2)?,
1162 r.get::<_, String>(3)?,
1163 ))
1164 })?
1165 .collect::<Result<Vec<_>, _>>()?;
1166 Ok(rows)
1167}
1168
1169fn persist_memory_bindings(
1178 conn: &Connection,
1179 namespace: &str,
1180 memory_id: i64,
1181 entities_json: &serde_json::Value,
1182 rels_json: &serde_json::Value,
1183) -> Result<(usize, usize), AppError> {
1184 #[derive(Deserialize)]
1185 struct EntityItem {
1186 name: String,
1187 entity_type: String,
1188 }
1189 #[derive(Deserialize)]
1190 struct RelItem {
1191 source: String,
1192 target: String,
1193 relation: String,
1194 strength: f64,
1195 }
1196
1197 let extracted_entities: Vec<EntityItem> = serde_json::from_value(entities_json.clone())
1198 .map_err(|e| AppError::Validation(format!("failed to parse entities array: {e}")))?;
1199
1200 let extracted_rels: Vec<RelItem> = serde_json::from_value(rels_json.clone())
1201 .map_err(|e| AppError::Validation(format!("failed to parse relationships array: {e}")))?;
1202
1203 let mut ent_count = 0usize;
1204 let mut rel_count = 0usize;
1205
1206 for item in &extracted_entities {
1207 let entity_type = match item.entity_type.parse::<EntityType>() {
1208 Ok(et) => et,
1209 Err(_) => {
1210 tracing::warn!(
1211 target: "enrich",
1212 entity = %item.name,
1213 entity_type = %item.entity_type,
1214 "entity type not recognized, skipping"
1215 );
1216 continue;
1217 }
1218 };
1219 match entities::upsert_entity(
1220 conn,
1221 namespace,
1222 &NewEntity {
1223 name: item.name.clone(),
1224 entity_type,
1225 description: None,
1226 },
1227 ) {
1228 Ok(eid) => {
1229 let _ = entities::link_memory_entity(conn, memory_id, eid);
1230 ent_count += 1;
1231 }
1232 Err(e) => {
1233 tracing::warn!(
1234 target: "enrich",
1235 entity = %item.name,
1236 error = %e,
1237 "entity upsert skipped"
1238 );
1239 }
1240 }
1241 }
1242
1243 for rel in &extracted_rels {
1244 let normalized = crate::parsers::normalize_relation(&rel.relation);
1245 crate::parsers::warn_if_non_canonical(&normalized);
1246
1247 let src_name = crate::parsers::normalize_entity_name(&rel.source);
1250 let tgt_name = crate::parsers::normalize_entity_name(&rel.target);
1251 let src_id = entities::find_entity_id(conn, namespace, &src_name);
1252 let tgt_id = entities::find_entity_id(conn, namespace, &tgt_name);
1253 if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1254 let new_rel = NewRelationship {
1255 source: rel.source.clone(),
1256 target: rel.target.clone(),
1257 relation: normalized,
1258 strength: rel.strength,
1259 description: None,
1260 };
1261 if entities::upsert_relationship(conn, namespace, sid, tid, &new_rel).is_ok() {
1262 rel_count += 1;
1263 }
1264 }
1265 }
1266
1267 Ok((ent_count, rel_count))
1268}
1269
1270fn persist_entity_description(
1272 conn: &Connection,
1273 entity_id: i64,
1274 description: &str,
1275) -> Result<(), AppError> {
1276 conn.execute(
1277 "UPDATE entities SET description = ?1, updated_at = unixepoch() WHERE id = ?2",
1278 rusqlite::params![description, entity_id],
1279 )?;
1280 Ok(())
1281}
1282
1283#[allow(clippy::too_many_arguments)]
1289fn reembed_memory_vector(
1290 conn: &Connection,
1291 namespace: &str,
1292 memory_id: i64,
1293 memory_name: &str,
1294 memory_type: &str,
1295 body: &str,
1296 paths: &crate::paths::AppPaths,
1297 llm_backend: crate::cli::LlmBackendChoice,
1298) -> Result<(), AppError> {
1299 let snippet: String = body.chars().take(200).collect();
1300 let (embedding, backend_kind) =
1305 crate::embedder::embed_passage_with_choice(&paths.models, body, Some(llm_backend))?;
1306 record_enrich_backend(backend_kind.as_str());
1307 memories::upsert_vec(
1308 conn,
1309 memory_id,
1310 namespace,
1311 memory_type,
1312 &embedding,
1313 memory_name,
1314 &snippet,
1315 )?;
1316 Ok(())
1317}
1318
/// Label of the backend most recently recorded via [`record_enrich_backend`],
/// or `None` when nothing was recorded since the last [`take_enrich_backend`].
static ENRICH_LAST_BACKEND: std::sync::Mutex<Option<&'static str>> = std::sync::Mutex::new(None);

/// Remembers `backend` as the most recently used backend, replacing any
/// earlier value. If the lock is poisoned the call does nothing.
fn record_enrich_backend(backend: &'static str) {
    match ENRICH_LAST_BACKEND.lock() {
        Ok(mut slot) => *slot = Some(backend),
        // A poisoned lock is deliberately ignored: this value is informational only.
        Err(_) => {}
    }
}

/// Returns the recorded backend label and clears it, so a second call yields
/// `None` until something is recorded again. Also yields `None` when the lock
/// is poisoned.
fn take_enrich_backend() -> Option<&'static str> {
    match ENRICH_LAST_BACKEND.lock() {
        Ok(mut slot) => slot.take(),
        Err(_) => None,
    }
}
1335
1336fn persist_enriched_body(
1341 conn: &Connection,
1342 namespace: &str,
1343 memory_id: i64,
1344 memory_name: &str,
1345 new_body: &str,
1346 paths: &crate::paths::AppPaths,
1347 llm_backend: crate::cli::LlmBackendChoice,
1348) -> Result<(), AppError> {
1349 let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
1351 "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
1352 rusqlite::params![memory_id],
1353 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1354 )?;
1355
1356 let memory_type: String = conn.query_row(
1357 "SELECT type FROM memories WHERE id=?1",
1358 rusqlite::params![memory_id],
1359 |r| r.get(0),
1360 )?;
1361
1362 let description: String = conn.query_row(
1363 "SELECT COALESCE(description,'') FROM memories WHERE id=?1",
1364 rusqlite::params![memory_id],
1365 |r| r.get(0),
1366 )?;
1367
1368 let body_hash = blake3::hash(new_body.as_bytes()).to_hex().to_string();
1369
1370 let new_memory = memories::NewMemory {
1371 namespace: namespace.to_string(),
1372 name: memory_name.to_string(),
1373 memory_type: memory_type.clone(),
1374 description: description.clone(),
1375 body: new_body.to_string(),
1376 body_hash,
1377 session_id: None,
1378 source: "agent".to_string(),
1379 metadata: serde_json::json!({
1380 "operation": "body-enrich",
1381 "orig_chars": old_body.chars().count(),
1382 "new_chars": new_body.chars().count(),
1383 }),
1384 };
1385
1386 let next_version = crate::storage::versions::next_version(conn, memory_id)?;
1390 let version_metadata = serde_json::json!({
1391 "operation": "body-enrich",
1392 "orig_chars": old_body.chars().count(),
1393 "new_chars": new_body.chars().count(),
1394 })
1395 .to_string();
1396 crate::storage::versions::insert_version(
1397 conn,
1398 memory_id,
1399 next_version,
1400 memory_name,
1401 &memory_type,
1402 &description,
1403 new_body,
1404 &version_metadata,
1405 Some("enrich"),
1406 "edit",
1407 )?;
1408
1409 memories::update(conn, memory_id, &new_memory, None)?;
1410 memories::sync_fts_after_update(
1411 conn,
1412 memory_id,
1413 &old_name,
1414 &old_desc,
1415 &old_body,
1416 &new_memory.name,
1417 &new_memory.description,
1418 &new_memory.body,
1419 )?;
1420
1421 if let Err(e) = reembed_memory_vector(
1423 conn,
1424 namespace,
1425 memory_id,
1426 memory_name,
1427 &memory_type,
1428 new_body,
1429 paths,
1430 llm_backend,
1431 ) {
1432 tracing::warn!(target: "enrich", memory = %memory_name, error = %e, "vec upsert failed after body-enrich");
1433 }
1434
1435 Ok(())
1436}
1437
/// Returns `true` when `value` compares equal to `default`.
fn is_at_default<T>(value: T, default: T) -> bool
where
    T: PartialEq,
{
    PartialEq::eq(&value, &default)
}
1452
1453fn validate_mode_conditional_flags_enrich(args: &EnrichArgs) -> Result<(), AppError> {
1468 const DEFAULT_TIMEOUT: u64 = 300;
1469
1470 let mut conflicts: Vec<String> = Vec::new();
1471
1472 match args.mode {
1473 EnrichMode::ClaudeCode => {
1474 if args.codex_binary.is_some() {
1475 conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1476 }
1477 if args.codex_model.is_some() {
1478 conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1479 }
1480 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1481 conflicts.push(format!(
1482 "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1483 args.codex_timeout
1484 ));
1485 }
1486 }
1487 EnrichMode::Codex => {
1488 if args.claude_binary.is_some() {
1489 conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1490 }
1491 if args.claude_model.is_some() {
1492 conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1493 }
1494 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1495 conflicts.push(format!(
1496 "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1497 args.claude_timeout
1498 ));
1499 }
1500 if args.max_cost_usd.is_some() {
1501 conflicts.push(
1502 "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription, not the call)"
1503 .to_string(),
1504 );
1505 }
1506 }
1507 }
1508
1509 if !conflicts.is_empty() {
1510 return Err(AppError::Validation(format!(
1511 "G20: mode-conditional flag conflicts detected for --mode={}:\n - {}",
1512 args.mode,
1513 conflicts.join("\n - ")
1514 )));
1515 }
1516
1517 Ok(())
1518}
1519
1520pub fn run(args: &EnrichArgs, llm_backend: crate::cli::LlmBackendChoice) -> Result<(), AppError> {
1524 validate_mode_conditional_flags_enrich(args)?;
1527 let started = Instant::now();
1528
1529 let paths = AppPaths::resolve(args.db.as_deref())?;
1530 ensure_db_ready(&paths)?;
1531 let conn = open_rw(&paths.db)?;
1532 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1533
1534 let wait_secs = args.wait_job_singleton;
1540 let force_flag = args.force_job_singleton;
1541 let _singleton = crate::lock::acquire_job_singleton(
1542 crate::lock::JobType::Enrich,
1543 &namespace,
1544 &paths.db,
1545 wait_secs,
1546 force_flag,
1547 )?;
1548
1549 let provider_binary = if matches!(args.operation, EnrichOperation::ReEmbed) {
1551 None
1552 } else {
1553 Some(match args.mode {
1554 EnrichMode::ClaudeCode => {
1555 let bin = find_claude_binary(args.claude_binary.as_deref())?;
1556 let version = super::claude_runner::validate_claude_version(&bin)?;
1557 tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
1558 emit_json(&PhaseEvent {
1559 phase: "validate",
1560 binary_path: bin.to_str(),
1561 version: Some(&version),
1562 items_total: None,
1563 items_pending: None,
1564 llm_parallelism: None,
1565 });
1566 bin
1567 }
1568 EnrichMode::Codex => {
1569 let bin = find_codex_binary(args.codex_binary.as_deref())?;
1570 emit_json(&PhaseEvent {
1571 phase: "validate",
1572 binary_path: bin.to_str(),
1573 version: None,
1574 items_total: None,
1575 items_pending: None,
1576 llm_parallelism: None,
1577 });
1578 bin
1579 }
1580 })
1581 };
1582
1583 if args.max_load_check && !args.dry_run && crate::system_load::is_system_saturated() {
1587 let load = crate::system_load::load_average_one();
1588 let n = crate::system_load::ncpus();
1589 return Err(AppError::Validation(format!(
1590 "system load average {load:.2} exceeds 2x ncpus ({n}); \
1591 pass --no-max-load-check to override (not recommended)"
1592 )));
1593 }
1594
1595 if args.preflight_check && !args.dry_run && !matches!(args.operation, EnrichOperation::ReEmbed)
1602 {
1603 let preflight_result = run_preflight_probe(args);
1604 match preflight_result {
1605 PreflightOutcome::Healthy => {
1606 tracing::info!(target: "enrich", mode = ?args.mode, "preflight probe healthy");
1607 }
1608 PreflightOutcome::RateLimited { reason, suggestion } => {
1609 if let Some(fallback) = args.fallback_mode.clone() {
1610 if fallback != args.mode {
1611 return Err(AppError::Validation(format!(
1621 "preflight detected rate limit on {mode:?}: {reason}; \
1622 re-invoke with `--mode {fallback:?}` to use the fallback provider",
1623 mode = args.mode
1624 )));
1625 }
1626 return Err(AppError::Validation(format!(
1627 "preflight detected rate limit on {mode:?}: {reason}; \
1628 --fallback-mode matches --mode, no recovery possible",
1629 mode = args.mode
1630 )));
1631 }
1632 return Err(AppError::Validation(format!(
1633 "preflight detected rate limit on {mode:?}: {reason}; \
1634 {suggestion}; pass --fallback-mode codex to recover",
1635 mode = args.mode
1636 )));
1637 }
1638 PreflightOutcome::Error(e) => {
1639 return Err(e);
1640 }
1641 }
1642 }
1643
1644 let scan_result = scan_operation(&conn, &namespace, args)?;
1646 let total = scan_result.len();
1647
1648 emit_json(&PhaseEvent {
1649 phase: "scan",
1650 binary_path: None,
1651 version: None,
1652 items_total: Some(total),
1653 items_pending: Some(total),
1654 llm_parallelism: Some(args.llm_parallelism),
1655 });
1656
1657 if args.dry_run {
1659 for (idx, key) in scan_result.iter().enumerate() {
1660 emit_json(&ItemEvent {
1661 item: key,
1662 status: "preview",
1663 memory_id: None,
1664 entity_id: None,
1665 entities: None,
1666 rels: None,
1667 chars_before: None,
1668 chars_after: None,
1669 cost_usd: None,
1670 elapsed_ms: None,
1671 error: None,
1672 index: idx,
1673 total,
1674 });
1675 }
1676 emit_json(&EnrichSummary {
1677 summary: true,
1678 operation: format!("{:?}", args.operation),
1679 items_total: total,
1680 completed: 0,
1681 failed: 0,
1682 skipped: 0,
1683 cost_usd: 0.0,
1684 elapsed_ms: started.elapsed().as_millis() as u64,
1685 backend_invoked: take_enrich_backend(),
1686 });
1687 return Ok(());
1688 }
1689
1690 let queue_conn = open_queue_db(DEFAULT_QUEUE_DB)?;
1694
1695 if args.resume {
1696 let reset = queue_conn
1697 .execute(
1698 "UPDATE queue SET status='pending' WHERE status='processing'",
1699 [],
1700 )
1701 .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
1702 if reset > 0 {
1703 tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
1704 }
1705 }
1706
1707 if args.retry_failed {
1708 let count = queue_conn
1709 .execute(
1710 "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
1711 [],
1712 )
1713 .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
1714 tracing::info!(target: "enrich", count, "retrying failed items");
1715 }
1716
1717 if !args.resume && !args.retry_failed {
1718 queue_conn
1719 .execute("DELETE FROM queue", [])
1720 .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
1721 }
1722
1723 for (idx, key) in scan_result.iter().enumerate() {
1725 let item_type = match args.operation {
1726 EnrichOperation::EntityDescriptions => "entity",
1727 _ => "memory",
1728 };
1729 if let Err(e) = queue_conn.execute(
1730 "INSERT OR IGNORE INTO queue (item_key, item_type, status) VALUES (?1, ?2, 'pending')",
1731 rusqlite::params![key, item_type],
1732 ) {
1733 tracing::warn!(target: "enrich", error = %e, "queue insert failed");
1734 }
1735 let _ = idx; }
1737
1738 let parallelism = args.llm_parallelism.clamp(1, 32) as usize;
1741 if parallelism > 1 {
1742 tracing::info!(
1743 target: "enrich",
1744 llm_parallelism = parallelism,
1745 "parallel LLM processing with bounded thread pool"
1746 );
1747 }
1748 if parallelism > 4 {
1752 match args.mode {
1753 EnrichMode::ClaudeCode => {
1754 tracing::warn!(
1755 target: "enrich",
1756 llm_parallelism = parallelism,
1757 recommended_max = 4,
1758 mode = "claude-code",
1759 "llm_parallelism above 4 multiplies Claude Code subprocess fan-out; \
1760 consider combining with SQLITE_GRAPHRAG_CLAUDE_EMPTY_CONFIG_DIR \
1761 to cut MCP children (G28-A)"
1762 );
1763 }
1764 EnrichMode::Codex if parallelism > 16 => {
1765 tracing::warn!(
1766 target: "enrich",
1767 llm_parallelism = parallelism,
1768 recommended_max = 16,
1769 mode = "codex",
1770 "llm_parallelism above 16 risks OAuth rate-limit on Codex; \
1771 consider --llm-parallelism 8 for safer concurrency"
1772 );
1773 }
1774 EnrichMode::Codex => {
1775 }
1779 }
1780 }
1781
1782 let mut completed = 0usize;
1783 let mut failed = 0usize;
1784 let mut skipped = 0usize;
1785 let mut cost_total = 0.0f64;
1786 let mut oauth_detected = false;
1787 let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1788 let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1789 let enrich_started = std::time::Instant::now();
1790
1791 let provider_timeout = match args.mode {
1792 EnrichMode::ClaudeCode => args.claude_timeout,
1793 EnrichMode::Codex => args.codex_timeout,
1794 };
1795
1796 let provider_model: Option<&str> = match args.mode {
1797 EnrichMode::ClaudeCode => args.claude_model.as_deref(),
1798 EnrichMode::Codex => args.codex_model.as_deref(),
1799 };
1800
1801 if parallelism > 1 {
1805 let stdout_mu = parking_lot::Mutex::new(());
1806 let budget = args.max_cost_usd;
1807 let operation = args.operation.clone();
1808 let mode = args.mode.clone();
1809 let min_oc = args.min_output_chars;
1810 let max_oc = args.max_output_chars;
1811 let prompt_tpl = args.prompt_template.as_deref().map(|p| p.to_path_buf());
1812
1813 struct WorkerResult {
1814 completed: usize,
1815 failed: usize,
1816 skipped: usize,
1817 cost: f64,
1818 oauth: bool,
1819 }
1820
1821 let results: Vec<WorkerResult> = std::thread::scope(|s| {
1822 let handles: Vec<_> = (0..parallelism)
1823 .map(|worker_id| {
1824 let stdout_mu = &stdout_mu;
1825 let paths = &paths;
1826 let namespace = &namespace;
1827 let provider_binary = provider_binary.as_deref();
1828 let operation = &operation;
1829 let mode = &mode;
1830 let prompt_tpl = prompt_tpl.as_deref();
1831 s.spawn(move || {
1832 let w_conn = match open_rw(&paths.db) {
1833 Ok(c) => c,
1834 Err(e) => {
1835 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open DB");
1836 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1837 }
1838 };
1839 let w_queue = match open_queue_db(DEFAULT_QUEUE_DB) {
1840 Ok(c) => c,
1841 Err(e) => {
1842 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open queue DB");
1843 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1844 }
1845 };
1846 let mut w_completed = 0usize;
1847 let mut w_failed = 0usize;
1848 let mut w_skipped = 0usize;
1849 let mut w_cost = 0.0f64;
1850 let mut w_oauth = false;
1851 let mut w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1852 let w_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1853 let mut w_breaker = crate::retry::CircuitBreaker::new(
1859 args.circuit_breaker_threshold.max(1),
1860 std::time::Duration::from_secs(60),
1861 );
1862
1863 loop {
1864 if crate::shutdown_requested() {
1865 tracing::info!(target: "enrich", "shutdown requested, worker stopping");
1866 break;
1867 }
1868 if let Some(b) = budget {
1869 if !w_oauth && w_cost >= b {
1870 break;
1871 }
1872 }
1873 let pending: Option<(i64, String, String)> = w_queue
1874 .query_row(
1875 "UPDATE queue SET status='processing', attempt=attempt+1 \
1876 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
1877 RETURNING id, item_key, item_type",
1878 [],
1879 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1880 )
1881 .ok();
1882 let (queue_id, item_key, _item_type) = match pending {
1883 Some(p) => p,
1884 None => break,
1885 };
1886 let item_started = Instant::now();
1887 let current_index = w_completed + w_failed + w_skipped;
1888
1889 let call_result = match operation {
1890 EnrichOperation::MemoryBindings => call_memory_bindings(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1891 EnrichOperation::EntityDescriptions => call_entity_description(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1892 EnrichOperation::BodyEnrich => call_body_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode, min_oc, max_oc, prompt_tpl, args.preserve_threshold, paths, llm_backend),
1893 EnrichOperation::ReEmbed => call_reembed(&w_conn, namespace, &item_key, paths, llm_backend),
1894 EnrichOperation::WeightCalibrate => call_weight_calibrate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1895 EnrichOperation::RelationReclassify => call_relation_reclassify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1896 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => call_entity_connect(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1897 EnrichOperation::EntityTypeValidate => call_entity_type_validate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1898 EnrichOperation::DescriptionEnrich => call_description_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1899 EnrichOperation::DomainClassify => call_domain_classify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1900 EnrichOperation::GraphAudit => call_graph_audit(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1901 EnrichOperation::DeepResearchSynth => call_deep_research_synth(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1902 EnrichOperation::BodyExtract => call_body_extract(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1903 };
1904
1905 match call_result {
1906 Ok(EnrichItemResult::Done { cost, is_oauth, memory_id, entity_id, entities, rels, chars_before, chars_after }) => {
1907 if is_oauth { w_oauth = true; }
1908 w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1909 let _ = w_queue.execute(
1910 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1911 rusqlite::params![memory_id, entity_id, entities as i64, rels as i64, cost, item_started.elapsed().as_millis() as i64, queue_id],
1912 );
1913 w_completed += 1;
1914 if !is_oauth { w_cost += cost; }
1915 let _ = w_breaker
1917 .record(crate::retry::AttemptOutcome::Success);
1918 let _guard = stdout_mu.lock();
1919 emit_json(&ItemEvent { item: &item_key, status: "done", memory_id, entity_id, entities: Some(entities), rels: Some(rels), chars_before, chars_after, cost_usd: if is_oauth { None } else { Some(cost) }, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
1920 }
1921 Ok(EnrichItemResult::Skipped { reason }) => {
1922 w_skipped += 1;
1923 let _ = w_queue.execute("UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![reason, queue_id]);
1924 let _guard = stdout_mu.lock();
1925 emit_json(&ItemEvent { item: &item_key, status: "skipped", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
1926 }
1927 Ok(EnrichItemResult::PreservationFailed { score, threshold, chars_before, chars_after }) => {
1928 w_skipped += 1;
1934 let reason = format!(
1935 "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
1936 );
1937 let _ = w_queue.execute(
1938 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
1939 rusqlite::params![reason, queue_id],
1940 );
1941 let _guard = stdout_mu.lock();
1942 emit_json(&ItemEvent {
1943 item: &item_key,
1944 status: "preservation_failed",
1945 memory_id: None,
1946 entity_id: None,
1947 entities: None,
1948 rels: None,
1949 chars_before: Some(chars_before),
1950 chars_after: Some(chars_after),
1951 cost_usd: None,
1952 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1953 error: Some(reason),
1954 index: current_index,
1955 total,
1956 });
1957 }
1958 Err(e) => {
1959 let err_str = format!("{e}");
1960 if matches!(e, AppError::RateLimited { .. }) {
1961 if crate::retry::is_kill_switch_active() {
1962 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
1963 } else if std::time::Instant::now() >= w_deadline {
1964 tracing::error!(target: "enrich", "rate-limit retry deadline (1h) exhausted in worker");
1965 } else {
1966 let half = w_backoff / 2;
1967 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
1968 let actual_wait = half + jitter;
1969 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited in worker, backing off");
1970 let _ = w_queue.execute("UPDATE queue SET status='pending' WHERE id=?1", rusqlite::params![queue_id]);
1971 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
1972 w_backoff = (w_backoff * 2).min(900);
1973 continue;
1974 }
1975 }
1976 w_failed += 1;
1977 let _ = w_queue.execute("UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![err_str, queue_id]);
1978 let _guard = stdout_mu.lock();
1979 emit_json(&ItemEvent { item: &item_key, status: "failed", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: Some(err_str), index: current_index, total });
1980 let breaker_opened = w_breaker
1982 .record(crate::retry::AttemptOutcome::HardFailure);
1983 if breaker_opened {
1984 tracing::error!(target: "enrich",
1985 consecutive_failures = w_breaker.consecutive_failures(),
1986 "circuit breaker opened — aborting worker"
1987 );
1988 break;
1989 }
1990 }
1991 }
1992 }
1993 WorkerResult { completed: w_completed, failed: w_failed, skipped: w_skipped, cost: w_cost, oauth: w_oauth }
1994 })
1995 })
1996 .collect();
1997 handles
1998 .into_iter()
1999 .map(|h| {
2000 h.join().unwrap_or(WorkerResult {
2001 completed: 0,
2002 failed: 0,
2003 skipped: 0,
2004 cost: 0.0,
2005 oauth: false,
2006 })
2007 })
2008 .collect()
2009 });
2010
2011 for r in &results {
2012 completed += r.completed;
2013 failed += r.failed;
2014 skipped += r.skipped;
2015 cost_total += r.cost;
2016 if r.oauth && !oauth_detected {
2017 oauth_detected = true;
2018 }
2019 }
2020 } else {
2021 loop {
2023 if crate::shutdown_requested() {
2024 tracing::info!(target: "enrich", "shutdown requested, stopping enrichment");
2025 break;
2026 }
2027
2028 if let Some(budget) = args.max_cost_usd {
2030 if !oauth_detected && cost_total >= budget {
2031 tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
2032 break;
2033 }
2034 }
2035
2036 let pending: Option<(i64, String, String)> = queue_conn
2038 .query_row(
2039 "UPDATE queue SET status='processing', attempt=attempt+1 \
2040 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
2041 RETURNING id, item_key, item_type",
2042 [],
2043 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2044 )
2045 .ok();
2046
2047 let (queue_id, item_key, item_type) = match pending {
2048 Some(p) => p,
2049 None => break,
2050 };
2051
2052 let item_started = Instant::now();
2053 let current_index = completed + failed + skipped;
2054
2055 let call_result = match args.operation {
2056 EnrichOperation::MemoryBindings => call_memory_bindings(
2057 &conn,
2058 &namespace,
2059 &item_key,
2060 provider_binary
2061 .as_deref()
2062 .expect("provider binary required"),
2063 provider_model,
2064 provider_timeout,
2065 &args.mode,
2066 ),
2067 EnrichOperation::EntityDescriptions => call_entity_description(
2068 &conn,
2069 &namespace,
2070 &item_key,
2071 provider_binary
2072 .as_deref()
2073 .expect("provider binary required"),
2074 provider_model,
2075 provider_timeout,
2076 &args.mode,
2077 ),
2078 EnrichOperation::BodyEnrich => call_body_enrich(
2079 &conn,
2080 &namespace,
2081 &item_key,
2082 provider_binary
2083 .as_deref()
2084 .expect("provider binary required"),
2085 provider_model,
2086 provider_timeout,
2087 &args.mode,
2088 args.min_output_chars,
2089 args.max_output_chars,
2090 args.prompt_template.as_deref(),
2091 args.preserve_threshold,
2092 &paths,
2093 llm_backend,
2094 ),
2095 EnrichOperation::ReEmbed => {
2096 call_reembed(&conn, &namespace, &item_key, &paths, llm_backend)
2097 }
2098 EnrichOperation::WeightCalibrate => call_weight_calibrate(
2099 &conn,
2100 &namespace,
2101 &item_key,
2102 provider_binary
2103 .as_deref()
2104 .expect("provider binary required"),
2105 provider_model,
2106 provider_timeout,
2107 &args.mode,
2108 ),
2109 EnrichOperation::RelationReclassify => call_relation_reclassify(
2110 &conn,
2111 &namespace,
2112 &item_key,
2113 provider_binary
2114 .as_deref()
2115 .expect("provider binary required"),
2116 provider_model,
2117 provider_timeout,
2118 &args.mode,
2119 ),
2120 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2121 call_entity_connect(
2122 &conn,
2123 &namespace,
2124 &item_key,
2125 provider_binary
2126 .as_deref()
2127 .expect("provider binary required"),
2128 provider_model,
2129 provider_timeout,
2130 &args.mode,
2131 )
2132 }
2133 EnrichOperation::EntityTypeValidate => call_entity_type_validate(
2134 &conn,
2135 &namespace,
2136 &item_key,
2137 provider_binary
2138 .as_deref()
2139 .expect("provider binary required"),
2140 provider_model,
2141 provider_timeout,
2142 &args.mode,
2143 ),
2144 EnrichOperation::DescriptionEnrich => call_description_enrich(
2145 &conn,
2146 &namespace,
2147 &item_key,
2148 provider_binary
2149 .as_deref()
2150 .expect("provider binary required"),
2151 provider_model,
2152 provider_timeout,
2153 &args.mode,
2154 ),
2155 EnrichOperation::DomainClassify => call_domain_classify(
2156 &conn,
2157 &namespace,
2158 &item_key,
2159 provider_binary
2160 .as_deref()
2161 .expect("provider binary required"),
2162 provider_model,
2163 provider_timeout,
2164 &args.mode,
2165 ),
2166 EnrichOperation::GraphAudit => call_graph_audit(
2167 &conn,
2168 &namespace,
2169 &item_key,
2170 provider_binary
2171 .as_deref()
2172 .expect("provider binary required"),
2173 provider_model,
2174 provider_timeout,
2175 &args.mode,
2176 ),
2177 EnrichOperation::DeepResearchSynth => call_deep_research_synth(
2178 &conn,
2179 &namespace,
2180 &item_key,
2181 provider_binary
2182 .as_deref()
2183 .expect("provider binary required"),
2184 provider_model,
2185 provider_timeout,
2186 &args.mode,
2187 ),
2188 EnrichOperation::BodyExtract => call_body_extract(
2189 &conn,
2190 &namespace,
2191 &item_key,
2192 provider_binary
2193 .as_deref()
2194 .expect("provider binary required"),
2195 provider_model,
2196 provider_timeout,
2197 &args.mode,
2198 ),
2199 };
2200
2201 match call_result {
2202 Ok(EnrichItemResult::Done {
2203 memory_id,
2204 entity_id,
2205 entities,
2206 rels,
2207 chars_before,
2208 chars_after,
2209 cost,
2210 is_oauth,
2211 }) => {
2212 if is_oauth && !oauth_detected {
2213 oauth_detected = true;
2214 tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
2215 }
2216 backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
2217
2218 let persist_err: Option<String> = match args.operation {
2220 EnrichOperation::MemoryBindings => {
2221 None
2223 }
2224 EnrichOperation::EntityDescriptions => {
2225 None
2227 }
2228 EnrichOperation::BodyEnrich => {
2229 None
2231 }
2232 _ => {
2233 None
2235 }
2236 };
2237
2238 if let Err(e) = queue_conn.execute(
2239 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2240 rusqlite::params![
2241 memory_id,
2242 entity_id,
2243 entities as i64,
2244 rels as i64,
2245 cost,
2246 item_started.elapsed().as_millis() as i64,
2247 queue_id
2248 ],
2249 ) {
2250 tracing::warn!(target: "enrich", error = %e, "queue done update failed");
2251 }
2252
2253 if persist_err.is_none() {
2254 completed += 1;
2255 if !is_oauth {
2256 cost_total += cost;
2257 }
2258 emit_json(&ItemEvent {
2259 item: &item_key,
2260 status: "done",
2261 memory_id,
2262 entity_id,
2263 entities: Some(entities),
2264 rels: Some(rels),
2265 chars_before,
2266 chars_after,
2267 cost_usd: if is_oauth { None } else { Some(cost) },
2268 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2269 error: None,
2270 index: current_index,
2271 total,
2272 });
2273 } else {
2274 failed += 1;
2275 emit_json(&ItemEvent {
2276 item: &item_key,
2277 status: "failed",
2278 memory_id: None,
2279 entity_id: None,
2280 entities: None,
2281 rels: None,
2282 chars_before: None,
2283 chars_after: None,
2284 cost_usd: None,
2285 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2286 error: persist_err,
2287 index: current_index,
2288 total,
2289 });
2290 }
2291 }
2292 Ok(EnrichItemResult::Skipped { reason }) => {
2293 skipped += 1;
2294 if let Err(e) = queue_conn.execute(
2295 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2296 rusqlite::params![reason, queue_id],
2297 ) {
2298 tracing::warn!(target: "enrich", error = %e, "queue skipped update failed");
2299 }
2300 emit_json(&ItemEvent {
2301 item: &item_key,
2302 status: "skipped",
2303 memory_id: None,
2304 entity_id: None,
2305 entities: None,
2306 rels: None,
2307 chars_before: None,
2308 chars_after: None,
2309 cost_usd: None,
2310 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2311 error: None,
2312 index: current_index,
2313 total,
2314 });
2315 }
2316 Ok(EnrichItemResult::PreservationFailed {
2317 score,
2318 threshold,
2319 chars_before,
2320 chars_after,
2321 }) => {
2322 skipped += 1;
2329 let reason = format!(
2330 "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2331 );
2332 if let Err(qe) = queue_conn.execute(
2333 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2334 rusqlite::params![reason, queue_id],
2335 ) {
2336 tracing::warn!(target: "enrich", error = %qe, "queue preservation_failed update failed");
2337 }
2338 emit_json(&ItemEvent {
2339 item: &item_key,
2340 status: "preservation_failed",
2341 memory_id: None,
2342 entity_id: None,
2343 entities: None,
2344 rels: None,
2345 chars_before: Some(chars_before),
2346 chars_after: Some(chars_after),
2347 cost_usd: None,
2348 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2349 error: Some(reason),
2350 index: current_index,
2351 total,
2352 });
2353 }
2354 Err(e) => {
2355 let err_str = format!("{e}");
2356 if matches!(e, AppError::RateLimited { .. }) {
2357 if crate::retry::is_kill_switch_active() {
2358 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2359 } else if std::time::Instant::now() >= rate_limit_deadline {
2360 tracing::error!(target: "enrich", total_elapsed_secs = enrich_started.elapsed().as_secs(), "rate-limit retry deadline (1h) exhausted");
2361 } else {
2362 let half = backoff_secs / 2;
2363 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2364 let actual_wait = half + jitter;
2365 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
2366 if let Err(qe) = queue_conn.execute(
2367 "UPDATE queue SET status='pending' WHERE id=?1",
2368 rusqlite::params![queue_id],
2369 ) {
2370 tracing::warn!(target: "enrich", error = %qe, "queue pending update failed");
2371 }
2372 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2373 backoff_secs = (backoff_secs * 2).min(900);
2374 continue;
2375 }
2376 }
2377
2378 failed += 1;
2379 if let Err(qe) = queue_conn.execute(
2380 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
2381 rusqlite::params![err_str, queue_id],
2382 ) {
2383 tracing::warn!(target: "enrich", error = %qe, "queue failed update failed");
2384 }
2385 emit_json(&ItemEvent {
2386 item: &item_key,
2387 status: "failed",
2388 memory_id: None,
2389 entity_id: None,
2390 entities: None,
2391 rels: None,
2392 chars_before: None,
2393 chars_after: None,
2394 cost_usd: None,
2395 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2396 error: Some(err_str),
2397 index: current_index,
2398 total,
2399 });
2400 }
2401 }
2402
2403 let _ = item_type; }
2405 } let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2408 let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2409
2410 emit_json(&EnrichSummary {
2411 summary: true,
2412 operation: format!("{:?}", args.operation),
2413 items_total: total,
2414 completed,
2415 failed,
2416 skipped,
2417 cost_usd: cost_total,
2418 elapsed_ms: started.elapsed().as_millis() as u64,
2419 backend_invoked: take_enrich_backend(),
2420 });
2421
2422 if failed == 0 {
2423 let _ = std::fs::remove_file(DEFAULT_QUEUE_DB);
2424 }
2425
2426 Ok(())
2427}
2428
/// Outcome of processing one queue item, as returned by the `call_*` workers.
///
/// The driver loop matches on this value to update the queue row and to emit
/// the per-item JSON event.
enum EnrichItemResult {
    /// The item was processed; the driver marks the queue row `done`.
    Done {
        /// Id of the memory the operation worked on, if any.
        memory_id: Option<i64>,
        /// Id of the entity the operation worked on, if any.
        entity_id: Option<i64>,
        /// Entity count reported for the item.
        entities: usize,
        /// Relationship count reported for the item.
        rels: usize,
        /// Size before the operation, for operations that report one.
        chars_before: Option<usize>,
        /// Size after the operation, for operations that report one.
        chars_after: Option<usize>,
        /// Cost reported by the provider call; the driver adds it to the
        /// running total only when `is_oauth` is false.
        cost: f64,
        /// When true the driver omits `cost_usd` from the emitted event.
        is_oauth: bool,
    },
    /// Nothing was written; `reason` is stored in the queue row's `error` column.
    Skipped { reason: String },
    /// The enriched body was rejected by the preservation check. The driver
    /// counts it as skipped and reports `score` against `threshold`.
    PreservationFailed {
        score: f64,
        threshold: f64,
        chars_before: usize,
        chars_after: usize,
    },
}
2458
2459fn call_memory_bindings(
2464 conn: &Connection,
2465 namespace: &str,
2466 memory_name: &str,
2467 binary: &Path,
2468 model: Option<&str>,
2469 timeout: u64,
2470 mode: &EnrichMode,
2471) -> Result<EnrichItemResult, AppError> {
2472 let (memory_id, body): (i64, String) = conn.query_row(
2474 "SELECT id, COALESCE(body,'') FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2475 rusqlite::params![namespace, memory_name],
2476 |r| Ok((r.get(0)?, r.get(1)?)),
2477 ).map_err(|e| match e {
2478 rusqlite::Error::QueryReturnedNoRows => AppError::NotFound(format!("memory '{memory_name}' not found")),
2479 other => AppError::Database(other),
2480 })?;
2481
2482 if body.trim().is_empty() {
2483 return Ok(EnrichItemResult::Skipped {
2484 reason: "body is empty".to_string(),
2485 });
2486 }
2487
2488 let (value, cost, is_oauth) = match mode {
2489 EnrichMode::ClaudeCode => call_claude(
2490 binary,
2491 BINDINGS_PROMPT,
2492 BINDINGS_SCHEMA,
2493 &body,
2494 model,
2495 timeout,
2496 )?,
2497 EnrichMode::Codex => call_codex(
2498 binary,
2499 BINDINGS_PROMPT,
2500 BINDINGS_SCHEMA,
2501 &body,
2502 model,
2503 timeout,
2504 )?,
2505 };
2506
2507 let empty_arr = serde_json::Value::Array(vec![]);
2508 let entities_val = value.get("entities").unwrap_or(&empty_arr);
2509 let rels_val = value.get("relationships").unwrap_or(&empty_arr);
2510
2511 let (ent_count, rel_count) =
2512 persist_memory_bindings(conn, namespace, memory_id, entities_val, rels_val)?;
2513
2514 Ok(EnrichItemResult::Done {
2515 memory_id: Some(memory_id),
2516 entity_id: None,
2517 entities: ent_count,
2518 rels: rel_count,
2519 chars_before: None,
2520 chars_after: None,
2521 cost,
2522 is_oauth,
2523 })
2524}
2525
2526fn call_entity_description(
2527 conn: &Connection,
2528 namespace: &str,
2529 entity_name: &str,
2530 binary: &Path,
2531 model: Option<&str>,
2532 timeout: u64,
2533 mode: &EnrichMode,
2534) -> Result<EnrichItemResult, AppError> {
2535 let (entity_id, entity_type): (i64, String) = conn
2536 .query_row(
2537 "SELECT id, type FROM entities WHERE namespace=?1 AND name=?2",
2538 rusqlite::params![namespace, entity_name],
2539 |r| Ok((r.get(0)?, r.get(1)?)),
2540 )
2541 .map_err(|e| match e {
2542 rusqlite::Error::QueryReturnedNoRows => {
2543 AppError::NotFound(format!("entity '{entity_name}' not found"))
2544 }
2545 other => AppError::Database(other),
2546 })?;
2547
2548 let prompt = format!(
2549 "{ENTITY_DESCRIPTION_PROMPT_PREFIX}{entity_name}\nEntity type: {entity_type}\n\nGenerate a description:"
2550 );
2551
2552 let (value, cost, is_oauth) = match mode {
2553 EnrichMode::ClaudeCode => call_claude(
2554 binary,
2555 &prompt,
2556 ENTITY_DESCRIPTION_SCHEMA,
2557 "",
2558 model,
2559 timeout,
2560 )?,
2561 EnrichMode::Codex => call_codex(
2562 binary,
2563 &prompt,
2564 ENTITY_DESCRIPTION_SCHEMA,
2565 "",
2566 model,
2567 timeout,
2568 )?,
2569 };
2570
2571 let description = value
2572 .get("description")
2573 .and_then(|v| v.as_str())
2574 .ok_or_else(|| AppError::Validation("LLM result missing 'description' field".into()))?;
2575
2576 persist_entity_description(conn, entity_id, description)?;
2577
2578 Ok(EnrichItemResult::Done {
2579 memory_id: None,
2580 entity_id: Some(entity_id),
2581 entities: 0,
2582 rels: 0,
2583 chars_before: None,
2584 chars_after: None,
2585 cost,
2586 is_oauth,
2587 })
2588}
2589
2590#[allow(clippy::too_many_arguments)]
2591fn call_body_enrich(
2592 conn: &Connection,
2593 namespace: &str,
2594 memory_name: &str,
2595 binary: &Path,
2596 model: Option<&str>,
2597 timeout: u64,
2598 mode: &EnrichMode,
2599 min_output_chars: usize,
2600 max_output_chars: usize,
2601 prompt_template: Option<&Path>,
2602 preserve_threshold: f64,
2603 paths: &crate::paths::AppPaths,
2604 llm_backend: crate::cli::LlmBackendChoice,
2605) -> Result<EnrichItemResult, AppError> {
2606 let (memory_id, body, description, memory_type): (i64, String, String, String) = conn
2607 .query_row(
2608 "SELECT id, COALESCE(body,''), COALESCE(description,''), COALESCE(type,'note') \
2609 FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2610 rusqlite::params![namespace, memory_name],
2611 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2612 )
2613 .map_err(|e| match e {
2614 rusqlite::Error::QueryReturnedNoRows => {
2615 AppError::NotFound(format!("memory '{memory_name}' not found"))
2616 }
2617 other => AppError::Database(other),
2618 })?;
2619
2620 let chars_before = body.chars().count();
2621
2622 let linked_entities: Vec<String> = {
2624 let mut stmt = conn.prepare_cached(
2625 "SELECT e.name FROM memory_entities me \
2626 JOIN entities e ON e.id = me.entity_id \
2627 WHERE me.memory_id = ?1 LIMIT 10",
2628 )?;
2629 let result: Vec<String> = stmt
2630 .query_map(rusqlite::params![memory_id], |r| r.get::<_, String>(0))?
2631 .filter_map(|r| r.ok())
2632 .collect();
2633 drop(stmt);
2634 result
2635 };
2636
2637 let prompt_prefix = if let Some(tmpl_path) = prompt_template {
2639 let file_size = std::fs::metadata(tmpl_path)
2640 .map_err(|e| {
2641 AppError::Io(std::io::Error::new(
2642 e.kind(),
2643 format!("failed to stat prompt template: {e}"),
2644 ))
2645 })?
2646 .len();
2647 if file_size > MAX_MEMORY_BODY_LEN as u64 {
2648 return Err(AppError::LimitExceeded(
2649 crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
2650 ));
2651 }
2652 std::fs::read_to_string(tmpl_path).map_err(|e| {
2653 AppError::Io(std::io::Error::new(
2654 e.kind(),
2655 format!("failed to read prompt template: {e}"),
2656 ))
2657 })?
2658 } else {
2659 BODY_ENRICH_PROMPT_PREFIX.to_string()
2660 };
2661
2662 let context_section = if !linked_entities.is_empty() || !description.is_empty() {
2664 let mut ctx = String::new();
2665 ctx.push_str(&format!(
2666 "\nContext:\n- Memory name: {memory_name}\n- Type: {memory_type}\n"
2667 ));
2668 if !description.is_empty() {
2669 ctx.push_str(&format!("- Description: {description}\n"));
2670 }
2671 ctx.push_str(&format!("- Domain: {namespace}\n"));
2672 if !linked_entities.is_empty() {
2673 ctx.push_str(&format!(
2674 "- Linked entities: {}\n",
2675 linked_entities.join(", ")
2676 ));
2677 }
2678 ctx
2679 } else {
2680 String::new()
2681 };
2682
2683 let prompt = format!(
2684 "{prompt_prefix}{context_section}\nTarget minimum length: {min_output_chars} characters. Maximum: {max_output_chars} characters."
2685 );
2686
2687 let (value, cost, is_oauth) = match mode {
2689 EnrichMode::ClaudeCode => {
2690 call_claude(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2691 }
2692 EnrichMode::Codex => {
2693 call_codex(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2694 }
2695 };
2696
2697 let enriched_body = value
2698 .get("enriched_body")
2699 .and_then(|v| v.as_str())
2700 .ok_or_else(|| AppError::Validation("LLM result missing 'enriched_body' field".into()))?;
2701
2702 let chars_after = enriched_body.chars().count();
2703
2704 let threshold = preserve_threshold;
2711 let verdict =
2712 crate::preservation::PreservationVerdict::evaluate(&body, enriched_body, threshold);
2713 if !verdict.is_accepted() {
2714 return Ok(EnrichItemResult::PreservationFailed {
2715 score: match verdict {
2716 crate::preservation::PreservationVerdict::Preserved { score, .. } => score,
2717 crate::preservation::PreservationVerdict::Rejected { score, .. } => score,
2718 crate::preservation::PreservationVerdict::Unchanged { .. } => 1.0,
2719 },
2720 threshold,
2721 chars_before,
2722 chars_after,
2723 });
2724 }
2725
2726 let old_hash = blake3::hash(body.as_bytes()).to_hex().to_string();
2732 let new_hash = blake3::hash(enriched_body.as_bytes()).to_hex().to_string();
2733 if old_hash == new_hash {
2734 return Ok(EnrichItemResult::Skipped {
2735 reason: format!(
2736 "enriched body hash matches original (blake3:{old_hash}); idempotency skip"
2737 ),
2738 });
2739 }
2740
2741 if chars_after <= chars_before {
2743 return Ok(EnrichItemResult::Skipped {
2744 reason: format!(
2745 "enriched body ({chars_after} chars) not longer than original ({chars_before} chars)"
2746 ),
2747 });
2748 }
2749
2750 persist_enriched_body(
2751 conn,
2752 namespace,
2753 memory_id,
2754 memory_name,
2755 enriched_body,
2756 paths,
2757 llm_backend,
2758 )?;
2759
2760 Ok(EnrichItemResult::Done {
2761 memory_id: Some(memory_id),
2762 entity_id: None,
2763 entities: 0,
2764 rels: 0,
2765 chars_before: Some(chars_before),
2766 chars_after: Some(chars_after),
2767 cost,
2768 is_oauth,
2769 })
2770}
2771
2772fn call_reembed(
2773 conn: &Connection,
2774 namespace: &str,
2775 memory_name: &str,
2776 paths: &crate::paths::AppPaths,
2777 llm_backend: crate::cli::LlmBackendChoice,
2778) -> Result<EnrichItemResult, AppError> {
2779 let (memory_id, body, memory_type): (i64, String, String) = conn
2780 .query_row(
2781 "SELECT id, COALESCE(body,''), COALESCE(type,'note')
2782 FROM memories
2783 WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2784 rusqlite::params![namespace, memory_name],
2785 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2786 )
2787 .map_err(|e| match e {
2788 rusqlite::Error::QueryReturnedNoRows => {
2789 AppError::NotFound(format!("memory '{memory_name}' not found"))
2790 }
2791 other => AppError::Database(other),
2792 })?;
2793
2794 if body.trim().is_empty() {
2795 return Ok(EnrichItemResult::Skipped {
2796 reason: "body is empty".to_string(),
2797 });
2798 }
2799
2800 reembed_memory_vector(
2801 conn,
2802 namespace,
2803 memory_id,
2804 memory_name,
2805 &memory_type,
2806 &body,
2807 paths,
2808 llm_backend,
2809 )?;
2810
2811 Ok(EnrichItemResult::Done {
2812 memory_id: Some(memory_id),
2813 entity_id: None,
2814 entities: 0,
2815 rels: 0,
2816 chars_before: Some(body.chars().count()),
2817 chars_after: Some(body.chars().count()),
2818 cost: 0.0,
2819 is_oauth: true,
2820 })
2821}
2822
2823fn scan_operation(
2828 conn: &Connection,
2829 namespace: &str,
2830 args: &EnrichArgs,
2831) -> Result<Vec<String>, AppError> {
2832 let name_filter = resolve_name_filter(args)?;
2834 match args.operation {
2835 EnrichOperation::MemoryBindings => {
2836 let rows = scan_unbound_memories(conn, namespace, args.limit, &name_filter)?;
2837 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2838 }
2839 EnrichOperation::EntityDescriptions => {
2840 let rows = scan_entities_without_description(conn, namespace, args.limit)?;
2841 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2842 }
2843 EnrichOperation::BodyEnrich => {
2844 let rows =
2845 scan_short_body_memories(conn, namespace, args.min_output_chars, args.limit)?;
2846 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2847 }
2848 EnrichOperation::ReEmbed => {
2849 let rows = scan_memories_without_embeddings(conn, namespace, args.limit, &name_filter)?;
2850 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2851 }
2852 EnrichOperation::WeightCalibrate => {
2853 let rows = scan_weight_candidates(conn, namespace, args.limit)?;
2854 Ok(rows
2855 .into_iter()
2856 .map(|(id, _, _, _, _)| id.to_string())
2857 .collect())
2858 }
2859 EnrichOperation::RelationReclassify => {
2860 let rows = scan_generic_relations(conn, namespace, args.limit)?;
2861 Ok(rows
2862 .into_iter()
2863 .map(|(id, _, _, _)| id.to_string())
2864 .collect())
2865 }
2866 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2867 let pairs = scan_isolated_entity_pairs(conn, namespace, args.limit)?;
2868 Ok(pairs.into_iter().map(|(_, name, _, _)| name).collect())
2869 }
2870 EnrichOperation::EntityTypeValidate => {
2871 let rows = scan_entities_for_type_validation(conn, namespace, args.limit)?;
2872 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2873 }
2874 EnrichOperation::DescriptionEnrich => {
2875 let rows = scan_generic_descriptions(conn, namespace, args.limit)?;
2876 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2877 }
2878 EnrichOperation::DomainClassify
2879 | EnrichOperation::GraphAudit
2880 | EnrichOperation::DeepResearchSynth
2881 | EnrichOperation::BodyExtract => {
2882 let limit_clause = args.limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
2883 let sql = format!(
2884 "SELECT name FROM memories WHERE namespace=?1 AND deleted_at IS NULL ORDER BY id {limit_clause}"
2885 );
2886 let mut stmt = conn.prepare(&sql)?;
2887 let names = stmt
2888 .query_map(rusqlite::params![namespace], |r| r.get::<_, String>(0))?
2889 .collect::<Result<Vec<_>, _>>()?;
2890 Ok(names)
2891 }
2892 }
2893}
2894
2895fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
2901 if let Some(p) = explicit {
2902 if p.exists() {
2903 return Ok(p.to_path_buf());
2904 }
2905 return Err(AppError::Validation(format!(
2906 "Codex binary not found at explicit path: {}",
2907 p.display()
2908 )));
2909 }
2910
2911 if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
2912 let p = PathBuf::from(&env_path);
2913 if p.exists() {
2914 return Ok(p);
2915 }
2916 }
2917
2918 let name = if cfg!(windows) { "codex.exe" } else { "codex" };
2919 if let Some(path_var) = std::env::var_os("PATH") {
2920 for dir in std::env::split_paths(&path_var) {
2921 let candidate = dir.join(name);
2922 if candidate.exists() {
2923 return Ok(crate::extract::llm_embedding::resolve_real_binary(
2924 &candidate,
2925 ));
2926 }
2927 }
2928 }
2929
2930 Err(AppError::Validation(
2931 "Codex CLI binary not found in PATH. Install it or specify --codex-binary".to_string(),
2932 ))
2933}
2934
2935fn call_weight_calibrate(
2937 conn: &Connection,
2938 _namespace: &str,
2939 item_key: &str,
2940 binary: &Path,
2941 model: Option<&str>,
2942 timeout: u64,
2943 mode: &EnrichMode,
2944) -> Result<EnrichItemResult, AppError> {
2945 let rel_id: i64 = item_key
2946 .parse()
2947 .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
2948 let (source_name, target_name, relation, current_weight): (String, String, String, f64) = conn
2949 .query_row(
2950 "SELECT e1.name, e2.name, r.relation, r.weight \
2951 FROM relationships r \
2952 JOIN entities e1 ON e1.id = r.source_id \
2953 JOIN entities e2 ON e2.id = r.target_id \
2954 WHERE r.id = ?1",
2955 rusqlite::params![rel_id],
2956 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2957 )
2958 .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
2959
2960 let input_text = format!(
2961 "Source: {source_name}\nTarget: {target_name}\nRelation: {relation}\nCurrent weight: {current_weight}"
2962 );
2963 let (value, cost, is_oauth) = match mode {
2964 EnrichMode::ClaudeCode => call_claude(
2965 binary,
2966 WEIGHT_CALIBRATE_PROMPT,
2967 WEIGHT_CALIBRATE_SCHEMA,
2968 &input_text,
2969 model,
2970 timeout,
2971 )?,
2972 EnrichMode::Codex => call_codex(
2973 binary,
2974 WEIGHT_CALIBRATE_PROMPT,
2975 WEIGHT_CALIBRATE_SCHEMA,
2976 &input_text,
2977 model,
2978 timeout,
2979 )?,
2980 };
2981
2982 let calibrated = value
2983 .get("calibrated_weight")
2984 .and_then(|v| v.as_f64())
2985 .ok_or_else(|| AppError::Validation("LLM result missing 'calibrated_weight'".into()))?;
2986
2987 conn.execute(
2988 "UPDATE relationships SET weight = ?1 WHERE id = ?2",
2989 rusqlite::params![calibrated, rel_id],
2990 )?;
2991
2992 Ok(EnrichItemResult::Done {
2993 memory_id: None,
2994 entity_id: None,
2995 entities: 0,
2996 rels: 1,
2997 chars_before: None,
2998 chars_after: None,
2999 cost,
3000 is_oauth,
3001 })
3002}
3003
3004fn call_relation_reclassify(
3006 conn: &Connection,
3007 _namespace: &str,
3008 item_key: &str,
3009 binary: &Path,
3010 model: Option<&str>,
3011 timeout: u64,
3012 mode: &EnrichMode,
3013) -> Result<EnrichItemResult, AppError> {
3014 let rel_id: i64 = item_key
3015 .parse()
3016 .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
3017 let (source_name, target_name, current_relation): (String, String, String) = conn
3018 .query_row(
3019 "SELECT e1.name, e2.name, r.relation \
3020 FROM relationships r \
3021 JOIN entities e1 ON e1.id = r.source_id \
3022 JOIN entities e2 ON e2.id = r.target_id \
3023 WHERE r.id = ?1",
3024 rusqlite::params![rel_id],
3025 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3026 )
3027 .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
3028
3029 let input_text = format!(
3030 "Source entity: {source_name}\nTarget entity: {target_name}\nCurrent relation: {current_relation}"
3031 );
3032 let (value, cost, is_oauth) = match mode {
3033 EnrichMode::ClaudeCode => call_claude(
3034 binary,
3035 RELATION_RECLASSIFY_PROMPT,
3036 RELATION_RECLASSIFY_SCHEMA,
3037 &input_text,
3038 model,
3039 timeout,
3040 )?,
3041 EnrichMode::Codex => call_codex(
3042 binary,
3043 RELATION_RECLASSIFY_PROMPT,
3044 RELATION_RECLASSIFY_SCHEMA,
3045 &input_text,
3046 model,
3047 timeout,
3048 )?,
3049 };
3050
3051 let new_relation = value
3052 .get("relation")
3053 .and_then(|v| v.as_str())
3054 .ok_or_else(|| AppError::Validation("LLM result missing 'relation'".into()))?;
3055 let new_strength = value
3056 .get("strength")
3057 .and_then(|v| v.as_f64())
3058 .unwrap_or(0.5);
3059
3060 conn.execute(
3061 "UPDATE relationships SET relation = ?1, weight = ?2 WHERE id = ?3",
3062 rusqlite::params![new_relation, new_strength, rel_id],
3063 )?;
3064
3065 Ok(EnrichItemResult::Done {
3066 memory_id: None,
3067 entity_id: None,
3068 entities: 0,
3069 rels: 1,
3070 chars_before: None,
3071 chars_after: None,
3072 cost,
3073 is_oauth,
3074 })
3075}
3076
3077fn call_entity_connect(
3079 conn: &Connection,
3080 namespace: &str,
3081 item_key: &str,
3082 binary: &Path,
3083 model: Option<&str>,
3084 timeout: u64,
3085 mode: &EnrichMode,
3086) -> Result<EnrichItemResult, AppError> {
3087 let pairs = scan_isolated_entity_pairs(conn, namespace, Some(1))?;
3088 let (e1_id, e1_name, e2_id, e2_name) =
3089 match pairs.into_iter().find(|(_, n, _, _)| n == item_key) {
3090 Some(p) => p,
3091 None => {
3092 return Ok(EnrichItemResult::Skipped {
3093 reason: "pair no longer isolated".into(),
3094 })
3095 }
3096 };
3097 let input_text = format!("Entity A: {e1_name}\nEntity B: {e2_name}");
3098 let (value, cost, is_oauth) = match mode {
3099 EnrichMode::ClaudeCode => call_claude(
3100 binary,
3101 ENTITY_CONNECT_PROMPT,
3102 ENTITY_CONNECT_SCHEMA,
3103 &input_text,
3104 model,
3105 timeout,
3106 )?,
3107 EnrichMode::Codex => call_codex(
3108 binary,
3109 ENTITY_CONNECT_PROMPT,
3110 ENTITY_CONNECT_SCHEMA,
3111 &input_text,
3112 model,
3113 timeout,
3114 )?,
3115 };
3116 let relation = value
3117 .get("relation")
3118 .and_then(|v| v.as_str())
3119 .unwrap_or("none");
3120 if relation == "none" {
3121 return Ok(EnrichItemResult::Skipped {
3122 reason: "LLM determined no relationship".into(),
3123 });
3124 }
3125 let strength = value
3126 .get("strength")
3127 .and_then(|v| v.as_f64())
3128 .unwrap_or(0.5);
3129 conn.execute(
3130 "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
3131 rusqlite::params![namespace, e1_id, e2_id, relation, strength],
3132 )?;
3133 Ok(EnrichItemResult::Done {
3134 memory_id: None,
3135 entity_id: None,
3136 entities: 0,
3137 rels: 1,
3138 chars_before: None,
3139 chars_after: None,
3140 cost,
3141 is_oauth,
3142 })
3143}
3144
3145fn call_entity_type_validate(
3147 conn: &Connection,
3148 _namespace: &str,
3149 item_key: &str,
3150 binary: &Path,
3151 model: Option<&str>,
3152 timeout: u64,
3153 mode: &EnrichMode,
3154) -> Result<EnrichItemResult, AppError> {
3155 let (ent_id, ent_name, ent_type): (i64, String, String) = conn
3156 .query_row(
3157 "SELECT id, name, type FROM entities WHERE name = ?1",
3158 rusqlite::params![item_key],
3159 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3160 )
3161 .map_err(|_| AppError::NotFound(format!("entity '{item_key}' not found")))?;
3162 let input_text = format!("Entity: {ent_name}\nCurrent type: {ent_type}");
3163 let (value, cost, is_oauth) = match mode {
3164 EnrichMode::ClaudeCode => call_claude(
3165 binary,
3166 ENTITY_TYPE_VALIDATE_PROMPT,
3167 ENTITY_TYPE_VALIDATE_SCHEMA,
3168 &input_text,
3169 model,
3170 timeout,
3171 )?,
3172 EnrichMode::Codex => call_codex(
3173 binary,
3174 ENTITY_TYPE_VALIDATE_PROMPT,
3175 ENTITY_TYPE_VALIDATE_SCHEMA,
3176 &input_text,
3177 model,
3178 timeout,
3179 )?,
3180 };
3181 let validated_type = value
3182 .get("validated_type")
3183 .and_then(|v| v.as_str())
3184 .unwrap_or(&ent_type);
3185 let was_correct = value
3186 .get("was_correct")
3187 .and_then(|v| v.as_bool())
3188 .unwrap_or(true);
3189 if !was_correct {
3190 conn.execute(
3191 "UPDATE entities SET type = ?1 WHERE id = ?2",
3192 rusqlite::params![validated_type, ent_id],
3193 )?;
3194 }
3195 Ok(EnrichItemResult::Done {
3196 memory_id: None,
3197 entity_id: Some(ent_id),
3198 entities: 1,
3199 rels: 0,
3200 chars_before: None,
3201 chars_after: None,
3202 cost,
3203 is_oauth,
3204 })
3205}
3206
3207fn call_description_enrich(
3209 conn: &Connection,
3210 _namespace: &str,
3211 item_key: &str,
3212 binary: &Path,
3213 model: Option<&str>,
3214 timeout: u64,
3215 mode: &EnrichMode,
3216) -> Result<EnrichItemResult, AppError> {
3217 let (mem_id, body, old_desc): (i64, String, String) = conn
3218 .query_row(
3219 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3220 rusqlite::params![item_key],
3221 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3222 )
3223 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3224 let snippet: String = body.chars().take(500).collect();
3225 let input_text = format!(
3226 "Memory name: {item_key}\nCurrent description: {old_desc}\nBody preview: {snippet}"
3227 );
3228 let (value, cost, is_oauth) = match mode {
3229 EnrichMode::ClaudeCode => call_claude(
3230 binary,
3231 DESCRIPTION_ENRICH_PROMPT,
3232 DESCRIPTION_ENRICH_SCHEMA,
3233 &input_text,
3234 model,
3235 timeout,
3236 )?,
3237 EnrichMode::Codex => call_codex(
3238 binary,
3239 DESCRIPTION_ENRICH_PROMPT,
3240 DESCRIPTION_ENRICH_SCHEMA,
3241 &input_text,
3242 model,
3243 timeout,
3244 )?,
3245 };
3246 let new_desc = value
3247 .get("description")
3248 .and_then(|v| v.as_str())
3249 .unwrap_or(&old_desc);
3250 let old_name: String = conn
3251 .query_row(
3252 "SELECT name FROM memories WHERE id = ?1",
3253 rusqlite::params![mem_id],
3254 |r| r.get(0),
3255 )?;
3256 conn.execute(
3257 "UPDATE memories SET description = ?1 WHERE id = ?2",
3258 rusqlite::params![new_desc, mem_id],
3259 )?;
3260 memories::sync_fts_after_update(
3261 conn, mem_id,
3262 &old_name, &old_desc, &body,
3263 &old_name, new_desc, &body,
3264 )?;
3265 Ok(EnrichItemResult::Done {
3266 memory_id: Some(mem_id),
3267 entity_id: None,
3268 entities: 0,
3269 rels: 0,
3270 chars_before: Some(old_desc.len()),
3271 chars_after: Some(new_desc.len()),
3272 cost,
3273 is_oauth,
3274 })
3275}
3276
3277fn call_domain_classify(
3279 conn: &Connection,
3280 _namespace: &str,
3281 item_key: &str,
3282 binary: &Path,
3283 model: Option<&str>,
3284 timeout: u64,
3285 mode: &EnrichMode,
3286) -> Result<EnrichItemResult, AppError> {
3287 let (mem_id, body, desc): (i64, String, String) = conn
3288 .query_row(
3289 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3290 rusqlite::params![item_key],
3291 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3292 )
3293 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3294 let snippet: String = body.chars().take(500).collect();
3295 let input_text = format!("Memory: {item_key}\nDescription: {desc}\nBody preview: {snippet}");
3296 let (value, cost, is_oauth) = match mode {
3297 EnrichMode::ClaudeCode => call_claude(
3298 binary,
3299 DOMAIN_CLASSIFY_PROMPT,
3300 DOMAIN_CLASSIFY_SCHEMA,
3301 &input_text,
3302 model,
3303 timeout,
3304 )?,
3305 EnrichMode::Codex => call_codex(
3306 binary,
3307 DOMAIN_CLASSIFY_PROMPT,
3308 DOMAIN_CLASSIFY_SCHEMA,
3309 &input_text,
3310 model,
3311 timeout,
3312 )?,
3313 };
3314 let domain = value
3315 .get("domain")
3316 .and_then(|v| v.as_str())
3317 .unwrap_or("uncategorized");
3318 let metadata = format!(r#"{{"domain":"{}"}}"#, domain.replace('"', "\\\""));
3319 conn.execute(
3320 "UPDATE memories SET metadata = ?1 WHERE id = ?2",
3321 rusqlite::params![metadata, mem_id],
3322 )?;
3323 Ok(EnrichItemResult::Done {
3324 memory_id: Some(mem_id),
3325 entity_id: None,
3326 entities: 0,
3327 rels: 0,
3328 chars_before: None,
3329 chars_after: None,
3330 cost,
3331 is_oauth,
3332 })
3333}
3334
3335fn call_graph_audit(
3337 conn: &Connection,
3338 _namespace: &str,
3339 item_key: &str,
3340 binary: &Path,
3341 model: Option<&str>,
3342 timeout: u64,
3343 mode: &EnrichMode,
3344) -> Result<EnrichItemResult, AppError> {
3345 let (mem_id, body, desc): (i64, String, String) = conn
3346 .query_row(
3347 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3348 rusqlite::params![item_key],
3349 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3350 )
3351 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3352 let snippet: String = body.chars().take(500).collect();
3353 let ent_count: i64 = conn
3354 .query_row(
3355 "SELECT COUNT(*) FROM memory_entities WHERE memory_id = ?1",
3356 rusqlite::params![mem_id],
3357 |r| r.get(0),
3358 )
3359 .unwrap_or(0);
3360 let input_text = format!("Memory: {item_key}\nDescription: {desc}\nEntity bindings: {ent_count}\nBody preview: {snippet}");
3361 let (value, cost, is_oauth) = match mode {
3362 EnrichMode::ClaudeCode => call_claude(
3363 binary,
3364 GRAPH_AUDIT_PROMPT,
3365 GRAPH_AUDIT_SCHEMA,
3366 &input_text,
3367 model,
3368 timeout,
3369 )?,
3370 EnrichMode::Codex => call_codex(
3371 binary,
3372 GRAPH_AUDIT_PROMPT,
3373 GRAPH_AUDIT_SCHEMA,
3374 &input_text,
3375 model,
3376 timeout,
3377 )?,
3378 };
3379 let issues = value
3380 .get("issues")
3381 .and_then(|v| v.as_array())
3382 .map(|a| a.len())
3383 .unwrap_or(0);
3384 Ok(EnrichItemResult::Done {
3385 memory_id: Some(mem_id),
3386 entity_id: None,
3387 entities: 0,
3388 rels: issues,
3389 chars_before: None,
3390 chars_after: None,
3391 cost,
3392 is_oauth,
3393 })
3394}
3395
3396fn call_deep_research_synth(
3398 conn: &Connection,
3399 namespace: &str,
3400 item_key: &str,
3401 binary: &Path,
3402 model: Option<&str>,
3403 timeout: u64,
3404 mode: &EnrichMode,
3405) -> Result<EnrichItemResult, AppError> {
3406 let (mem_id, body): (i64, String) = conn
3407 .query_row(
3408 "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3409 rusqlite::params![item_key],
3410 |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
3411 )
3412 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3413 let snippet: String = body.chars().take(2000).collect();
3414 let input_text = format!("Memory: {item_key}\nBody:\n{snippet}");
3415 let (value, cost, is_oauth) = match mode {
3416 EnrichMode::ClaudeCode => call_claude(
3417 binary,
3418 DEEP_RESEARCH_SYNTH_PROMPT,
3419 DEEP_RESEARCH_SYNTH_SCHEMA,
3420 &input_text,
3421 model,
3422 timeout,
3423 )?,
3424 EnrichMode::Codex => call_codex(
3425 binary,
3426 DEEP_RESEARCH_SYNTH_PROMPT,
3427 DEEP_RESEARCH_SYNTH_SCHEMA,
3428 &input_text,
3429 model,
3430 timeout,
3431 )?,
3432 };
3433 let mut ent_count = 0usize;
3434 let mut rel_count = 0usize;
3435 if let Some(ents) = value.get("entities").and_then(|v| v.as_array()) {
3436 for e in ents {
3437 let name = e.get("name").and_then(|v| v.as_str()).unwrap_or_default();
3438 let etype_str = e
3439 .get("entity_type")
3440 .and_then(|v| v.as_str())
3441 .unwrap_or("concept");
3442 let etype: EntityType = etype_str.parse().unwrap_or(EntityType::Concept);
3443 if name.len() >= 2 {
3444 let ne = NewEntity {
3445 name: name.to_string(),
3446 entity_type: etype,
3447 description: None,
3448 };
3449 let _ = entities::upsert_entity(conn, namespace, &ne);
3450 ent_count += 1;
3451 }
3452 }
3453 }
3454 if let Some(rels) = value.get("relationships").and_then(|v| v.as_array()) {
3455 for r in rels {
3456 let src = r.get("source").and_then(|v| v.as_str()).unwrap_or_default();
3457 let tgt = r.get("target").and_then(|v| v.as_str()).unwrap_or_default();
3458 if src.is_empty() || tgt.is_empty() {
3459 continue;
3460 }
3461 let rel = r
3462 .get("relation")
3463 .and_then(|v| v.as_str())
3464 .unwrap_or("related");
3465 let str_ = r.get("strength").and_then(|v| v.as_f64()).unwrap_or(0.5);
3466 if let (Some(sid), Some(tid)) = (
3467 entities::find_entity_id(conn, namespace, src)?,
3468 entities::find_entity_id(conn, namespace, tgt)?,
3469 ) {
3470 let _ = entities::create_or_fetch_relationship(
3471 conn, namespace, sid, tid, rel, str_, None,
3472 );
3473 rel_count += 1;
3474 }
3475 }
3476 }
3477 Ok(EnrichItemResult::Done {
3478 memory_id: Some(mem_id),
3479 entity_id: None,
3480 entities: ent_count,
3481 rels: rel_count,
3482 chars_before: None,
3483 chars_after: None,
3484 cost,
3485 is_oauth,
3486 })
3487}
3488
3489fn call_body_extract(
3491 conn: &Connection,
3492 _namespace: &str,
3493 item_key: &str,
3494 binary: &Path,
3495 model: Option<&str>,
3496 timeout: u64,
3497 mode: &EnrichMode,
3498) -> Result<EnrichItemResult, AppError> {
3499 let (mem_id, body, old_desc): (i64, String, String) = conn
3500 .query_row(
3501 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3502 rusqlite::params![item_key],
3503 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3504 )
3505 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3506 let old_name: String = conn
3507 .query_row(
3508 "SELECT name FROM memories WHERE id = ?1",
3509 rusqlite::params![mem_id],
3510 |r| r.get(0),
3511 )?;
3512 let input_text = format!("Memory: {item_key}\nBody:\n{body}");
3513 let (value, cost, is_oauth) = match mode {
3514 EnrichMode::ClaudeCode => call_claude(
3515 binary,
3516 BODY_EXTRACT_PROMPT,
3517 BODY_EXTRACT_SCHEMA,
3518 &input_text,
3519 model,
3520 timeout,
3521 )?,
3522 EnrichMode::Codex => call_codex(
3523 binary,
3524 BODY_EXTRACT_PROMPT,
3525 BODY_EXTRACT_SCHEMA,
3526 &input_text,
3527 model,
3528 timeout,
3529 )?,
3530 };
3531 let restructured = value
3532 .get("restructured_body")
3533 .and_then(|v| v.as_str())
3534 .unwrap_or(&body);
3535 let chars_before = body.len();
3536 let chars_after = restructured.len();
3537 let new_hash = blake3::hash(restructured.as_bytes()).to_hex().to_string();
3538 conn.execute(
3539 "UPDATE memories SET body = ?1, body_hash = ?2, updated_at = unixepoch() WHERE id = ?3",
3540 rusqlite::params![restructured, new_hash, mem_id],
3541 )?;
3542 memories::sync_fts_after_update(
3543 conn, mem_id,
3544 &old_name, &old_desc, &body,
3545 &old_name, &old_desc, restructured,
3546 )?;
3547 Ok(EnrichItemResult::Done {
3548 memory_id: Some(mem_id),
3549 entity_id: None,
3550 entities: 0,
3551 rels: 0,
3552 chars_before: Some(chars_before),
3553 chars_after: Some(chars_after),
3554 cost,
3555 is_oauth,
3556 })
3557}
3558
3559#[allow(clippy::type_complexity)]
3561fn scan_isolated_entity_pairs(
3562 conn: &Connection,
3563 namespace: &str,
3564 limit: Option<usize>,
3565) -> Result<Vec<(i64, String, i64, String)>, AppError> {
3566 let limit_val = limit.unwrap_or(50) as i64;
3567 let mut stmt = conn.prepare_cached(
3568 "SELECT e1.id, e1.name, e2.id, e2.name FROM entities e1, entities e2 \
3569 WHERE e1.namespace = ?1 AND e2.namespace = ?1 AND e1.id < e2.id \
3570 AND NOT EXISTS (SELECT 1 FROM relationships r WHERE \
3571 (r.source_id = e1.id AND r.target_id = e2.id) OR \
3572 (r.source_id = e2.id AND r.target_id = e1.id)) \
3573 LIMIT ?2",
3574 )?;
3575 let rows = stmt
3576 .query_map(rusqlite::params![namespace, limit_val], |r| {
3577 Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?))
3578 })?
3579 .collect::<Result<Vec<_>, _>>()?;
3580 Ok(rows)
3581}
3582
3583fn scan_entities_for_type_validation(
3585 conn: &Connection,
3586 namespace: &str,
3587 limit: Option<usize>,
3588) -> Result<Vec<(i64, String, String)>, AppError> {
3589 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3590 let sql = format!(
3591 "SELECT id, name, type FROM entities WHERE namespace = ?1 ORDER BY id {limit_clause}"
3592 );
3593 let mut stmt = conn.prepare(&sql)?;
3594 let rows = stmt
3595 .query_map(rusqlite::params![namespace], |r| {
3596 Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3597 })?
3598 .collect::<Result<Vec<_>, _>>()?;
3599 Ok(rows)
3600}
3601
3602fn scan_generic_descriptions(
3604 conn: &Connection,
3605 namespace: &str,
3606 limit: Option<usize>,
3607) -> Result<Vec<(i64, String, String)>, AppError> {
3608 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3609 let sql = format!(
3610 "SELECT id, name, description FROM memories WHERE namespace = ?1 AND deleted_at IS NULL \
3611 AND (description LIKE '%ingested%' OR description LIKE '%imported%' OR description LIKE '%added%' OR length(description) < 30) \
3612 ORDER BY id {limit_clause}"
3613 );
3614 let mut stmt = conn.prepare(&sql)?;
3615 let rows = stmt
3616 .query_map(rusqlite::params![namespace], |r| {
3617 Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3618 })?
3619 .collect::<Result<Vec<_>, _>>()?;
3620 Ok(rows)
3621}
3622
3623fn call_codex(
3627 binary: &Path,
3628 prompt: &str,
3629 json_schema: &str,
3630 input_text: &str,
3631 model: Option<&str>,
3632 timeout_secs: u64,
3633) -> Result<(serde_json::Value, f64, bool), AppError> {
3634 use wait_timeout::ChildExt;
3635
3636 super::codex_spawn::validate_codex_model(model)?;
3641 let schema_file = super::codex_spawn::trusted_schema_path()?;
3642
3643 let args = super::codex_spawn::CodexSpawnArgs {
3644 binary,
3645 prompt,
3646 json_schema,
3647 input_text,
3648 model,
3649 timeout_secs,
3650 schema_path: schema_file.clone(),
3651 };
3652 let mut cmd = super::codex_spawn::build_codex_command(&args)?;
3653
3654 let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
3655 AppError::Io(std::io::Error::new(
3656 e.kind(),
3657 format!("failed to spawn codex: {e}"),
3658 ))
3659 })?;
3660
3661 let full_prompt = format!("{prompt}\n\n{input_text}");
3662 let stdin_bytes = full_prompt.into_bytes();
3663 let mut child_stdin = child
3664 .stdin
3665 .take()
3666 .ok_or_else(|| AppError::Validation("failed to open codex stdin".into()))?;
3667 let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
3668 child_stdin.write_all(&stdin_bytes)?;
3669 drop(child_stdin);
3670 Ok(())
3671 });
3672
3673 let start = std::time::Instant::now();
3674 let timeout = std::time::Duration::from_secs(timeout_secs);
3675 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
3676 let _ = std::fs::remove_file(&schema_file);
3677
3678 match status {
3679 Some(exit_status) => {
3680 stdin_thread
3681 .join()
3682 .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
3683 .map_err(AppError::Io)?;
3684
3685 tracing::debug!(
3686 target: "process",
3687 exit_code = ?exit_status.code(),
3688 elapsed_ms = start.elapsed().as_millis() as u64,
3689 "external process completed"
3690 );
3691
3692 let mut stdout_buf = Vec::new();
3693 if let Some(mut out) = child.stdout.take() {
3694 std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
3695 }
3696 if !exit_status.success() {
3697 let mut stderr_buf = Vec::new();
3698 if let Some(mut err) = child.stderr.take() {
3699 std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
3700 }
3701 let stderr_str = String::from_utf8_lossy(&stderr_buf);
3702 tracing::warn!(
3703 target: "enrich",
3704 exit_code = ?exit_status.code(),
3705 stderr = %stderr_str.trim(),
3706 "codex process failed"
3707 );
3708 return Err(AppError::Validation(format!(
3709 "codex exited with code {:?}: {}",
3710 exit_status.code(),
3711 stderr_str.trim()
3712 )));
3713 }
3714 let stdout_str = String::from_utf8(stdout_buf)
3715 .map_err(|_| AppError::Validation("codex stdout is not valid UTF-8".into()))?;
3716 let result = super::codex_spawn::parse_codex_jsonl(&stdout_str)?;
3719 let value: serde_json::Value =
3725 serde_json::from_str(&result.last_agent_text).map_err(|e| {
3726 AppError::Validation(format!(
3727 "codex agent_message is not valid JSON: {e}; raw={}",
3728 result.last_agent_text
3729 ))
3730 })?;
3731 Ok((value, 0.0, false))
3732 }
3733 None => {
3734 let _ = child.kill();
3735 let _ = child.wait();
3736 let _ = stdin_thread.join();
3737 Err(AppError::Validation(format!(
3738 "codex timed out after {timeout_secs} seconds"
3739 )))
3740 }
3741 }
3742}
3743
/// Unit tests for the scan helpers, the output parsers and the codex runner.
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::Connection;
    #[cfg(unix)]
    use std::os::unix::fs::PermissionsExt;

    /// Opens an in-memory database holding the tables the tests touch.
    fn open_test_db() -> Connection {
        let conn = Connection::open_in_memory().expect("in-memory db");
        conn.execute_batch(
            "CREATE TABLE memories (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                name TEXT NOT NULL,
                type TEXT NOT NULL DEFAULT 'note',
                description TEXT NOT NULL DEFAULT '',
                body TEXT NOT NULL DEFAULT '',
                body_hash TEXT NOT NULL DEFAULT '',
                session_id TEXT,
                source TEXT NOT NULL DEFAULT 'agent',
                metadata TEXT NOT NULL DEFAULT '{}',
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                deleted_at INTEGER,
                UNIQUE(namespace, name)
            );
            CREATE TABLE entities (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                name TEXT NOT NULL,
                type TEXT NOT NULL DEFAULT 'concept',
                description TEXT,
                degree INTEGER NOT NULL DEFAULT 0,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                UNIQUE(namespace, name)
            );
            CREATE TABLE memory_entities (
                memory_id INTEGER NOT NULL,
                entity_id INTEGER NOT NULL,
                PRIMARY KEY (memory_id, entity_id)
            );
            CREATE TABLE relationships (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                source_id INTEGER NOT NULL,
                target_id INTEGER NOT NULL,
                relation TEXT NOT NULL,
                weight REAL NOT NULL DEFAULT 0.5,
                description TEXT,
                UNIQUE(source_id, target_id, relation)
            );
            CREATE TABLE memory_embeddings (
                memory_id INTEGER PRIMARY KEY,
                namespace TEXT NOT NULL,
                embedding BLOB NOT NULL,
                source TEXT NOT NULL,
                model TEXT NOT NULL DEFAULT '',
                dim INTEGER NOT NULL DEFAULT 384,
                created_at INTEGER NOT NULL DEFAULT (unixepoch())
            );",
        )
        .expect("schema creation must succeed");
        conn
    }

    /// A memory with no entity binding is reported by the unbound scan.
    #[test]
    fn scan_unbound_memories_finds_memories_without_bindings() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'test-mem', 'some body content')",
            [],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "test-mem");
    }

    /// A memory linked to an entity is not reported by the unbound scan.
    #[test]
    fn scan_unbound_memories_excludes_bound_memories() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'bound-mem', 'body')",
            [],
        )
        .unwrap();
        let mem_id: i64 = conn
            .query_row("SELECT id FROM memories WHERE name='bound-mem'", [], |r| {
                r.get(0)
            })
            .unwrap();
        conn.execute(
            "INSERT INTO entities (namespace, name) VALUES ('global', 'some-entity')",
            [],
        )
        .unwrap();
        let ent_id: i64 = conn
            .query_row(
                "SELECT id FROM entities WHERE name='some-entity'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        conn.execute(
            "INSERT INTO memory_entities (memory_id, entity_id) VALUES (?1, ?2)",
            rusqlite::params![mem_id, ent_id],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
        assert!(results.is_empty(), "bound memory must not appear in scan");
    }

    /// An entity whose description is NULL is reported.
    #[test]
    fn scan_entities_without_description_finds_null_description() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'my-tool', 'tool', NULL)",
            [],
        )
        .unwrap();

        let results = scan_entities_without_description(&conn, "global", None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "my-tool");
    }

    /// An entity that already has a description is not reported.
    #[test]
    fn scan_entities_without_description_excludes_entities_with_description() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'described-tool', 'tool', 'Has a description already')",
            [],
        )
        .unwrap();

        let results = scan_entities_without_description(&conn, "global", None).unwrap();
        assert!(
            results.is_empty(),
            "entity with description must not appear"
        );
    }

    /// A body shorter than the threshold is reported.
    #[test]
    fn scan_short_body_memories_finds_short_bodies() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'short-mem', 'hi')",
            [],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "short-mem");
    }

    /// A body longer than the threshold is not reported.
    #[test]
    fn scan_short_body_memories_excludes_long_bodies() {
        let conn = open_test_db();
        let long_body = "a".repeat(1000);
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'long-mem', ?1)",
            rusqlite::params![long_body],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
        assert!(results.is_empty(), "long memory must not appear in scan");
    }

    /// The optional limit caps the number of scanned rows.
    #[test]
    fn scan_respects_limit() {
        let conn = open_test_db();
        for i in 0..5 {
            conn.execute(
                &format!("INSERT INTO memories (namespace, name, body) VALUES ('global', 'mem-{i}', 'short')"),
                [],
            )
            .unwrap();
        }

        let results = scan_short_body_memories(&conn, "global", 1000, Some(3)).unwrap();
        assert_eq!(results.len(), 3, "limit must be respected");
    }

    /// Only memories lacking an embedding row are reported.
    #[test]
    fn scan_memories_without_embeddings_finds_only_missing_rows() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'missing-vec', 'body one')",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'has-vec', 'body two')",
            [],
        )
        .unwrap();
        let memory_id: i64 = conn
            .query_row(
                "SELECT id FROM memories WHERE namespace='global' AND name='has-vec'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        let embedding = vec![0.0_f32; crate::constants::embedding_dim()];
        memories::upsert_vec(
            &conn, memory_id, "global", "note", &embedding, "has-vec", "body two",
        )
        .unwrap();

        let results = scan_memories_without_embeddings(&conn, "global", None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "missing-vec");
    }

    /// The name filter restricts the embedding scan to the listed memories.
    #[test]
    fn scan_memories_without_embeddings_respects_name_filter() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'match-me', 'body one')",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'skip-me', 'body two')",
            [],
        )
        .unwrap();

        let results =
            scan_memories_without_embeddings(&conn, "global", None, &["match-me".to_string()])
                .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "match-me");
    }

    /// Pairs that already share a relationship are not reported as isolated.
    #[test]
    fn scan_isolated_entity_pairs_skips_connected_pairs() {
        let conn = open_test_db();
        for name in ["alpha", "beta", "gamma"] {
            conn.execute(
                "INSERT INTO entities (namespace, name) VALUES ('global', ?1)",
                rusqlite::params![name],
            )
            .unwrap();
        }
        let id_of = |name: &str| -> i64 {
            conn.query_row(
                "SELECT id FROM entities WHERE name = ?1",
                rusqlite::params![name],
                |r| r.get(0),
            )
            .unwrap()
        };
        let (alpha, beta) = (id_of("alpha"), id_of("beta"));
        conn.execute(
            "INSERT INTO relationships (source_id, target_id, relation) VALUES (?1, ?2, 'related')",
            rusqlite::params![alpha, beta],
        )
        .unwrap();

        let results = scan_isolated_entity_pairs(&conn, "global", None).unwrap();
        assert_eq!(results.len(), 2, "only the two unconnected pairs remain");
        assert!(results.iter().all(|(a, _, b, _)| a < b));
        assert!(
            results
                .iter()
                .all(|(a, _, b, _)| !(*a == alpha.min(beta) && *b == alpha.max(beta))),
            "connected pair must not appear"
        );
    }

    /// The type-validation scan honours the limit and orders rows by id.
    #[test]
    fn scan_entities_for_type_validation_respects_limit() {
        let conn = open_test_db();
        for name in ["one", "two", "three", "four"] {
            conn.execute(
                "INSERT INTO entities (namespace, name, type) VALUES ('global', ?1, 'tool')",
                rusqlite::params![name],
            )
            .unwrap();
        }

        let limited = scan_entities_for_type_validation(&conn, "global", Some(2)).unwrap();
        assert_eq!(limited.len(), 2, "limit must be respected");
        assert!(limited[0].0 < limited[1].0, "rows must be ordered by id");

        let all = scan_entities_for_type_validation(&conn, "global", None).unwrap();
        assert_eq!(all.len(), 4, "no limit must return every entity");
    }

    /// Short descriptions are flagged as generic; detailed ones are not.
    #[test]
    fn scan_generic_descriptions_flags_short_descriptions() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, description) VALUES ('global', 'vague', 'short')",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memories (namespace, name, description) VALUES ('global', 'precise', 'A sufficiently detailed description of the memory contents')",
            [],
        )
        .unwrap();

        let results = scan_generic_descriptions(&conn, "global", None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "vague");
    }

    /// Opening a fresh queue database yields an empty `queue` table.
    #[test]
    fn queue_db_schema_creates_correctly() {
        // A private temp directory keeps the test portable and guarantees the
        // database and any SQLite sidecar files are removed even when an
        // assertion fails.
        let tmp = tempfile::tempdir().expect("tempdir");
        let db_path = tmp
            .path()
            .join("enrich-queue.sqlite")
            .to_string_lossy()
            .into_owned();
        let conn = open_queue_db(&db_path).expect("queue db must open");
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 0);
    }

    /// A successful result event yields the structured output and its cost.
    #[test]
    fn parse_claude_output_valid_bindings() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.01,
             "structured_output":{"entities":[{"name":"rust-lang","entity_type":"tool"}],"relationships":[]}}
        ]"#;
        let result = crate::commands::claude_runner::parse_claude_output(output)
            .expect("must parse successfully");
        assert!(result.value.get("entities").is_some());
        assert!((result.cost_usd - 0.01).abs() < f64::EPSILON);
        assert!(!result.is_oauth);
    }

    /// An init event with `apiKeySource: none` is reported as OAuth.
    #[test]
    fn parse_claude_output_detects_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"none"},
            {"type":"result","is_error":false,"total_cost_usd":0.0,
             "structured_output":{"entities":[],"relationships":[]}}
        ]"#;
        let result = crate::commands::claude_runner::parse_claude_output(output).unwrap();
        assert!(result.is_oauth);
    }

    /// A rate-limit error result maps to `AppError::RateLimited`.
    #[test]
    fn parse_claude_output_rate_limit_returns_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;
        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
        assert!(matches!(err, AppError::RateLimited { .. }));
    }

    /// Other error results keep the backend's message in the error text.
    #[test]
    fn parse_claude_output_auth_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;
        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("authentication failed"));
    }

    /// `call_codex` returns the agent message JSON as-is for a schema that is
    /// not the bindings schema. A shell script stands in for the codex binary.
    #[cfg(unix)]
    #[test]
    fn call_codex_returns_raw_json_for_body_enrich_schema() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let binary = tmp.path().join("codex-mock");
        std::fs::write(
            &binary,
            r#"#!/usr/bin/env bash
set -euo pipefail
cat <<'JSONL'
{"type":"thread.started","thread_id":"mock-thread-0"}
{"type":"item.completed","item":{"type":"agent_message","text":"{\"enriched_body\":\"expanded body\"}"}}
{"type":"turn.completed","usage":{"input_tokens":1,"output_tokens":1}}
JSONL
"#,
        )
        .expect("mock codex write");
        let mut perms = std::fs::metadata(&binary).expect("metadata").permissions();
        perms.set_mode(0o755);
        std::fs::set_permissions(&binary, perms).expect("chmod");

        let (value, cost, is_oauth) =
            call_codex(&binary, "prompt", BODY_ENRICH_SCHEMA, "body", None, 5)
                .expect("call_codex must accept body-enrich payload");

        assert_eq!(value["enriched_body"], "expanded body");
        assert_eq!(cost, 0.0);
        assert!(!is_oauth);
    }

    /// The scan used for the preview needs only the database, no backend call.
    #[test]
    fn dry_run_emits_preview_without_calling_llm() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'dry-mem', 'tiny')",
            [],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 1000, None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "dry-mem");
    }

    /// Persisting a description writes it to the entity row.
    #[test]
    fn persist_entity_description_updates_db() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type) VALUES ('global', 'tokio-runtime', 'tool')",
            [],
        )
        .unwrap();
        let eid: i64 = conn
            .query_row(
                "SELECT id FROM entities WHERE name='tokio-runtime'",
                [],
                |r| r.get(0),
            )
            .unwrap();

        persist_entity_description(&conn, eid, "Async runtime for Rust applications").unwrap();

        let desc: String = conn
            .query_row(
                "SELECT description FROM entities WHERE id=?1",
                rusqlite::params![eid],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(desc, "Async runtime for Rust applications");
    }

    /// The bindings schema constant parses as JSON.
    #[test]
    fn bindings_schema_is_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(BINDINGS_SCHEMA).expect("BINDINGS_SCHEMA must be valid JSON");
    }

    /// The entity-description schema constant parses as JSON.
    #[test]
    fn entity_description_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(ENTITY_DESCRIPTION_SCHEMA)
            .expect("ENTITY_DESCRIPTION_SCHEMA must be valid JSON");
    }

    /// The body-enrich schema constant parses as JSON.
    #[test]
    fn body_enrich_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(BODY_ENRICH_SCHEMA)
            .expect("BODY_ENRICH_SCHEMA must be valid JSON");
    }
}