1use crate::commands::ingest_claude::find_claude_binary;
28use crate::constants::MAX_MEMORY_BODY_LEN;
29use crate::entity_type::EntityType;
30use crate::errors::AppError;
31use crate::paths::AppPaths;
32use crate::storage::connection::{ensure_db_ready, open_rw};
33use crate::storage::entities::{self, NewEntity, NewRelationship};
34use crate::storage::memories;
35
36use rusqlite::Connection;
37use serde::{Deserialize, Serialize};
38use std::io::Write;
39use std::path::{Path, PathBuf};
40use std::time::Instant;
41
/// Default file name of the enrich queue database (its consumer is not visible in this chunk).
const DEFAULT_QUEUE_DB: &str = ".enrich-queue.sqlite";
/// Default rate-limit wait. NOTE(review): presumably seconds — confirm against the consumer.
const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
/// Default value of `EnrichArgs::min_output_chars` (`--min-output-chars`).
const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;
/// Default value of `EnrichArgs::max_output_chars` (`--max-output-chars`).
const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2000;
50
/// JSON Schema for entity/relationship extraction output.
///
/// Requires an `entities` array (`name`, `entity_type` from a closed enum) and a
/// `relationships` array (`source`, `target`, `relation` from a closed enum,
/// `strength` in `[0, 1]`). Every object level forbids additional properties.
const BINDINGS_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["entities","relationships"],
  "additionalProperties": false
}"#;
94
/// JSON Schema for an entity-description response: a single required `description` string.
const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" }
  },
  "required": ["description"],
  "additionalProperties": false
}"#;

/// JSON Schema for a body-enrichment response: a single required `enriched_body` string.
const BODY_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "enriched_body": { "type": "string" }
  },
  "required": ["enriched_body"],
  "additionalProperties": false
}"#;
112
/// Prompt asking the model to re-evaluate a relationship weight against a 0.3–0.9 scale.
const WEIGHT_CALIBRATE_PROMPT: &str = "You are a knowledge graph quality auditor. Evaluate whether this relationship weight is correctly calibrated.\n\n\
    Scale:\n\
    - 0.9 = vital hard dependency (A cannot function without B)\n\
    - 0.7 = important design relationship (A strongly supports/enables B)\n\
    - 0.5 = useful contextual link (A and B share relevant context)\n\
    - 0.3 = weak reference (A mentions B without strong coupling)\n\n\
    Respond with the calibrated weight and brief reasoning.";

/// JSON Schema for the weight-calibration response: `calibrated_weight` in `[0, 1]` plus `reasoning`.
const WEIGHT_CALIBRATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "calibrated_weight": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["calibrated_weight", "reasoning"],
  "additionalProperties": false
}"#;
131
/// Prompt asking the model to replace a generic relation with one of the canonical relations.
const RELATION_RECLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. The relationship between these entities uses a generic type. Determine the REAL semantic relationship.\n\n\
    Valid canonical relations (pick exactly one):\n\
    - depends-on: A cannot function without B\n\
    - uses: A utilizes B but could substitute it\n\
    - supports: A reinforces or enables B\n\
    - causes: A triggers or produces B\n\
    - fixes: A resolves a problem in B\n\
    - contradicts: A conflicts with or invalidates B\n\
    - applies-to: A is relevant to or scoped within B\n\
    - follows: A comes after B in sequence\n\
    - replaces: A substitutes B\n\
    - tracked-in: A is monitored in B\n\
    - related: A and B share context (use sparingly)\n\n\
    Respond with the correct relation, strength, and reasoning.";

/// JSON Schema for the reclassification response.
///
/// `relation` is an unconstrained string here: the canonical list lives only in
/// the prompt, so the schema itself does not reject unknown relation names.
const RELATION_RECLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": { "type": "string" },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
158
/// Prompt asking whether two currently unconnected entities should be linked.
/// The model answers `"none"` as the relation when no meaningful link exists.
const ENTITY_CONNECT_PROMPT: &str = "You are a knowledge graph quality auditor. Two entities exist in the same graph but have no relationship between them. Determine if a meaningful relationship exists.\n\n\
    Valid canonical relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, replaces, tracked-in, related.\n\n\
    If NO meaningful relationship exists, set relation to \"none\".\n\
    Respond with the relation (or \"none\"), strength, and reasoning.";

/// JSON Schema for the entity-connect response; `relation` is an unconstrained string.
const ENTITY_CONNECT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": { "type": "string" },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
175
/// Prompt asking the model to confirm or correct an entity's type.
const ENTITY_TYPE_VALIDATE_PROMPT: &str = "You are a knowledge graph quality auditor. Verify whether this entity's type is correct.\n\n\
    Valid entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\n\
    If the current type is correct, keep it. If wrong, suggest the correct type.\n\
    Respond with the validated type and reasoning.";

/// JSON Schema for the type-validation response; `validated_type` is an unconstrained string.
const ENTITY_TYPE_VALIDATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "validated_type": { "type": "string" },
    "was_correct": { "type": "boolean" },
    "reasoning": { "type": "string" }
  },
  "required": ["validated_type", "was_correct", "reasoning"],
  "additionalProperties": false
}"#;
192
/// Prompt asking the model to rewrite a generic memory description into a 10-20 word semantic one.
const DESCRIPTION_ENRICH_PROMPT: &str = "You are a knowledge graph quality auditor. This memory has a generic or auto-generated description. Write a concise, semantic description (10-20 words) that captures WHAT this memory is about and WHY it matters.\n\n\
    BAD: 'ingested from docs/auth.md'\n\
    GOOD: 'JWT token rotation strategy with 15-min expiry and refresh flow'\n\n\
    Respond with the improved description and reasoning.";

/// JSON Schema for the description-enrich response: `description` plus `reasoning`.
const DESCRIPTION_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" },
    "reasoning": { "type": "string" }
  },
  "required": ["description", "reasoning"],
  "additionalProperties": false
}"#;
208
/// Prompt asking the model to assign a memory to a kebab-case domain category.
const DOMAIN_CLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. Classify this memory into its primary domain category.\n\n\
    Respond with the domain name (kebab-case, 2-4 words) and reasoning.";

/// JSON Schema for the domain-classify response: `domain`, `confidence` in `[0, 1]`, `reasoning`.
const DOMAIN_CLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "domain": { "type": "string" },
    "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["domain", "confidence", "reasoning"],
  "additionalProperties": false
}"#;
223
/// Prompt asking the model to audit a memory and its entity bindings for quality issues.
const GRAPH_AUDIT_PROMPT: &str = "You are a knowledge graph quality auditor. Analyze this memory and its entity bindings for quality issues.\n\n\
    Check for: missing entities, wrong entity types, redundant relationships, orphaned entities, generic descriptions, low-signal relationships.\n\n\
    Respond with a list of issues found (or empty if none) and an overall quality score.";

/// JSON Schema for the graph-audit response: `quality_score` in `[0, 1]`, an
/// `issues` array of `{kind, detail}` objects, and `reasoning`.
///
/// Every object level, including the `issues` items, sets
/// `"additionalProperties": false`. This matches `BINDINGS_SCHEMA` and is
/// required by backends that enforce strict structured output.
const GRAPH_AUDIT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "quality_score": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "issues": { "type": "array", "items": { "type": "object", "properties": { "kind": { "type": "string" }, "detail": { "type": "string" } }, "required": ["kind", "detail"], "additionalProperties": false } },
    "reasoning": { "type": "string" }
  },
  "required": ["quality_score", "issues", "reasoning"],
  "additionalProperties": false
}"#;
239
/// Prompt asking the model to synthesize a memory body into entities, relationships and a summary.
const DEEP_RESEARCH_SYNTH_PROMPT: &str = "You are a knowledge graph synthesizer. Given this memory body, extract key findings and synthesize them into structured entities and relationships.\n\n\
    Entity names: lowercase kebab-case, domain-specific.\n\
    Relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, related, replaces, tracked-in.\n\n\
    Respond with extracted entities, relationships, and a synthesis summary.";

/// JSON Schema for the deep-research synthesis response: `entities`,
/// `relationships` and a `summary` string.
///
/// Every object level, including the array item objects, sets
/// `"additionalProperties": false`. This matches `BINDINGS_SCHEMA` and is
/// required by backends that enforce strict structured output.
const DEEP_RESEARCH_SYNTH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": { "type": "array", "items": { "type": "object", "properties": { "name": { "type": "string" }, "entity_type": { "type": "string" } }, "required": ["name", "entity_type"], "additionalProperties": false } },
    "relationships": { "type": "array", "items": { "type": "object", "properties": { "source": { "type": "string" }, "target": { "type": "string" }, "relation": { "type": "string" }, "strength": { "type": "number" } }, "required": ["source", "target", "relation", "strength"], "additionalProperties": false } },
    "summary": { "type": "string" }
  },
  "required": ["entities", "relationships", "summary"],
  "additionalProperties": false
}"#;
256
/// Prompt asking the model to restructure a raw memory body into clean markdown without losing facts.
const BODY_EXTRACT_PROMPT: &str = "You are a structured data extractor. Given this memory body (which may be unstructured text, raw notes, or a transcript), extract and restructure the content into a clean, well-organized markdown body.\n\n\
    Preserve all factual content. Remove noise, fix formatting, add section headers where appropriate.\n\
    Respond with the restructured body and a brief summary of changes.";

/// JSON Schema for the body-extract response: `restructured_body` plus `changes_summary`.
const BODY_EXTRACT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "restructured_body": { "type": "string" },
    "changes_summary": { "type": "string" }
  },
  "required": ["restructured_body", "changes_summary"],
  "additionalProperties": false
}"#;
271
/// Prompt for entity/relationship extraction from a memory body.
/// The relation list it names is the same set enumerated in `BINDINGS_SCHEMA`.
const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. Given a memory body, extract:\n\
    1. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
    2. Typed relationships between entities with strength scores\n\n\
    Rules:\n\
    - Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
    - NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
    - Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
    - NEVER use 'mentions' as relationship type\n\
    - Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
    - Prefer fewer high-quality entities over many low-quality ones";
286
/// Prompt prefix for entity descriptions; it ends with `Entity name: `, so the
/// caller is expected to append the entity details (the call site is outside this chunk).
const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. Given an entity name and type, write a concise one-sentence description (10-20 words) that explains what this entity IS and WHY it matters in the context of software/system design.\n\nEntity name: ";

/// Prompt prefix for body enrichment; it ends with `Memory body to enrich:` and
/// a blank line, so the caller is expected to append the body text.
const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. Given a short or sparse memory body, expand it into a richer, more complete and useful description. Preserve all existing facts. Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\nConstraints:\n- Output only the enriched body text (no metadata, no headers)\n- Preserve the original meaning exactly\n- Target length is provided in the system context\n\nMemory body to enrich:\n\n";
290
// Enrichment operation selected with `--operation`.
//
// Serialized by serde in kebab-case (e.g. `memory-bindings`). Plain `//`
// comments are used on purpose: clap's `ValueEnum` derive turns `///` doc
// comments into help text, which would change CLI output.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum EnrichOperation {
    // Add missing entity bindings to unbound memories (per the CLI examples).
    MemoryBindings,
    // Fill in entity descriptions (per the CLI examples).
    EntityDescriptions,
    // Expand short memory bodies (per the CLI examples).
    BodyEnrich,
    // Rebuild missing memory embeddings without rewriting bodies (per the CLI examples).
    ReEmbed,
    // NOTE(review): presumably paired with WEIGHT_CALIBRATE_PROMPT — confirm at the dispatch site.
    WeightCalibrate,
    // NOTE(review): presumably paired with RELATION_RECLASSIFY_PROMPT — confirm.
    RelationReclassify,
    // NOTE(review): presumably paired with ENTITY_CONNECT_PROMPT — confirm.
    EntityConnect,
    // NOTE(review): presumably paired with ENTITY_TYPE_VALIDATE_PROMPT — confirm.
    EntityTypeValidate,
    // NOTE(review): presumably paired with DESCRIPTION_ENRICH_PROMPT — confirm.
    DescriptionEnrich,
    // No matching prompt is visible in this chunk.
    CrossDomainBridges,
    // NOTE(review): presumably paired with DOMAIN_CLASSIFY_PROMPT — confirm.
    DomainClassify,
    // NOTE(review): presumably paired with GRAPH_AUDIT_PROMPT — confirm.
    GraphAudit,
    // NOTE(review): presumably paired with DEEP_RESEARCH_SYNTH_PROMPT — confirm.
    DeepResearchSynth,
    // NOTE(review): presumably paired with BODY_EXTRACT_PROMPT — confirm.
    BodyExtract,
}
328
// LLM backend selected with `--mode` (and `--fallback-mode`).
//
// `Opencode` and `OpenRouter` carry explicit CLI names so they parse as
// `opencode` / `openrouter` instead of clap's derived kebab-case spelling.
// Plain `//` comments are used because `///` would become clap help text.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
pub enum EnrichMode {
    ClaudeCode,
    Codex,
    #[value(name = "opencode")]
    Opencode,
    #[value(name = "openrouter")]
    OpenRouter,
}
343
344impl std::fmt::Display for EnrichMode {
345 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
346 match self {
347 EnrichMode::ClaudeCode => write!(f, "claude-code"),
348 EnrichMode::Codex => write!(f, "codex"),
349 EnrichMode::Opencode => write!(f, "opencode"),
350 EnrichMode::OpenRouter => write!(f, "openrouter"),
351 }
352 }
353}
354
355#[derive(clap::Args)]
357#[command(
358 about = "Enrich graph memories and entities using an LLM provider",
359 after_long_help = "EXAMPLES:\n \
360 # Add missing entity bindings to all unbound memories\n \
361 sqlite-graphrag enrich --operation memory-bindings --mode codex --codex-model gpt-5.4-mini\n\n \
362 # Fill entity descriptions (dry-run preview, no tokens spent)\n \
363 sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n \
364 # Expand short memory bodies (GAP-18)\n \
365 sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n \
366 # Rebuild only missing memory embeddings without rewriting bodies\n \
367 sqlite-graphrag enrich --operation re-embed --limit 100\n\n \
368 # Resume an interrupted body-enrich run\n \
369 sqlite-graphrag enrich --operation body-enrich --resume --json\n\n \
370 # Retry only failed items from a previous run\n \
371 sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n\
372 EXIT CODES:\n \
373 0 success\n \
374 1 validation error (bad args, binary not found)\n \
375 14 I/O error"
376)]
377pub struct EnrichArgs {
378 #[arg(long, short = 'o', value_enum, value_name = "OPERATION")]
380 pub operation: EnrichOperation,
381
382 #[arg(long, value_enum)]
384 pub mode: EnrichMode,
385
386 #[arg(long, value_name = "N")]
388 pub limit: Option<usize>,
389
390 #[arg(long)]
392 pub dry_run: bool,
393
394 #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
396 pub namespace: Option<String>,
397
398 #[arg(long, value_name = "PATH")]
401 pub claude_binary: Option<PathBuf>,
402
403 #[arg(long, value_name = "MODEL")]
405 pub claude_model: Option<String>,
406
407 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
409 pub claude_timeout: u64,
410
411 #[arg(long, value_name = "PATH")]
414 pub codex_binary: Option<PathBuf>,
415
416 #[arg(long, value_name = "MODEL")]
418 pub codex_model: Option<String>,
419
420 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
422 pub codex_timeout: u64,
423
424 #[arg(long, value_name = "PATH", env = "SQLITE_GRAPHRAG_OPENCODE_BINARY")]
427 pub opencode_binary: Option<PathBuf>,
428
429 #[arg(long, value_name = "MODEL", env = "SQLITE_GRAPHRAG_OPENCODE_MODEL")]
431 pub opencode_model: Option<String>,
432
433 #[arg(
435 long,
436 value_name = "SECONDS",
437 env = "SQLITE_GRAPHRAG_OPENCODE_TIMEOUT",
438 default_value_t = 300
439 )]
440 pub opencode_timeout: u64,
441
442 #[arg(long, value_name = "MODEL")]
445 pub openrouter_model: Option<String>,
446
447 #[arg(long, value_name = "KEY", env = "OPENROUTER_API_KEY")]
449 pub openrouter_api_key: Option<String>,
450
451 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
453 pub openrouter_timeout: u64,
454
455 #[arg(long, value_name = "URL")]
457 pub openrouter_base_url: Option<String>,
458
459 #[arg(long, value_name = "USD")]
462 pub max_cost_usd: Option<f64>,
463
464 #[arg(long)]
467 pub resume: bool,
468
469 #[arg(long)]
471 pub retry_failed: bool,
472
473 #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
476 pub min_output_chars: usize,
477
478 #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
480 pub max_output_chars: usize,
481
482 #[arg(long, default_value_t = true)]
484 pub preserve_check: bool,
485
486 #[arg(long, value_name = "PATH")]
488 pub prompt_template: Option<PathBuf>,
489
490 #[arg(long, default_value_t = 1, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=32))]
494 pub llm_parallelism: u32,
495
496 #[arg(long)]
499 pub json: bool,
500
501 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
503 pub db: Option<String>,
504
505 #[arg(long, value_name = "SECONDS")]
508 pub wait_job_singleton: Option<u64>,
509
510 #[arg(long, default_value_t = false)]
514 pub force_job_singleton: bool,
515
516 #[arg(long, value_name = "NAMES", value_delimiter = ',')]
520 pub names: Vec<String>,
521
522 #[arg(long, value_name = "PATH")]
526 pub names_file: Option<PathBuf>,
527
528 #[arg(long, default_value_t = false)]
532 pub preflight_check: bool,
533
534 #[arg(long, value_enum)]
538 pub fallback_mode: Option<EnrichMode>,
539
540 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
543 pub rate_limit_buffer: u64,
544
545 #[arg(long, default_value_t = true)]
549 pub max_load_check: bool,
550
551 #[arg(long, value_name = "N", default_value_t = 5)]
554 pub circuit_breaker_threshold: u32,
555
556 #[arg(long, value_name = "FLOAT", default_value_t = 0.7)]
563 pub preserve_threshold: f64,
564
565 #[arg(long, default_value_t = true)]
570 pub codex_model_validate: bool,
571
572 #[arg(long, value_name = "MODEL")]
577 pub codex_model_fallback: Option<String>,
578}
579
/// Serializable progress record describing a phase of the run.
///
/// Only `phase` is always present; every `Option` field is omitted from the
/// serialized output when it is `None`.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Name of the phase being reported.
    phase: &'a str,
    /// Path of the backend binary, when relevant to the phase.
    #[serde(skip_serializing_if = "Option::is_none")]
    binary_path: Option<&'a str>,
    /// Backend version string, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Total number of items, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_total: Option<usize>,
    /// Number of items still pending, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_pending: Option<usize>,
    /// Configured LLM parallelism, when reported.
    #[serde(skip_serializing_if = "Option::is_none")]
    llm_parallelism: Option<u32>,
}
603
/// Serializable per-item progress record.
///
/// `item`, `status`, `index` and `total` are always serialized; every `Option`
/// field is omitted when it is `None`.
#[derive(Debug, Serialize)]
struct ItemEvent<'a> {
    /// Identifier of the processed item.
    item: &'a str,
    /// Outcome label for the item.
    status: &'a str,
    /// Database id of the memory, for memory items.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Database id of the entity, for entity items.
    #[serde(skip_serializing_if = "Option::is_none")]
    entity_id: Option<i64>,
    /// Entity count for this item.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Relationship count for this item.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Body length before the operation.
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_before: Option<usize>,
    /// Body length after the operation.
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_after: Option<usize>,
    /// Cost attributed to this item, in USD.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    /// Time spent on this item, in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Error message when the item failed.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Position of this item within the run. NOTE(review): base (0 or 1) is not visible here.
    index: usize,
    /// Total number of items in the run.
    total: usize,
}
630
/// Serializable end-of-run summary.
#[derive(Debug, Serialize)]
struct EnrichSummary {
    /// Marker field. NOTE(review): presumably always `true` so consumers can
    /// tell the summary from item events — confirm at the construction site.
    summary: bool,
    /// Name of the operation that was run.
    operation: String,
    /// Number of items considered.
    items_total: usize,
    /// Number of items that completed.
    completed: usize,
    /// Number of items that failed.
    failed: usize,
    /// Number of items that were skipped.
    skipped: usize,
    /// Accumulated cost in USD.
    cost_usd: f64,
    /// Wall-clock duration in milliseconds.
    elapsed_ms: u64,
    /// Backend that was invoked; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    backend_invoked: Option<&'static str>,
}
648
649use crate::output::emit_json_line as emit_json;
650
651fn open_queue_db(path: &str) -> Result<Connection, AppError> {
666 let conn = Connection::open(path)?;
667 conn.pragma_update(None, "journal_mode", "wal")?;
668 conn.execute_batch(
669 "CREATE TABLE IF NOT EXISTS queue (
670 id INTEGER PRIMARY KEY AUTOINCREMENT,
671 item_key TEXT NOT NULL UNIQUE,
672 item_type TEXT NOT NULL DEFAULT 'memory',
673 status TEXT NOT NULL DEFAULT 'pending',
674 memory_id INTEGER,
675 entity_id INTEGER,
676 entities INTEGER DEFAULT 0,
677 rels INTEGER DEFAULT 0,
678 error TEXT,
679 cost_usd REAL DEFAULT 0.0,
680 attempt INTEGER DEFAULT 0,
681 elapsed_ms INTEGER,
682 created_at TEXT DEFAULT (datetime('now')),
683 done_at TEXT
684 );
685 CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
686 )?;
687 Ok(conn)
688}
689
690fn call_claude(
698 binary: &Path,
699 prompt: &str,
700 json_schema: &str,
701 input_text: &str,
702 model: Option<&str>,
703 timeout_secs: u64,
704) -> Result<(serde_json::Value, f64, bool), AppError> {
705 let result = crate::commands::claude_runner::run_claude(
706 binary,
707 prompt,
708 json_schema,
709 input_text,
710 model,
711 timeout_secs,
712 7,
713 )?;
714 Ok((result.value, result.cost_usd, result.is_oauth))
715}
716
717fn call_openrouter(
724 prompt: &str,
725 json_schema: &str,
726 input_text: &str,
727 model: Option<&str>,
728 timeout_secs: u64,
729) -> Result<(serde_json::Value, f64, bool), AppError> {
730 let _ = (model, timeout_secs);
734 let client = crate::embedder::openrouter_chat_client().ok_or_else(|| {
735 AppError::Validation(
736 "OpenRouter chat client not initialised before dispatch (internal error)".into(),
737 )
738 })?;
739 let runtime = crate::embedder::shared_runtime()?;
740 runtime.block_on(client.complete(prompt, input_text, json_schema, None))
741}
742
/// Result of the one-shot backend health probe run by `run_preflight_probe`.
enum PreflightOutcome {
    /// The probe succeeded (for OpenRouter: an API key was found).
    Healthy,
    /// The probe failed and its stderr matched a rate-limit marker.
    RateLimited {
        /// Trimmed stderr of the failed probe.
        reason: String,
        /// Static hint describing how to proceed.
        suggestion: &'static str,
    },
    /// The probe could not run, timed out, or failed for another reason.
    Error(AppError),
}
761
762fn run_preflight_probe(args: &EnrichArgs) -> PreflightOutcome {
770 let timeout = std::time::Duration::from_secs(args.rate_limit_buffer.max(60));
771
772 match args.mode {
773 EnrichMode::ClaudeCode => {
774 let bin = match find_claude_binary(args.claude_binary.as_deref()) {
775 Ok(b) => b,
776 Err(e) => return PreflightOutcome::Error(e),
777 };
778 let mcp_config_path = match crate::spawn::preflight::write_empty_mcp_config_tempfile() {
783 Ok(p) => p,
784 Err(e) => {
785 return PreflightOutcome::Error(AppError::Io(e));
786 }
787 };
788 let mut cmd = std::process::Command::new(&bin);
789 crate::spawn::env_whitelist::apply_env_whitelist(
790 &mut cmd,
791 crate::spawn::env_whitelist::is_strict_env_clear(),
792 );
793 if let Err(e) = crate::spawn::apply_cwd_isolation(&mut cmd) {
794 return PreflightOutcome::Error(e);
795 }
796 cmd.arg("-p")
797 .arg("ping")
798 .arg("--max-turns")
799 .arg("1")
800 .arg("--strict-mcp-config")
801 .arg("--mcp-config")
802 .arg(mcp_config_path.as_os_str())
803 .arg("--dangerously-skip-permissions")
804 .arg("--settings")
805 .arg("{\"hooks\":{}}")
806 .arg("--output-format")
807 .arg("json")
808 .stdin(std::process::Stdio::null())
809 .stdout(std::process::Stdio::piped())
810 .stderr(std::process::Stdio::piped());
811
812 let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
813 Ok(c) => c,
814 Err(e) => {
815 return PreflightOutcome::Error(AppError::Io(e));
816 }
817 };
818 let output = match wait_with_timeout(child, timeout) {
819 Ok(out) => out,
820 Err(e) => return PreflightOutcome::Error(e),
821 };
822 if !output.status.success() {
823 let stderr = String::from_utf8_lossy(&output.stderr);
824 if stderr.contains("hit your session limit")
825 || stderr.contains("rate_limit")
826 || stderr.contains("429")
827 {
828 return PreflightOutcome::RateLimited {
829 reason: stderr.trim().to_string(),
830 suggestion:
831 "wait for the OAuth window to reset or use --fallback-mode codex",
832 };
833 }
834 return PreflightOutcome::Error(AppError::Validation(format!(
835 "preflight probe failed: {stderr}",
836 stderr = stderr.trim()
837 )));
838 }
839 PreflightOutcome::Healthy
840 }
841 EnrichMode::Codex => {
842 let bin = match find_codex_binary(args.codex_binary.as_deref()) {
843 Ok(b) => b,
844 Err(e) => return PreflightOutcome::Error(e),
845 };
846 super::codex_spawn::validate_codex_model(args.codex_model.as_deref())
847 .map_err(PreflightOutcome::Error)
848 .ok();
849 let schema = "{}";
850 let schema_path = match super::codex_spawn::trusted_schema_path() {
851 Ok(p) => p,
852 Err(e) => return PreflightOutcome::Error(e),
853 };
854 let spawn_args = super::codex_spawn::CodexSpawnArgs {
855 binary: &bin,
856 prompt: "ping",
857 json_schema: schema,
858 input_text: "",
859 model: args.codex_model.as_deref(),
860 timeout_secs: args.rate_limit_buffer.max(60),
861 schema_path: schema_path.clone(),
862 };
863 let mut cmd = match super::codex_spawn::build_codex_command(&spawn_args) {
864 Ok(c) => c,
865 Err(e) => return PreflightOutcome::Error(e),
866 };
867 let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
868 Ok(c) => c,
869 Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
870 };
871 let output = match wait_with_timeout(child, timeout) {
872 Ok(out) => out,
873 Err(e) => return PreflightOutcome::Error(e),
874 };
875 let _ = std::fs::remove_file(&schema_path);
876 if !output.status.success() {
877 let stderr = String::from_utf8_lossy(&output.stderr);
878 if stderr.contains("rate_limit")
879 || stderr.contains("429")
880 || stderr.contains("Too Many Requests")
881 {
882 return PreflightOutcome::RateLimited {
883 reason: stderr.trim().to_string(),
884 suggestion: "wait for the rate-limit window to reset",
885 };
886 }
887 return PreflightOutcome::Error(AppError::Validation(format!(
888 "preflight probe failed: {stderr}",
889 stderr = stderr.trim()
890 )));
891 }
892 PreflightOutcome::Healthy
893 }
894 EnrichMode::Opencode => {
895 let bin = match super::opencode_runner::find_opencode_binary_with_override(
896 args.opencode_binary.as_deref(),
897 ) {
898 Ok(b) => b,
899 Err(e) => return PreflightOutcome::Error(e),
900 };
901 let model =
902 super::opencode_runner::resolve_opencode_model(args.opencode_model.as_deref());
903 let mut cmd =
904 match super::opencode_runner::build_opencode_command_sync(&bin, &model, "ping", "")
905 {
906 Ok(c) => c,
907 Err(e) => return PreflightOutcome::Error(e),
908 };
909 let child = match super::opencode_runner::spawn_opencode(&mut cmd) {
910 Ok(c) => c,
911 Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
912 };
913 let output = match wait_with_timeout(child, timeout) {
914 Ok(out) => out,
915 Err(e) => return PreflightOutcome::Error(e),
916 };
917 if !output.status.success() {
918 let stderr = String::from_utf8_lossy(&output.stderr);
919 if stderr.contains("rate_limit")
920 || stderr.contains("429")
921 || stderr.contains("Too Many Requests")
922 {
923 return PreflightOutcome::RateLimited {
924 reason: stderr.trim().to_string(),
925 suggestion: "wait for the rate-limit window to reset",
926 };
927 }
928 return PreflightOutcome::Error(AppError::Validation(format!(
929 "preflight probe failed: {stderr}",
930 stderr = stderr.trim()
931 )));
932 }
933 PreflightOutcome::Healthy
934 }
935 EnrichMode::OpenRouter => {
936 match crate::config::resolve_api_key("openrouter", args.openrouter_api_key.as_deref()) {
940 Some(_) => PreflightOutcome::Healthy,
941 None => PreflightOutcome::Error(AppError::Validation(
942 "OPENROUTER_API_KEY not found for --mode openrouter preflight".into(),
943 )),
944 }
945 }
946 }
947}
948
949fn wait_with_timeout(
951 mut child: std::process::Child,
952 timeout: std::time::Duration,
953) -> Result<std::process::Output, AppError> {
954 use wait_timeout::ChildExt;
955 let start = std::time::Instant::now();
956 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
957 if status.is_none() {
958 let _ = child.kill();
959 let _ = child.wait();
960 return Err(AppError::Validation(format!(
961 "preflight probe timed out after {}s",
962 start.elapsed().as_secs()
963 )));
964 }
965 let mut stdout = Vec::new();
966 if let Some(mut out) = child.stdout.take() {
967 std::io::Read::read_to_end(&mut out, &mut stdout).map_err(AppError::Io)?;
968 }
969 let mut stderr = Vec::new();
970 if let Some(mut err) = child.stderr.take() {
971 std::io::Read::read_to_end(&mut err, &mut stderr).map_err(AppError::Io)?;
972 }
973 let exit = status.unwrap();
974 Ok(std::process::Output {
975 status: exit,
976 stdout,
977 stderr,
978 })
979}
980
981fn scan_unbound_memories(
992 conn: &Connection,
993 namespace: &str,
994 limit: Option<usize>,
995 name_filter: &[String],
996) -> Result<Vec<(i64, String, String)>, AppError> {
997 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
998
999 if name_filter.is_empty() {
1000 let sql = format!(
1001 "SELECT m.id, m.name, m.body
1002 FROM memories m
1003 WHERE m.namespace = ?1
1004 AND m.deleted_at IS NULL
1005 AND NOT EXISTS (
1006 SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
1007 )
1008 ORDER BY m.id
1009 {limit_clause}"
1010 );
1011 let mut stmt = conn.prepare(&sql)?;
1012 let rows = stmt
1013 .query_map(rusqlite::params![namespace], |r| {
1014 Ok((
1015 r.get::<_, i64>(0)?,
1016 r.get::<_, String>(1)?,
1017 r.get::<_, String>(2)?,
1018 ))
1019 })?
1020 .collect::<Result<Vec<_>, _>>()?;
1021 Ok(rows)
1022 } else {
1023 let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1025 .map(|i| format!("?{i}"))
1026 .collect();
1027 let in_clause = placeholders.join(", ");
1028 let sql = format!(
1029 "SELECT m.id, m.name, m.body
1030 FROM memories m
1031 WHERE m.namespace = ?1
1032 AND m.deleted_at IS NULL
1033 AND m.name IN ({in_clause})
1034 AND NOT EXISTS (
1035 SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
1036 )
1037 ORDER BY m.id
1038 {limit_clause}"
1039 );
1040 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1041 params_vec.push(&namespace);
1042 for n in name_filter {
1043 params_vec.push(n);
1044 }
1045 let mut stmt = conn.prepare(&sql)?;
1046 let rows = stmt
1047 .query_map(
1048 rusqlite::params_from_iter(params_vec.iter().copied()),
1049 |r| {
1050 Ok((
1051 r.get::<_, i64>(0)?,
1052 r.get::<_, String>(1)?,
1053 r.get::<_, String>(2)?,
1054 ))
1055 },
1056 )?
1057 .collect::<Result<Vec<_>, _>>()?;
1058 Ok(rows)
1059 }
1060}
1061
1062fn read_names_file(path: &Path) -> Result<Vec<String>, AppError> {
1067 let content = std::fs::read_to_string(path).map_err(|e| {
1068 AppError::Validation(format!("failed to read names file {}: {e}", path.display()))
1069 })?;
1070 let mut seen = std::collections::HashSet::new();
1071 let mut out = Vec::new();
1072 for line in content.lines() {
1073 let trimmed = line.trim();
1074 if trimmed.is_empty() || trimmed.starts_with('#') {
1075 continue;
1076 }
1077 if seen.insert(trimmed.to_string()) {
1078 out.push(trimmed.to_string());
1079 }
1080 }
1081 Ok(out)
1082}
1083
1084fn resolve_name_filter(args: &EnrichArgs) -> Result<Vec<String>, AppError> {
1086 let mut combined: Vec<String> = args.names.clone();
1087 if let Some(p) = &args.names_file {
1088 let from_file = read_names_file(p)?;
1089 for n in from_file {
1090 if !combined.contains(&n) {
1091 combined.push(n);
1092 }
1093 }
1094 }
1095 Ok(combined)
1096}
1097
1098fn scan_entities_without_description(
1102 conn: &Connection,
1103 namespace: &str,
1104 limit: Option<usize>,
1105 name_filter: &[String],
1106) -> Result<Vec<(i64, String, String)>, AppError> {
1107 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1108
1109 if name_filter.is_empty() {
1110 let sql = format!(
1111 "SELECT id, name, type
1112 FROM entities
1113 WHERE namespace = ?1
1114 AND (description IS NULL OR description = '')
1115 ORDER BY id
1116 {limit_clause}"
1117 );
1118 let mut stmt = conn.prepare(&sql)?;
1119 let rows = stmt
1120 .query_map(rusqlite::params![namespace], |r| {
1121 Ok((
1122 r.get::<_, i64>(0)?,
1123 r.get::<_, String>(1)?,
1124 r.get::<_, String>(2)?,
1125 ))
1126 })?
1127 .collect::<Result<Vec<_>, _>>()?;
1128 Ok(rows)
1129 } else {
1130 let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1131 .map(|i| format!("?{i}"))
1132 .collect();
1133 let in_clause = placeholders.join(", ");
1134 let sql = format!(
1135 "SELECT id, name, type
1136 FROM entities
1137 WHERE namespace = ?1
1138 AND name IN ({in_clause})
1139 AND (description IS NULL OR description = '')
1140 ORDER BY id
1141 {limit_clause}"
1142 );
1143 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1144 params_vec.push(&namespace);
1145 for n in name_filter {
1146 params_vec.push(n);
1147 }
1148 let mut stmt = conn.prepare(&sql)?;
1149 let rows = stmt
1150 .query_map(
1151 rusqlite::params_from_iter(params_vec.iter().copied()),
1152 |r| {
1153 Ok((
1154 r.get::<_, i64>(0)?,
1155 r.get::<_, String>(1)?,
1156 r.get::<_, String>(2)?,
1157 ))
1158 },
1159 )?
1160 .collect::<Result<Vec<_>, _>>()?;
1161 Ok(rows)
1162 }
1163}
1164
1165fn scan_short_body_memories(
1169 conn: &Connection,
1170 namespace: &str,
1171 min_chars: usize,
1172 limit: Option<usize>,
1173 name_filter: &[String],
1174) -> Result<Vec<(i64, String, String)>, AppError> {
1175 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1176
1177 if name_filter.is_empty() {
1178 let sql = format!(
1179 "SELECT m.id, m.name, m.body
1180 FROM memories m
1181 WHERE m.namespace = ?1
1182 AND m.deleted_at IS NULL
1183 AND LENGTH(COALESCE(m.body,'')) < ?2
1184 ORDER BY m.id
1185 {limit_clause}"
1186 );
1187 let mut stmt = conn.prepare(&sql)?;
1188 let rows = stmt
1189 .query_map(rusqlite::params![namespace, min_chars as i64], |r| {
1190 Ok((
1191 r.get::<_, i64>(0)?,
1192 r.get::<_, String>(1)?,
1193 r.get::<_, String>(2)?,
1194 ))
1195 })?
1196 .collect::<Result<Vec<_>, _>>()?;
1197 Ok(rows)
1198 } else {
1199 let placeholders: Vec<String> = (3..=name_filter.len() + 2)
1200 .map(|i| format!("?{i}"))
1201 .collect();
1202 let in_clause = placeholders.join(", ");
1203 let sql = format!(
1204 "SELECT m.id, m.name, m.body
1205 FROM memories m
1206 WHERE m.namespace = ?1
1207 AND m.deleted_at IS NULL
1208 AND m.name IN ({in_clause})
1209 AND LENGTH(COALESCE(m.body,'')) < ?2
1210 ORDER BY m.id
1211 {limit_clause}"
1212 );
1213 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(2 + name_filter.len());
1214 let min_chars_i64 = min_chars as i64;
1215 params_vec.push(&namespace);
1216 params_vec.push(&min_chars_i64);
1217 for n in name_filter {
1218 params_vec.push(n);
1219 }
1220 let mut stmt = conn.prepare(&sql)?;
1221 let rows = stmt
1222 .query_map(
1223 rusqlite::params_from_iter(params_vec.iter().copied()),
1224 |r| {
1225 Ok((
1226 r.get::<_, i64>(0)?,
1227 r.get::<_, String>(1)?,
1228 r.get::<_, String>(2)?,
1229 ))
1230 },
1231 )?
1232 .collect::<Result<Vec<_>, _>>()?;
1233 Ok(rows)
1234 }
1235}
1236
1237fn scan_memories_without_embeddings(
1241 conn: &Connection,
1242 namespace: &str,
1243 limit: Option<usize>,
1244 name_filter: &[String],
1245) -> Result<Vec<(i64, String, String)>, AppError> {
1246 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1247
1248 if name_filter.is_empty() {
1249 let sql = format!(
1250 "SELECT m.id, m.name, COALESCE(m.body,'')
1251 FROM memories m
1252 LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1253 WHERE m.namespace = ?1
1254 AND m.deleted_at IS NULL
1255 AND me.memory_id IS NULL
1256 ORDER BY m.id
1257 {limit_clause}"
1258 );
1259 let mut stmt = conn.prepare(&sql)?;
1260 let rows = stmt
1261 .query_map(rusqlite::params![namespace], |r| {
1262 Ok((
1263 r.get::<_, i64>(0)?,
1264 r.get::<_, String>(1)?,
1265 r.get::<_, String>(2)?,
1266 ))
1267 })?
1268 .collect::<Result<Vec<_>, _>>()?;
1269 Ok(rows)
1270 } else {
1271 let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1272 .map(|i| format!("?{i}"))
1273 .collect();
1274 let in_clause = placeholders.join(", ");
1275 let sql = format!(
1276 "SELECT m.id, m.name, COALESCE(m.body,'')
1277 FROM memories m
1278 LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1279 WHERE m.namespace = ?1
1280 AND m.deleted_at IS NULL
1281 AND m.name IN ({in_clause})
1282 AND me.memory_id IS NULL
1283 ORDER BY m.id
1284 {limit_clause}"
1285 );
1286 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1287 params_vec.push(&namespace);
1288 for n in name_filter {
1289 params_vec.push(n);
1290 }
1291 let mut stmt = conn.prepare(&sql)?;
1292 let rows = stmt
1293 .query_map(
1294 rusqlite::params_from_iter(params_vec.iter().copied()),
1295 |r| {
1296 Ok((
1297 r.get::<_, i64>(0)?,
1298 r.get::<_, String>(1)?,
1299 r.get::<_, String>(2)?,
1300 ))
1301 },
1302 )?
1303 .collect::<Result<Vec<_>, _>>()?;
1304 Ok(rows)
1305 }
1306}
1307
1308#[allow(clippy::type_complexity)]
1310fn scan_weight_candidates(
1311 conn: &Connection,
1312 namespace: &str,
1313 limit: Option<usize>,
1314) -> Result<Vec<(i64, String, String, String, f64)>, AppError> {
1315 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1316 let sql = format!(
1317 "SELECT r.id, e1.name, e2.name, r.relation, r.weight \
1318 FROM relationships r \
1319 JOIN entities e1 ON e1.id = r.source_id \
1320 JOIN entities e2 ON e2.id = r.target_id \
1321 WHERE r.weight >= 0.7 AND e1.namespace = ?1 \
1322 ORDER BY r.weight DESC {limit_clause}"
1323 );
1324 let mut stmt = conn.prepare(&sql)?;
1325 let rows = stmt
1326 .query_map(rusqlite::params![namespace], |r| {
1327 Ok((
1328 r.get::<_, i64>(0)?,
1329 r.get::<_, String>(1)?,
1330 r.get::<_, String>(2)?,
1331 r.get::<_, String>(3)?,
1332 r.get::<_, f64>(4)?,
1333 ))
1334 })?
1335 .collect::<Result<Vec<_>, _>>()?;
1336 Ok(rows)
1337}
1338
1339fn scan_generic_relations(
1341 conn: &Connection,
1342 namespace: &str,
1343 limit: Option<usize>,
1344) -> Result<Vec<(i64, String, String, String)>, AppError> {
1345 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1346 let sql = format!(
1347 "SELECT r.id, e1.name, e2.name, r.relation \
1348 FROM relationships r \
1349 JOIN entities e1 ON e1.id = r.source_id \
1350 JOIN entities e2 ON e2.id = r.target_id \
1351 WHERE r.relation = 'applies_to' AND e1.namespace = ?1 \
1352 ORDER BY r.id {limit_clause}"
1353 );
1354 let mut stmt = conn.prepare(&sql)?;
1355 let rows = stmt
1356 .query_map(rusqlite::params![namespace], |r| {
1357 Ok((
1358 r.get::<_, i64>(0)?,
1359 r.get::<_, String>(1)?,
1360 r.get::<_, String>(2)?,
1361 r.get::<_, String>(3)?,
1362 ))
1363 })?
1364 .collect::<Result<Vec<_>, _>>()?;
1365 Ok(rows)
1366}
1367
1368fn persist_memory_bindings(
1377 conn: &Connection,
1378 namespace: &str,
1379 memory_id: i64,
1380 entities_json: &serde_json::Value,
1381 rels_json: &serde_json::Value,
1382) -> Result<(usize, usize), AppError> {
1383 #[derive(Deserialize)]
1384 struct EntityItem {
1385 name: String,
1386 entity_type: String,
1387 }
1388 #[derive(Deserialize)]
1389 struct RelItem {
1390 source: String,
1391 target: String,
1392 relation: String,
1393 strength: f64,
1394 }
1395
1396 let extracted_entities: Vec<EntityItem> = serde_json::from_value(entities_json.clone())
1397 .map_err(|e| AppError::Validation(format!("failed to parse entities array: {e}")))?;
1398
1399 let extracted_rels: Vec<RelItem> = serde_json::from_value(rels_json.clone())
1400 .map_err(|e| AppError::Validation(format!("failed to parse relationships array: {e}")))?;
1401
1402 let mut ent_count = 0usize;
1403 let mut rel_count = 0usize;
1404
1405 for item in &extracted_entities {
1406 let entity_type = match item.entity_type.parse::<EntityType>() {
1407 Ok(et) => et,
1408 Err(_) => {
1409 tracing::warn!(
1410 target: "enrich",
1411 entity = %item.name,
1412 entity_type = %item.entity_type,
1413 "entity type not recognized, skipping"
1414 );
1415 continue;
1416 }
1417 };
1418 match entities::upsert_entity(
1419 conn,
1420 namespace,
1421 &NewEntity {
1422 name: item.name.clone(),
1423 entity_type,
1424 description: None,
1425 },
1426 ) {
1427 Ok(eid) => {
1428 let _ = entities::link_memory_entity(conn, memory_id, eid);
1429 ent_count += 1;
1430 }
1431 Err(e) => {
1432 tracing::warn!(
1433 target: "enrich",
1434 entity = %item.name,
1435 error = %e,
1436 "entity upsert skipped"
1437 );
1438 }
1439 }
1440 }
1441
1442 for rel in &extracted_rels {
1443 let normalized = crate::parsers::normalize_relation(&rel.relation);
1444 crate::parsers::warn_if_non_canonical(&normalized);
1445
1446 let src_name = crate::parsers::normalize_entity_name(&rel.source);
1449 let tgt_name = crate::parsers::normalize_entity_name(&rel.target);
1450 let src_id = entities::find_entity_id(conn, namespace, &src_name);
1451 let tgt_id = entities::find_entity_id(conn, namespace, &tgt_name);
1452 if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1453 let new_rel = NewRelationship {
1454 source: rel.source.clone(),
1455 target: rel.target.clone(),
1456 relation: normalized,
1457 strength: rel.strength,
1458 description: None,
1459 };
1460 if entities::upsert_relationship(conn, namespace, sid, tid, &new_rel).is_ok() {
1461 rel_count += 1;
1462 }
1463 }
1464 }
1465
1466 Ok((ent_count, rel_count))
1467}
1468
1469fn persist_entity_description(
1471 conn: &Connection,
1472 entity_id: i64,
1473 description: &str,
1474) -> Result<(), AppError> {
1475 conn.execute(
1476 "UPDATE entities SET description = ?1, updated_at = unixepoch() WHERE id = ?2",
1477 rusqlite::params![description, entity_id],
1478 )?;
1479 Ok(())
1480}
1481
1482#[allow(clippy::too_many_arguments)]
1488fn reembed_memory_vector(
1489 conn: &Connection,
1490 namespace: &str,
1491 memory_id: i64,
1492 memory_name: &str,
1493 memory_type: &str,
1494 body: &str,
1495 paths: &crate::paths::AppPaths,
1496 llm_backend: crate::cli::LlmBackendChoice,
1497 embedding_backend: crate::cli::EmbeddingBackendChoice,
1498) -> Result<(), AppError> {
1499 let snippet: String = body.chars().take(200).collect();
1500 let (embedding, backend_kind) = crate::embedder::embed_passage_with_embedding_choice(
1506 &paths.models,
1507 body,
1508 embedding_backend,
1509 llm_backend,
1510 )?;
1511 record_enrich_backend(backend_kind.as_str());
1512 memories::upsert_vec(
1513 conn,
1514 memory_id,
1515 namespace,
1516 memory_type,
1517 &embedding,
1518 memory_name,
1519 &snippet,
1520 )?;
1521 Ok(())
1522}
1523
/// Remembers `backend` as the backend that served the most recent call.
///
/// The slot is process-wide, so the last writer wins. A poisoned mutex is
/// recovered rather than ignored: the slot holds a plain
/// `Option<&'static str>`, which a panicking holder cannot leave
/// half-updated.
fn record_enrich_backend(backend: &'static str) {
    let mut slot = ENRICH_LAST_BACKEND
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    *slot = Some(backend);
}

/// Returns the most recently recorded backend and clears the slot.
///
/// Returns `None` when nothing has been recorded since the last call. A
/// poisoned mutex is recovered for the same reason as in
/// `record_enrich_backend`.
fn take_enrich_backend() -> Option<&'static str> {
    ENRICH_LAST_BACKEND
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .take()
}

/// Process-wide slot holding the name of the last backend recorded by
/// `record_enrich_backend`, until `take_enrich_backend` consumes it.
static ENRICH_LAST_BACKEND: std::sync::Mutex<Option<&'static str>> = std::sync::Mutex::new(None);
1540
1541#[allow(clippy::too_many_arguments)]
1546fn persist_enriched_body(
1547 conn: &Connection,
1548 namespace: &str,
1549 memory_id: i64,
1550 memory_name: &str,
1551 new_body: &str,
1552 paths: &crate::paths::AppPaths,
1553 llm_backend: crate::cli::LlmBackendChoice,
1554 embedding_backend: crate::cli::EmbeddingBackendChoice,
1555) -> Result<(), AppError> {
1556 let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
1558 "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
1559 rusqlite::params![memory_id],
1560 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1561 )?;
1562
1563 let memory_type: String = conn.query_row(
1564 "SELECT type FROM memories WHERE id=?1",
1565 rusqlite::params![memory_id],
1566 |r| r.get(0),
1567 )?;
1568
1569 let description: String = conn.query_row(
1570 "SELECT COALESCE(description,'') FROM memories WHERE id=?1",
1571 rusqlite::params![memory_id],
1572 |r| r.get(0),
1573 )?;
1574
1575 let body_hash = blake3::hash(new_body.as_bytes()).to_hex().to_string();
1576
1577 let new_memory = memories::NewMemory {
1578 namespace: namespace.to_string(),
1579 name: memory_name.to_string(),
1580 memory_type: memory_type.clone(),
1581 description: description.clone(),
1582 body: new_body.to_string(),
1583 body_hash,
1584 session_id: None,
1585 source: "agent".to_string(),
1586 metadata: serde_json::json!({
1587 "operation": "body-enrich",
1588 "orig_chars": old_body.chars().count(),
1589 "new_chars": new_body.chars().count(),
1590 }),
1591 };
1592
1593 let next_version = crate::storage::versions::next_version(conn, memory_id)?;
1597 let version_metadata = serde_json::json!({
1598 "operation": "body-enrich",
1599 "orig_chars": old_body.chars().count(),
1600 "new_chars": new_body.chars().count(),
1601 })
1602 .to_string();
1603 crate::storage::versions::insert_version(
1604 conn,
1605 memory_id,
1606 next_version,
1607 memory_name,
1608 &memory_type,
1609 &description,
1610 new_body,
1611 &version_metadata,
1612 Some("enrich"),
1613 "edit",
1614 )?;
1615
1616 memories::update(conn, memory_id, &new_memory, None)?;
1617 memories::sync_fts_after_update(
1618 conn,
1619 memory_id,
1620 &old_name,
1621 &old_desc,
1622 &old_body,
1623 &new_memory.name,
1624 &new_memory.description,
1625 &new_memory.body,
1626 )?;
1627
1628 if let Err(e) = reembed_memory_vector(
1630 conn,
1631 namespace,
1632 memory_id,
1633 memory_name,
1634 &memory_type,
1635 new_body,
1636 paths,
1637 llm_backend,
1638 embedding_backend,
1639 ) {
1640 tracing::warn!(target: "enrich", memory = %memory_name, error = %e, "vec upsert failed after body-enrich");
1641 }
1642
1643 Ok(())
1644}
1645
/// Returns `true` when `value` compares equal to `default`.
///
/// Used to tell whether a flag still holds its default value.
fn is_at_default<T: PartialEq>(value: T, default: T) -> bool {
    PartialEq::eq(&value, &default)
}
1660
1661fn validate_mode_conditional_flags_enrich(args: &EnrichArgs) -> Result<(), AppError> {
1676 const DEFAULT_TIMEOUT: u64 = 300;
1677
1678 let mut conflicts: Vec<String> = Vec::new();
1679
1680 match args.mode {
1681 EnrichMode::ClaudeCode => {
1682 if args.codex_binary.is_some() {
1683 conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1684 }
1685 if args.codex_model.is_some() {
1686 conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1687 }
1688 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1689 conflicts.push(format!(
1690 "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1691 args.codex_timeout
1692 ));
1693 }
1694 }
1695 EnrichMode::Codex => {
1696 if args.claude_binary.is_some() {
1697 conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1698 }
1699 if args.claude_model.is_some() {
1700 conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1701 }
1702 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1703 conflicts.push(format!(
1704 "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1705 args.claude_timeout
1706 ));
1707 }
1708 if args.max_cost_usd.is_some() {
1709 conflicts.push(
1710 "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription, not the call)"
1711 .to_string(),
1712 );
1713 }
1714 }
1715 EnrichMode::Opencode => {
1716 if args.claude_binary.is_some() {
1717 conflicts.push("--claude-binary is ignored when --mode=opencode".to_string());
1718 }
1719 if args.claude_model.is_some() {
1720 conflicts.push("--claude-model is ignored when --mode=opencode".to_string());
1721 }
1722 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1723 conflicts.push(format!(
1724 "--claude-timeout={} is ignored when --mode=opencode (remove the flag to use the default 300s)",
1725 args.claude_timeout
1726 ));
1727 }
1728 if args.max_cost_usd.is_some() {
1729 conflicts.push(
1730 "--max-cost-usd is ignored when --mode=opencode (OAuth-first; cost is metered by your subscription, not the call)"
1731 .to_string(),
1732 );
1733 }
1734 }
1735 EnrichMode::OpenRouter => {
1736 if args.claude_binary.is_some() {
1737 conflicts.push("--claude-binary is ignored when --mode=openrouter".to_string());
1738 }
1739 if args.claude_model.is_some() {
1740 conflicts.push("--claude-model is ignored when --mode=openrouter".to_string());
1741 }
1742 if args.codex_binary.is_some() {
1743 conflicts.push("--codex-binary is ignored when --mode=openrouter".to_string());
1744 }
1745 if args.codex_model.is_some() {
1746 conflicts.push("--codex-model is ignored when --mode=openrouter".to_string());
1747 }
1748 if args.opencode_binary.is_some() {
1749 conflicts.push("--opencode-binary is ignored when --mode=openrouter".to_string());
1750 }
1751 if args.opencode_model.is_some() {
1752 conflicts.push("--opencode-model is ignored when --mode=openrouter".to_string());
1753 }
1754 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1755 conflicts.push(format!(
1756 "--claude-timeout={} is ignored when --mode=openrouter (remove the flag to use the default 300s)",
1757 args.claude_timeout
1758 ));
1759 }
1760 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1761 conflicts.push(format!(
1762 "--codex-timeout={} is ignored when --mode=openrouter (remove the flag to use the default 300s)",
1763 args.codex_timeout
1764 ));
1765 }
1766 if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
1767 conflicts.push(format!(
1768 "--opencode-timeout={} is ignored when --mode=openrouter (remove the flag to use the default 300s)",
1769 args.opencode_timeout
1770 ));
1771 }
1772 }
1773 }
1774
1775 if !conflicts.is_empty() {
1776 return Err(AppError::Validation(format!(
1777 "G20: mode-conditional flag conflicts detected for --mode={}:\n - {}",
1778 args.mode,
1779 conflicts.join("\n - ")
1780 )));
1781 }
1782
1783 Ok(())
1784}
1785
1786pub fn run(
1790 args: &EnrichArgs,
1791 llm_backend: crate::cli::LlmBackendChoice,
1792 embedding_backend: crate::cli::EmbeddingBackendChoice,
1793) -> Result<(), AppError> {
1794 validate_mode_conditional_flags_enrich(args)?;
1797
1798 if args.mode == EnrichMode::OpenRouter {
1803 let model = args.openrouter_model.as_deref().ok_or_else(|| {
1804 AppError::Validation(
1805 "--mode openrouter requires --openrouter-model (no default model is allowed)"
1806 .into(),
1807 )
1808 })?;
1809 let resolved =
1810 crate::config::resolve_api_key("openrouter", args.openrouter_api_key.as_deref())
1811 .ok_or_else(|| {
1812 AppError::Validation(
1813 "OPENROUTER_API_KEY not found; set the env var, store it via \
1814 `config add-key --provider openrouter`, or pass --openrouter-api-key"
1815 .into(),
1816 )
1817 })?;
1818 crate::embedder::get_openrouter_chat_client(
1819 resolved.value,
1820 model,
1821 args.openrouter_timeout,
1822 )?;
1823 }
1824
1825 let started = Instant::now();
1826
1827 let paths = AppPaths::resolve(args.db.as_deref())?;
1828 ensure_db_ready(&paths)?;
1829 let conn = open_rw(&paths.db)?;
1830 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1831
1832 let wait_secs = args.wait_job_singleton;
1838 let force_flag = args.force_job_singleton;
1839 let _singleton = crate::lock::acquire_job_singleton(
1840 crate::lock::JobType::Enrich,
1841 &namespace,
1842 &paths.db,
1843 wait_secs,
1844 force_flag,
1845 )?;
1846
1847 let provider_binary = if matches!(args.operation, EnrichOperation::ReEmbed) {
1849 None
1850 } else {
1851 Some(match args.mode {
1852 EnrichMode::ClaudeCode => {
1853 let bin = find_claude_binary(args.claude_binary.as_deref())?;
1854 let version = super::claude_runner::validate_claude_version(&bin)?;
1855 tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
1856 emit_json(&PhaseEvent {
1857 phase: "validate",
1858 binary_path: bin.to_str(),
1859 version: Some(&version),
1860 items_total: None,
1861 items_pending: None,
1862 llm_parallelism: None,
1863 });
1864 bin
1865 }
1866 EnrichMode::Codex => {
1867 let bin = find_codex_binary(args.codex_binary.as_deref())?;
1868 emit_json(&PhaseEvent {
1869 phase: "validate",
1870 binary_path: bin.to_str(),
1871 version: None,
1872 items_total: None,
1873 items_pending: None,
1874 llm_parallelism: None,
1875 });
1876 bin
1877 }
1878 EnrichMode::Opencode => {
1879 let bin = super::opencode_runner::find_opencode_binary_with_override(
1880 args.opencode_binary.as_deref(),
1881 )?;
1882 emit_json(&PhaseEvent {
1883 phase: "validate",
1884 binary_path: bin.to_str(),
1885 version: None,
1886 items_total: None,
1887 items_pending: None,
1888 llm_parallelism: None,
1889 });
1890 bin
1891 }
1892 EnrichMode::OpenRouter => {
1893 emit_json(&PhaseEvent {
1898 phase: "validate",
1899 binary_path: None,
1900 version: None,
1901 items_total: None,
1902 items_pending: None,
1903 llm_parallelism: None,
1904 });
1905 PathBuf::new()
1906 }
1907 })
1908 };
1909
1910 if args.max_load_check && !args.dry_run && crate::system_load::is_system_saturated() {
1914 let load = crate::system_load::load_average_one();
1915 let n = crate::system_load::ncpus();
1916 return Err(AppError::Validation(format!(
1917 "system load average {load:.2} exceeds 2x ncpus ({n}); \
1918 pass --no-max-load-check to override (not recommended)"
1919 )));
1920 }
1921
1922 if args.preflight_check && !args.dry_run && !matches!(args.operation, EnrichOperation::ReEmbed)
1929 {
1930 let preflight_result = run_preflight_probe(args);
1931 match preflight_result {
1932 PreflightOutcome::Healthy => {
1933 tracing::info!(target: "enrich", mode = ?args.mode, "preflight probe healthy");
1934 }
1935 PreflightOutcome::RateLimited { reason, suggestion } => {
1936 if let Some(fallback) = args.fallback_mode.clone() {
1937 if fallback != args.mode {
1938 return Err(AppError::Validation(format!(
1948 "preflight detected rate limit on {mode:?}: {reason}; \
1949 re-invoke with `--mode {fallback:?}` to use the fallback provider",
1950 mode = args.mode
1951 )));
1952 }
1953 return Err(AppError::Validation(format!(
1954 "preflight detected rate limit on {mode:?}: {reason}; \
1955 --fallback-mode matches --mode, no recovery possible",
1956 mode = args.mode
1957 )));
1958 }
1959 return Err(AppError::Validation(format!(
1960 "preflight detected rate limit on {mode:?}: {reason}; \
1961 {suggestion}; pass --fallback-mode codex to recover",
1962 mode = args.mode
1963 )));
1964 }
1965 PreflightOutcome::Error(e) => {
1966 return Err(e);
1967 }
1968 }
1969 }
1970
1971 let scan_result = scan_operation(&conn, &namespace, args)?;
1973 let total = scan_result.len();
1974
1975 emit_json(&PhaseEvent {
1976 phase: "scan",
1977 binary_path: None,
1978 version: None,
1979 items_total: Some(total),
1980 items_pending: Some(total),
1981 llm_parallelism: Some(args.llm_parallelism),
1982 });
1983
1984 if args.dry_run {
1986 for (idx, key) in scan_result.iter().enumerate() {
1987 emit_json(&ItemEvent {
1988 item: key,
1989 status: "preview",
1990 memory_id: None,
1991 entity_id: None,
1992 entities: None,
1993 rels: None,
1994 chars_before: None,
1995 chars_after: None,
1996 cost_usd: None,
1997 elapsed_ms: None,
1998 error: None,
1999 index: idx,
2000 total,
2001 });
2002 }
2003 emit_json(&EnrichSummary {
2004 summary: true,
2005 operation: format!("{:?}", args.operation),
2006 items_total: total,
2007 completed: 0,
2008 failed: 0,
2009 skipped: 0,
2010 cost_usd: 0.0,
2011 elapsed_ms: started.elapsed().as_millis() as u64,
2012 backend_invoked: take_enrich_backend(),
2013 });
2014 return Ok(());
2015 }
2016
2017 let queue_conn = open_queue_db(DEFAULT_QUEUE_DB)?;
2021
2022 if args.resume {
2023 let reset = queue_conn
2024 .execute(
2025 "UPDATE queue SET status='pending' WHERE status='processing'",
2026 [],
2027 )
2028 .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
2029 if reset > 0 {
2030 tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
2031 }
2032 }
2033
2034 if args.retry_failed {
2035 let count = queue_conn
2036 .execute(
2037 "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
2038 [],
2039 )
2040 .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
2041 tracing::info!(target: "enrich", count, "retrying failed items");
2042 }
2043
2044 if !args.resume && !args.retry_failed {
2045 queue_conn
2046 .execute("DELETE FROM queue", [])
2047 .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
2048 }
2049
2050 for (idx, key) in scan_result.iter().enumerate() {
2052 let item_type = match args.operation {
2053 EnrichOperation::EntityDescriptions => "entity",
2054 _ => "memory",
2055 };
2056 if let Err(e) = queue_conn.execute(
2057 "INSERT OR IGNORE INTO queue (item_key, item_type, status) VALUES (?1, ?2, 'pending')",
2058 rusqlite::params![key, item_type],
2059 ) {
2060 tracing::warn!(target: "enrich", error = %e, "queue insert failed");
2061 }
2062 let _ = idx; }
2064
2065 let parallelism = args.llm_parallelism.clamp(1, 32) as usize;
2068 if parallelism > 1 {
2069 tracing::info!(
2070 target: "enrich",
2071 llm_parallelism = parallelism,
2072 "parallel LLM processing with bounded thread pool"
2073 );
2074 }
2075 if parallelism > 4 {
2079 match args.mode {
2080 EnrichMode::ClaudeCode => {
2081 tracing::warn!(
2082 target: "enrich",
2083 llm_parallelism = parallelism,
2084 recommended_max = 4,
2085 mode = "claude-code",
2086 "llm_parallelism above 4 multiplies Claude Code subprocess fan-out; \
2087 consider combining with SQLITE_GRAPHRAG_CLAUDE_EMPTY_CONFIG_DIR \
2088 to cut MCP children (G28-A)"
2089 );
2090 }
2091 EnrichMode::Codex if parallelism > 16 => {
2092 tracing::warn!(
2093 target: "enrich",
2094 llm_parallelism = parallelism,
2095 recommended_max = 16,
2096 mode = "codex",
2097 "llm_parallelism above 16 risks OAuth rate-limit on Codex; \
2098 consider --llm-parallelism 8 for safer concurrency"
2099 );
2100 }
2101 EnrichMode::Codex => {
2102 }
2106 EnrichMode::Opencode if parallelism > 16 => {
2107 tracing::warn!(
2108 target: "enrich",
2109 llm_parallelism = parallelism,
2110 recommended_max = 16,
2111 mode = "opencode",
2112 "llm_parallelism above 16 risks OAuth rate-limit on OpenCode; \
2113 consider --llm-parallelism 8 for safer concurrency"
2114 );
2115 }
2116 EnrichMode::Opencode => {
2117 }
2119 EnrichMode::OpenRouter => {
2120 }
2123 }
2124 }
2125
2126 let mut completed = 0usize;
2127 let mut failed = 0usize;
2128 let mut skipped = 0usize;
2129 let mut cost_total = 0.0f64;
2130 let mut oauth_detected = false;
2131 let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
2132 let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
2133 let enrich_started = std::time::Instant::now();
2134
2135 let provider_timeout = match args.mode {
2136 EnrichMode::ClaudeCode => args.claude_timeout,
2137 EnrichMode::Codex => args.codex_timeout,
2138 EnrichMode::Opencode => args.opencode_timeout,
2139 EnrichMode::OpenRouter => args.openrouter_timeout,
2140 };
2141
2142 let provider_model: Option<&str> = match args.mode {
2143 EnrichMode::ClaudeCode => args.claude_model.as_deref(),
2144 EnrichMode::Codex => args.codex_model.as_deref(),
2145 EnrichMode::Opencode => args.opencode_model.as_deref(),
2146 EnrichMode::OpenRouter => args.openrouter_model.as_deref(),
2147 };
2148
2149 if parallelism > 1 {
2153 let stdout_mu = parking_lot::Mutex::new(());
2154 let budget = args.max_cost_usd;
2155 let operation = args.operation.clone();
2156 let mode = args.mode.clone();
2157 let min_oc = args.min_output_chars;
2158 let max_oc = args.max_output_chars;
2159 let prompt_tpl = args.prompt_template.as_deref().map(|p| p.to_path_buf());
2160
2161 struct WorkerResult {
2162 completed: usize,
2163 failed: usize,
2164 skipped: usize,
2165 cost: f64,
2166 oauth: bool,
2167 }
2168
2169 let results: Vec<WorkerResult> = std::thread::scope(|s| {
2170 let handles: Vec<_> = (0..parallelism)
2171 .map(|worker_id| {
2172 let stdout_mu = &stdout_mu;
2173 let paths = &paths;
2174 let namespace = &namespace;
2175 let provider_binary = provider_binary.as_deref();
2176 let operation = &operation;
2177 let mode = &mode;
2178 let prompt_tpl = prompt_tpl.as_deref();
2179 s.spawn(move || {
2180 let w_conn = match open_rw(&paths.db) {
2181 Ok(c) => c,
2182 Err(e) => {
2183 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open DB");
2184 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
2185 }
2186 };
2187 let w_queue = match open_queue_db(DEFAULT_QUEUE_DB) {
2188 Ok(c) => c,
2189 Err(e) => {
2190 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open queue DB");
2191 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
2192 }
2193 };
2194 let mut w_completed = 0usize;
2195 let mut w_failed = 0usize;
2196 let mut w_skipped = 0usize;
2197 let mut w_cost = 0.0f64;
2198 let mut w_oauth = false;
2199 let mut w_backoff = DEFAULT_RATE_LIMIT_WAIT;
2200 let w_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
2201 let mut w_breaker = crate::retry::CircuitBreaker::new(
2207 args.circuit_breaker_threshold.max(1),
2208 std::time::Duration::from_secs(60),
2209 );
2210
2211 loop {
2212 if crate::shutdown_requested() {
2213 tracing::info!(target: "enrich", "shutdown requested, worker stopping");
2214 break;
2215 }
2216 if let Some(b) = budget {
2217 if !w_oauth && w_cost >= b {
2218 break;
2219 }
2220 }
2221 let pending: Option<(i64, String, String)> = w_queue
2222 .query_row(
2223 "UPDATE queue SET status='processing', attempt=attempt+1 \
2224 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
2225 RETURNING id, item_key, item_type",
2226 [],
2227 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2228 )
2229 .ok();
2230 let (queue_id, item_key, _item_type) = match pending {
2231 Some(p) => p,
2232 None => break,
2233 };
2234 let item_started = Instant::now();
2235 let current_index = w_completed + w_failed + w_skipped;
2236
2237 let call_result = match operation {
2238 EnrichOperation::MemoryBindings => call_memory_bindings(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2239 EnrichOperation::EntityDescriptions => call_entity_description(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2240 EnrichOperation::BodyEnrich => call_body_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode, min_oc, max_oc, prompt_tpl, args.preserve_threshold, paths, llm_backend, embedding_backend),
2241 EnrichOperation::ReEmbed => call_reembed(&w_conn, namespace, &item_key, paths, llm_backend, embedding_backend),
2242 EnrichOperation::WeightCalibrate => call_weight_calibrate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2243 EnrichOperation::RelationReclassify => call_relation_reclassify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2244 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => call_entity_connect(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2245 EnrichOperation::EntityTypeValidate => call_entity_type_validate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2246 EnrichOperation::DescriptionEnrich => call_description_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2247 EnrichOperation::DomainClassify => call_domain_classify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2248 EnrichOperation::GraphAudit => call_graph_audit(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2249 EnrichOperation::DeepResearchSynth => call_deep_research_synth(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2250 EnrichOperation::BodyExtract => call_body_extract(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2251 };
2252
2253 match call_result {
2254 Ok(EnrichItemResult::Done { cost, is_oauth, memory_id, entity_id, entities, rels, chars_before, chars_after }) => {
2255 if is_oauth { w_oauth = true; }
2256 w_backoff = DEFAULT_RATE_LIMIT_WAIT;
2257 let _ = w_queue.execute(
2258 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2259 rusqlite::params![memory_id, entity_id, entities as i64, rels as i64, cost, item_started.elapsed().as_millis() as i64, queue_id],
2260 );
2261 w_completed += 1;
2262 if !is_oauth { w_cost += cost; }
2263 let _ = w_breaker
2265 .record(crate::retry::AttemptOutcome::Success);
2266 let _guard = stdout_mu.lock();
2267 emit_json(&ItemEvent { item: &item_key, status: "done", memory_id, entity_id, entities: Some(entities), rels: Some(rels), chars_before, chars_after, cost_usd: if is_oauth { None } else { Some(cost) }, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
2268 }
2269 Ok(EnrichItemResult::Skipped { reason }) => {
2270 w_skipped += 1;
2271 let _ = w_queue.execute("UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![reason, queue_id]);
2272 let _guard = stdout_mu.lock();
2273 emit_json(&ItemEvent { item: &item_key, status: "skipped", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
2274 }
2275 Ok(EnrichItemResult::PreservationFailed { score, threshold, chars_before, chars_after }) => {
2276 w_skipped += 1;
2282 let reason = format!(
2283 "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2284 );
2285 let _ = w_queue.execute(
2286 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2287 rusqlite::params![reason, queue_id],
2288 );
2289 let _guard = stdout_mu.lock();
2290 emit_json(&ItemEvent {
2291 item: &item_key,
2292 status: "preservation_failed",
2293 memory_id: None,
2294 entity_id: None,
2295 entities: None,
2296 rels: None,
2297 chars_before: Some(chars_before),
2298 chars_after: Some(chars_after),
2299 cost_usd: None,
2300 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2301 error: Some(reason),
2302 index: current_index,
2303 total,
2304 });
2305 }
2306 Err(e) => {
2307 let err_str = format!("{e}");
2308 if matches!(e, AppError::RateLimited { .. }) {
2309 if crate::retry::is_kill_switch_active() {
2310 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2311 } else if std::time::Instant::now() >= w_deadline {
2312 tracing::error!(target: "enrich", "rate-limit retry deadline (1h) exhausted in worker");
2313 } else {
2314 let half = w_backoff / 2;
2315 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2316 let actual_wait = half + jitter;
2317 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited in worker, backing off");
2318 let _ = w_queue.execute("UPDATE queue SET status='pending' WHERE id=?1", rusqlite::params![queue_id]);
2319 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2320 w_backoff = (w_backoff * 2).min(900);
2321 continue;
2322 }
2323 }
2324 w_failed += 1;
2325 let _ = w_queue.execute("UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![err_str, queue_id]);
2326 let _guard = stdout_mu.lock();
2327 emit_json(&ItemEvent { item: &item_key, status: "failed", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: Some(err_str), index: current_index, total });
2328 let breaker_opened = w_breaker
2330 .record(crate::retry::AttemptOutcome::HardFailure);
2331 if breaker_opened {
2332 tracing::error!(target: "enrich",
2333 consecutive_failures = w_breaker.consecutive_failures(),
2334 "circuit breaker opened — aborting worker"
2335 );
2336 break;
2337 }
2338 }
2339 }
2340 }
2341 WorkerResult { completed: w_completed, failed: w_failed, skipped: w_skipped, cost: w_cost, oauth: w_oauth }
2342 })
2343 })
2344 .collect();
2345 handles
2346 .into_iter()
2347 .map(|h| {
2348 h.join().unwrap_or(WorkerResult {
2349 completed: 0,
2350 failed: 0,
2351 skipped: 0,
2352 cost: 0.0,
2353 oauth: false,
2354 })
2355 })
2356 .collect()
2357 });
2358
2359 for r in &results {
2360 completed += r.completed;
2361 failed += r.failed;
2362 skipped += r.skipped;
2363 cost_total += r.cost;
2364 if r.oauth && !oauth_detected {
2365 oauth_detected = true;
2366 }
2367 }
2368 } else {
2369 loop {
2371 if crate::shutdown_requested() {
2372 tracing::info!(target: "enrich", "shutdown requested, stopping enrichment");
2373 break;
2374 }
2375
2376 if let Some(budget) = args.max_cost_usd {
2378 if !oauth_detected && cost_total >= budget {
2379 tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
2380 break;
2381 }
2382 }
2383
2384 let pending: Option<(i64, String, String)> = queue_conn
2386 .query_row(
2387 "UPDATE queue SET status='processing', attempt=attempt+1 \
2388 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
2389 RETURNING id, item_key, item_type",
2390 [],
2391 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2392 )
2393 .ok();
2394
2395 let (queue_id, item_key, item_type) = match pending {
2396 Some(p) => p,
2397 None => break,
2398 };
2399
2400 let item_started = Instant::now();
2401 let current_index = completed + failed + skipped;
2402
2403 let call_result = match args.operation {
2404 EnrichOperation::MemoryBindings => call_memory_bindings(
2405 &conn,
2406 &namespace,
2407 &item_key,
2408 provider_binary
2409 .as_deref()
2410 .expect("provider binary required"),
2411 provider_model,
2412 provider_timeout,
2413 &args.mode,
2414 ),
2415 EnrichOperation::EntityDescriptions => call_entity_description(
2416 &conn,
2417 &namespace,
2418 &item_key,
2419 provider_binary
2420 .as_deref()
2421 .expect("provider binary required"),
2422 provider_model,
2423 provider_timeout,
2424 &args.mode,
2425 ),
2426 EnrichOperation::BodyEnrich => call_body_enrich(
2427 &conn,
2428 &namespace,
2429 &item_key,
2430 provider_binary
2431 .as_deref()
2432 .expect("provider binary required"),
2433 provider_model,
2434 provider_timeout,
2435 &args.mode,
2436 args.min_output_chars,
2437 args.max_output_chars,
2438 args.prompt_template.as_deref(),
2439 args.preserve_threshold,
2440 &paths,
2441 llm_backend,
2442 embedding_backend,
2443 ),
2444 EnrichOperation::ReEmbed => call_reembed(
2445 &conn,
2446 &namespace,
2447 &item_key,
2448 &paths,
2449 llm_backend,
2450 embedding_backend,
2451 ),
2452 EnrichOperation::WeightCalibrate => call_weight_calibrate(
2453 &conn,
2454 &namespace,
2455 &item_key,
2456 provider_binary
2457 .as_deref()
2458 .expect("provider binary required"),
2459 provider_model,
2460 provider_timeout,
2461 &args.mode,
2462 ),
2463 EnrichOperation::RelationReclassify => call_relation_reclassify(
2464 &conn,
2465 &namespace,
2466 &item_key,
2467 provider_binary
2468 .as_deref()
2469 .expect("provider binary required"),
2470 provider_model,
2471 provider_timeout,
2472 &args.mode,
2473 ),
2474 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2475 call_entity_connect(
2476 &conn,
2477 &namespace,
2478 &item_key,
2479 provider_binary
2480 .as_deref()
2481 .expect("provider binary required"),
2482 provider_model,
2483 provider_timeout,
2484 &args.mode,
2485 )
2486 }
2487 EnrichOperation::EntityTypeValidate => call_entity_type_validate(
2488 &conn,
2489 &namespace,
2490 &item_key,
2491 provider_binary
2492 .as_deref()
2493 .expect("provider binary required"),
2494 provider_model,
2495 provider_timeout,
2496 &args.mode,
2497 ),
2498 EnrichOperation::DescriptionEnrich => call_description_enrich(
2499 &conn,
2500 &namespace,
2501 &item_key,
2502 provider_binary
2503 .as_deref()
2504 .expect("provider binary required"),
2505 provider_model,
2506 provider_timeout,
2507 &args.mode,
2508 ),
2509 EnrichOperation::DomainClassify => call_domain_classify(
2510 &conn,
2511 &namespace,
2512 &item_key,
2513 provider_binary
2514 .as_deref()
2515 .expect("provider binary required"),
2516 provider_model,
2517 provider_timeout,
2518 &args.mode,
2519 ),
2520 EnrichOperation::GraphAudit => call_graph_audit(
2521 &conn,
2522 &namespace,
2523 &item_key,
2524 provider_binary
2525 .as_deref()
2526 .expect("provider binary required"),
2527 provider_model,
2528 provider_timeout,
2529 &args.mode,
2530 ),
2531 EnrichOperation::DeepResearchSynth => call_deep_research_synth(
2532 &conn,
2533 &namespace,
2534 &item_key,
2535 provider_binary
2536 .as_deref()
2537 .expect("provider binary required"),
2538 provider_model,
2539 provider_timeout,
2540 &args.mode,
2541 ),
2542 EnrichOperation::BodyExtract => call_body_extract(
2543 &conn,
2544 &namespace,
2545 &item_key,
2546 provider_binary
2547 .as_deref()
2548 .expect("provider binary required"),
2549 provider_model,
2550 provider_timeout,
2551 &args.mode,
2552 ),
2553 };
2554
2555 match call_result {
2556 Ok(EnrichItemResult::Done {
2557 memory_id,
2558 entity_id,
2559 entities,
2560 rels,
2561 chars_before,
2562 chars_after,
2563 cost,
2564 is_oauth,
2565 }) => {
2566 if is_oauth && !oauth_detected {
2567 oauth_detected = true;
2568 tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
2569 }
2570 backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
2571
2572 let persist_err: Option<String> = match args.operation {
2574 EnrichOperation::MemoryBindings => {
2575 None
2577 }
2578 EnrichOperation::EntityDescriptions => {
2579 None
2581 }
2582 EnrichOperation::BodyEnrich => {
2583 None
2585 }
2586 _ => {
2587 None
2589 }
2590 };
2591
2592 if let Err(e) = queue_conn.execute(
2593 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2594 rusqlite::params![
2595 memory_id,
2596 entity_id,
2597 entities as i64,
2598 rels as i64,
2599 cost,
2600 item_started.elapsed().as_millis() as i64,
2601 queue_id
2602 ],
2603 ) {
2604 tracing::warn!(target: "enrich", error = %e, "queue done update failed");
2605 }
2606
2607 if persist_err.is_none() {
2608 completed += 1;
2609 if !is_oauth {
2610 cost_total += cost;
2611 }
2612 emit_json(&ItemEvent {
2613 item: &item_key,
2614 status: "done",
2615 memory_id,
2616 entity_id,
2617 entities: Some(entities),
2618 rels: Some(rels),
2619 chars_before,
2620 chars_after,
2621 cost_usd: if is_oauth { None } else { Some(cost) },
2622 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2623 error: None,
2624 index: current_index,
2625 total,
2626 });
2627 } else {
2628 failed += 1;
2629 emit_json(&ItemEvent {
2630 item: &item_key,
2631 status: "failed",
2632 memory_id: None,
2633 entity_id: None,
2634 entities: None,
2635 rels: None,
2636 chars_before: None,
2637 chars_after: None,
2638 cost_usd: None,
2639 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2640 error: persist_err,
2641 index: current_index,
2642 total,
2643 });
2644 }
2645 }
2646 Ok(EnrichItemResult::Skipped { reason }) => {
2647 skipped += 1;
2648 if let Err(e) = queue_conn.execute(
2649 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2650 rusqlite::params![reason, queue_id],
2651 ) {
2652 tracing::warn!(target: "enrich", error = %e, "queue skipped update failed");
2653 }
2654 emit_json(&ItemEvent {
2655 item: &item_key,
2656 status: "skipped",
2657 memory_id: None,
2658 entity_id: None,
2659 entities: None,
2660 rels: None,
2661 chars_before: None,
2662 chars_after: None,
2663 cost_usd: None,
2664 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2665 error: None,
2666 index: current_index,
2667 total,
2668 });
2669 }
2670 Ok(EnrichItemResult::PreservationFailed {
2671 score,
2672 threshold,
2673 chars_before,
2674 chars_after,
2675 }) => {
2676 skipped += 1;
2683 let reason = format!(
2684 "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2685 );
2686 if let Err(qe) = queue_conn.execute(
2687 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2688 rusqlite::params![reason, queue_id],
2689 ) {
2690 tracing::warn!(target: "enrich", error = %qe, "queue preservation_failed update failed");
2691 }
2692 emit_json(&ItemEvent {
2693 item: &item_key,
2694 status: "preservation_failed",
2695 memory_id: None,
2696 entity_id: None,
2697 entities: None,
2698 rels: None,
2699 chars_before: Some(chars_before),
2700 chars_after: Some(chars_after),
2701 cost_usd: None,
2702 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2703 error: Some(reason),
2704 index: current_index,
2705 total,
2706 });
2707 }
2708 Err(e) => {
2709 let err_str = format!("{e}");
2710 if matches!(e, AppError::RateLimited { .. }) {
2711 if crate::retry::is_kill_switch_active() {
2712 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2713 } else if std::time::Instant::now() >= rate_limit_deadline {
2714 tracing::error!(target: "enrich", total_elapsed_secs = enrich_started.elapsed().as_secs(), "rate-limit retry deadline (1h) exhausted");
2715 } else {
2716 let half = backoff_secs / 2;
2717 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2718 let actual_wait = half + jitter;
2719 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
2720 if let Err(qe) = queue_conn.execute(
2721 "UPDATE queue SET status='pending' WHERE id=?1",
2722 rusqlite::params![queue_id],
2723 ) {
2724 tracing::warn!(target: "enrich", error = %qe, "queue pending update failed");
2725 }
2726 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2727 backoff_secs = (backoff_secs * 2).min(900);
2728 continue;
2729 }
2730 }
2731
2732 failed += 1;
2733 if let Err(qe) = queue_conn.execute(
2734 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
2735 rusqlite::params![err_str, queue_id],
2736 ) {
2737 tracing::warn!(target: "enrich", error = %qe, "queue failed update failed");
2738 }
2739 emit_json(&ItemEvent {
2740 item: &item_key,
2741 status: "failed",
2742 memory_id: None,
2743 entity_id: None,
2744 entities: None,
2745 rels: None,
2746 chars_before: None,
2747 chars_after: None,
2748 cost_usd: None,
2749 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2750 error: Some(err_str),
2751 index: current_index,
2752 total,
2753 });
2754 }
2755 }
2756
2757 let _ = item_type; }
2759 } let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2762 let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2763
2764 emit_json(&EnrichSummary {
2765 summary: true,
2766 operation: format!("{:?}", args.operation),
2767 items_total: total,
2768 completed,
2769 failed,
2770 skipped,
2771 cost_usd: cost_total,
2772 elapsed_ms: started.elapsed().as_millis() as u64,
2773 backend_invoked: take_enrich_backend(),
2774 });
2775
2776 if failed == 0 {
2777 let _ = std::fs::remove_file(DEFAULT_QUEUE_DB);
2778 }
2779
2780 Ok(())
2781}
2782
/// Outcome of processing a single queue item, as reported by the `call_*` workers.
///
/// The enrichment loops translate each variant into a queue-row update and an
/// emitted item event.
enum EnrichItemResult {
    /// The item was processed; the queue row is marked `done`.
    Done {
        /// Memory touched by the operation, when the item is a memory.
        memory_id: Option<i64>,
        /// Entity touched by the operation, when the item is an entity.
        entity_id: Option<i64>,
        /// Entity count reported for the item.
        entities: usize,
        /// Relationship count reported for the item.
        rels: usize,
        /// Body length in characters before the operation, when reported.
        chars_before: Option<usize>,
        /// Body length in characters after the operation, when reported.
        chars_after: Option<usize>,
        /// Cost reported by the provider call.
        cost: f64,
        /// When `true`, the loops omit `cost` from the item event and do not add
        /// it to the running cost total.
        is_oauth: bool,
    },
    /// Nothing was done for the item; `reason` is stored in the queue row's
    /// `error` column with status `skipped`.
    Skipped { reason: String },
    /// The enriched body was rejected by the preservation check; the loops record
    /// the row as `skipped` and emit a `preservation_failed` event.
    PreservationFailed {
        /// Preservation score of the rejected body (logged as `jaccard=`).
        score: f64,
        /// Threshold the score was evaluated against.
        threshold: f64,
        /// Length in characters of the original body.
        chars_before: usize,
        /// Length in characters of the rejected body.
        chars_after: usize,
    },
}
2812
2813fn call_memory_bindings(
2818 conn: &Connection,
2819 namespace: &str,
2820 memory_name: &str,
2821 binary: &Path,
2822 model: Option<&str>,
2823 timeout: u64,
2824 mode: &EnrichMode,
2825) -> Result<EnrichItemResult, AppError> {
2826 let (memory_id, body): (i64, String) = conn.query_row(
2828 "SELECT id, COALESCE(body,'') FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2829 rusqlite::params![namespace, memory_name],
2830 |r| Ok((r.get(0)?, r.get(1)?)),
2831 ).map_err(|e| match e {
2832 rusqlite::Error::QueryReturnedNoRows => AppError::NotFound(format!("memory '{memory_name}' not found")),
2833 other => AppError::Database(other),
2834 })?;
2835
2836 if body.trim().is_empty() {
2837 return Ok(EnrichItemResult::Skipped {
2838 reason: "body is empty".to_string(),
2839 });
2840 }
2841
2842 let (value, cost, is_oauth) = match mode {
2843 EnrichMode::ClaudeCode => call_claude(
2844 binary,
2845 BINDINGS_PROMPT,
2846 BINDINGS_SCHEMA,
2847 &body,
2848 model,
2849 timeout,
2850 )?,
2851 EnrichMode::Codex => call_codex(
2852 binary,
2853 BINDINGS_PROMPT,
2854 BINDINGS_SCHEMA,
2855 &body,
2856 model,
2857 timeout,
2858 )?,
2859 EnrichMode::Opencode => call_opencode(
2860 binary,
2861 BINDINGS_PROMPT,
2862 BINDINGS_SCHEMA,
2863 &body,
2864 model,
2865 timeout,
2866 )?,
2867 EnrichMode::OpenRouter => {
2868 call_openrouter(BINDINGS_PROMPT, BINDINGS_SCHEMA, &body, model, timeout)?
2869 }
2870 };
2871
2872 let empty_arr = serde_json::Value::Array(vec![]);
2873 let entities_val = value.get("entities").unwrap_or(&empty_arr);
2874 let rels_val = value.get("relationships").unwrap_or(&empty_arr);
2875
2876 let (ent_count, rel_count) =
2877 persist_memory_bindings(conn, namespace, memory_id, entities_val, rels_val)?;
2878
2879 Ok(EnrichItemResult::Done {
2880 memory_id: Some(memory_id),
2881 entity_id: None,
2882 entities: ent_count,
2883 rels: rel_count,
2884 chars_before: None,
2885 chars_after: None,
2886 cost,
2887 is_oauth,
2888 })
2889}
2890
2891fn call_entity_description(
2892 conn: &Connection,
2893 namespace: &str,
2894 entity_name: &str,
2895 binary: &Path,
2896 model: Option<&str>,
2897 timeout: u64,
2898 mode: &EnrichMode,
2899) -> Result<EnrichItemResult, AppError> {
2900 let (entity_id, entity_type): (i64, String) = conn
2901 .query_row(
2902 "SELECT id, type FROM entities WHERE namespace=?1 AND name=?2",
2903 rusqlite::params![namespace, entity_name],
2904 |r| Ok((r.get(0)?, r.get(1)?)),
2905 )
2906 .map_err(|e| match e {
2907 rusqlite::Error::QueryReturnedNoRows => {
2908 AppError::NotFound(format!("entity '{entity_name}' not found"))
2909 }
2910 other => AppError::Database(other),
2911 })?;
2912
2913 let prompt = format!(
2914 "{ENTITY_DESCRIPTION_PROMPT_PREFIX}{entity_name}\nEntity type: {entity_type}\n\nGenerate a description:"
2915 );
2916
2917 let (value, cost, is_oauth) = match mode {
2918 EnrichMode::ClaudeCode => call_claude(
2919 binary,
2920 &prompt,
2921 ENTITY_DESCRIPTION_SCHEMA,
2922 "",
2923 model,
2924 timeout,
2925 )?,
2926 EnrichMode::Codex => call_codex(
2927 binary,
2928 &prompt,
2929 ENTITY_DESCRIPTION_SCHEMA,
2930 "",
2931 model,
2932 timeout,
2933 )?,
2934 EnrichMode::Opencode => call_opencode(
2935 binary,
2936 &prompt,
2937 ENTITY_DESCRIPTION_SCHEMA,
2938 "",
2939 model,
2940 timeout,
2941 )?,
2942 EnrichMode::OpenRouter => {
2943 call_openrouter(&prompt, ENTITY_DESCRIPTION_SCHEMA, "", model, timeout)?
2944 }
2945 };
2946
2947 let description = value
2948 .get("description")
2949 .and_then(|v| v.as_str())
2950 .ok_or_else(|| AppError::Validation("LLM result missing 'description' field".into()))?;
2951
2952 persist_entity_description(conn, entity_id, description)?;
2953
2954 Ok(EnrichItemResult::Done {
2955 memory_id: None,
2956 entity_id: Some(entity_id),
2957 entities: 0,
2958 rels: 0,
2959 chars_before: None,
2960 chars_after: None,
2961 cost,
2962 is_oauth,
2963 })
2964}
2965
2966#[allow(clippy::too_many_arguments)]
2967fn call_body_enrich(
2968 conn: &Connection,
2969 namespace: &str,
2970 memory_name: &str,
2971 binary: &Path,
2972 model: Option<&str>,
2973 timeout: u64,
2974 mode: &EnrichMode,
2975 min_output_chars: usize,
2976 max_output_chars: usize,
2977 prompt_template: Option<&Path>,
2978 preserve_threshold: f64,
2979 paths: &crate::paths::AppPaths,
2980 llm_backend: crate::cli::LlmBackendChoice,
2981 embedding_backend: crate::cli::EmbeddingBackendChoice,
2982) -> Result<EnrichItemResult, AppError> {
2983 let (memory_id, body, description, memory_type): (i64, String, String, String) = conn
2984 .query_row(
2985 "SELECT id, COALESCE(body,''), COALESCE(description,''), COALESCE(type,'note') \
2986 FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2987 rusqlite::params![namespace, memory_name],
2988 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2989 )
2990 .map_err(|e| match e {
2991 rusqlite::Error::QueryReturnedNoRows => {
2992 AppError::NotFound(format!("memory '{memory_name}' not found"))
2993 }
2994 other => AppError::Database(other),
2995 })?;
2996
2997 let chars_before = body.chars().count();
2998
2999 let linked_entities: Vec<String> = {
3001 let mut stmt = conn.prepare_cached(
3002 "SELECT e.name FROM memory_entities me \
3003 JOIN entities e ON e.id = me.entity_id \
3004 WHERE me.memory_id = ?1 LIMIT 10",
3005 )?;
3006 let result: Vec<String> = stmt
3007 .query_map(rusqlite::params![memory_id], |r| r.get::<_, String>(0))?
3008 .filter_map(|r| r.ok())
3009 .collect();
3010 drop(stmt);
3011 result
3012 };
3013
3014 let prompt_prefix = if let Some(tmpl_path) = prompt_template {
3016 let file_size = std::fs::metadata(tmpl_path)
3017 .map_err(|e| {
3018 AppError::Io(std::io::Error::new(
3019 e.kind(),
3020 format!("failed to stat prompt template: {e}"),
3021 ))
3022 })?
3023 .len();
3024 if file_size > MAX_MEMORY_BODY_LEN as u64 {
3025 return Err(AppError::LimitExceeded(
3026 crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
3027 ));
3028 }
3029 std::fs::read_to_string(tmpl_path).map_err(|e| {
3030 AppError::Io(std::io::Error::new(
3031 e.kind(),
3032 format!("failed to read prompt template: {e}"),
3033 ))
3034 })?
3035 } else {
3036 BODY_ENRICH_PROMPT_PREFIX.to_string()
3037 };
3038
3039 let context_section = if !linked_entities.is_empty() || !description.is_empty() {
3041 let mut ctx = String::new();
3042 ctx.push_str(&format!(
3043 "\nContext:\n- Memory name: {memory_name}\n- Type: {memory_type}\n"
3044 ));
3045 if !description.is_empty() {
3046 ctx.push_str(&format!("- Description: {description}\n"));
3047 }
3048 ctx.push_str(&format!("- Domain: {namespace}\n"));
3049 if !linked_entities.is_empty() {
3050 ctx.push_str(&format!(
3051 "- Linked entities: {}\n",
3052 linked_entities.join(", ")
3053 ));
3054 }
3055 ctx
3056 } else {
3057 String::new()
3058 };
3059
3060 let prompt = format!(
3061 "{prompt_prefix}{context_section}\nTarget minimum length: {min_output_chars} characters. Maximum: {max_output_chars} characters."
3062 );
3063
3064 let (value, cost, is_oauth) = match mode {
3066 EnrichMode::ClaudeCode => {
3067 call_claude(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
3068 }
3069 EnrichMode::Codex => {
3070 call_codex(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
3071 }
3072 EnrichMode::Opencode => {
3073 call_opencode(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
3074 }
3075 EnrichMode::OpenRouter => {
3076 call_openrouter(&prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
3077 }
3078 };
3079
3080 let enriched_body = value
3081 .get("enriched_body")
3082 .and_then(|v| v.as_str())
3083 .ok_or_else(|| AppError::Validation("LLM result missing 'enriched_body' field".into()))?;
3084
3085 let chars_after = enriched_body.chars().count();
3086
3087 let threshold = preserve_threshold;
3094 let verdict =
3095 crate::preservation::PreservationVerdict::evaluate(&body, enriched_body, threshold);
3096 if !verdict.is_accepted() {
3097 return Ok(EnrichItemResult::PreservationFailed {
3098 score: match verdict {
3099 crate::preservation::PreservationVerdict::Preserved { score, .. } => score,
3100 crate::preservation::PreservationVerdict::Rejected { score, .. } => score,
3101 crate::preservation::PreservationVerdict::Unchanged { .. } => 1.0,
3102 },
3103 threshold,
3104 chars_before,
3105 chars_after,
3106 });
3107 }
3108
3109 let old_hash = blake3::hash(body.as_bytes()).to_hex().to_string();
3115 let new_hash = blake3::hash(enriched_body.as_bytes()).to_hex().to_string();
3116 if old_hash == new_hash {
3117 return Ok(EnrichItemResult::Skipped {
3118 reason: format!(
3119 "enriched body hash matches original (blake3:{old_hash}); idempotency skip"
3120 ),
3121 });
3122 }
3123
3124 if chars_after <= chars_before {
3126 return Ok(EnrichItemResult::Skipped {
3127 reason: format!(
3128 "enriched body ({chars_after} chars) not longer than original ({chars_before} chars)"
3129 ),
3130 });
3131 }
3132
3133 persist_enriched_body(
3134 conn,
3135 namespace,
3136 memory_id,
3137 memory_name,
3138 enriched_body,
3139 paths,
3140 llm_backend,
3141 embedding_backend,
3142 )?;
3143
3144 Ok(EnrichItemResult::Done {
3145 memory_id: Some(memory_id),
3146 entity_id: None,
3147 entities: 0,
3148 rels: 0,
3149 chars_before: Some(chars_before),
3150 chars_after: Some(chars_after),
3151 cost,
3152 is_oauth,
3153 })
3154}
3155
3156fn call_reembed(
3157 conn: &Connection,
3158 namespace: &str,
3159 memory_name: &str,
3160 paths: &crate::paths::AppPaths,
3161 llm_backend: crate::cli::LlmBackendChoice,
3162 embedding_backend: crate::cli::EmbeddingBackendChoice,
3163) -> Result<EnrichItemResult, AppError> {
3164 let (memory_id, body, memory_type): (i64, String, String) = conn
3165 .query_row(
3166 "SELECT id, COALESCE(body,''), COALESCE(type,'note')
3167 FROM memories
3168 WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
3169 rusqlite::params![namespace, memory_name],
3170 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3171 )
3172 .map_err(|e| match e {
3173 rusqlite::Error::QueryReturnedNoRows => {
3174 AppError::NotFound(format!("memory '{memory_name}' not found"))
3175 }
3176 other => AppError::Database(other),
3177 })?;
3178
3179 if body.trim().is_empty() {
3180 return Ok(EnrichItemResult::Skipped {
3181 reason: "body is empty".to_string(),
3182 });
3183 }
3184
3185 reembed_memory_vector(
3186 conn,
3187 namespace,
3188 memory_id,
3189 memory_name,
3190 &memory_type,
3191 &body,
3192 paths,
3193 llm_backend,
3194 embedding_backend,
3195 )?;
3196
3197 Ok(EnrichItemResult::Done {
3198 memory_id: Some(memory_id),
3199 entity_id: None,
3200 entities: 0,
3201 rels: 0,
3202 chars_before: Some(body.chars().count()),
3203 chars_after: Some(body.chars().count()),
3204 cost: 0.0,
3205 is_oauth: true,
3206 })
3207}
3208
3209fn scan_operation(
3214 conn: &Connection,
3215 namespace: &str,
3216 args: &EnrichArgs,
3217) -> Result<Vec<String>, AppError> {
3218 let name_filter = resolve_name_filter(args)?;
3220 match args.operation {
3221 EnrichOperation::MemoryBindings => {
3222 let rows = scan_unbound_memories(conn, namespace, args.limit, &name_filter)?;
3223 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3224 }
3225 EnrichOperation::EntityDescriptions => {
3226 let rows =
3227 scan_entities_without_description(conn, namespace, args.limit, &name_filter)?;
3228 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3229 }
3230 EnrichOperation::BodyEnrich => {
3231 let rows = scan_short_body_memories(
3232 conn,
3233 namespace,
3234 args.min_output_chars,
3235 args.limit,
3236 &name_filter,
3237 )?;
3238 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3239 }
3240 EnrichOperation::ReEmbed => {
3241 let rows = scan_memories_without_embeddings(conn, namespace, args.limit, &name_filter)?;
3242 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3243 }
3244 EnrichOperation::WeightCalibrate => {
3245 let rows = scan_weight_candidates(conn, namespace, args.limit)?;
3246 Ok(rows
3247 .into_iter()
3248 .map(|(id, _, _, _, _)| id.to_string())
3249 .collect())
3250 }
3251 EnrichOperation::RelationReclassify => {
3252 let rows = scan_generic_relations(conn, namespace, args.limit)?;
3253 Ok(rows
3254 .into_iter()
3255 .map(|(id, _, _, _)| id.to_string())
3256 .collect())
3257 }
3258 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
3259 let pairs = scan_isolated_entity_pairs(conn, namespace, args.limit)?;
3260 Ok(pairs.into_iter().map(|(_, name, _, _)| name).collect())
3261 }
3262 EnrichOperation::EntityTypeValidate => {
3263 let rows = scan_entities_for_type_validation(conn, namespace, args.limit)?;
3264 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3265 }
3266 EnrichOperation::DescriptionEnrich => {
3267 let rows = scan_generic_descriptions(conn, namespace, args.limit)?;
3268 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3269 }
3270 EnrichOperation::DomainClassify
3271 | EnrichOperation::GraphAudit
3272 | EnrichOperation::DeepResearchSynth
3273 | EnrichOperation::BodyExtract => {
3274 let limit_clause = args.limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3275 let sql = format!(
3276 "SELECT name FROM memories WHERE namespace=?1 AND deleted_at IS NULL ORDER BY id {limit_clause}"
3277 );
3278 let mut stmt = conn.prepare(&sql)?;
3279 let names = stmt
3280 .query_map(rusqlite::params![namespace], |r| r.get::<_, String>(0))?
3281 .collect::<Result<Vec<_>, _>>()?;
3282 Ok(names)
3283 }
3284 }
3285}
3286
3287fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
3293 if let Some(p) = explicit {
3294 if p.exists() {
3295 return Ok(p.to_path_buf());
3296 }
3297 return Err(AppError::Validation(format!(
3298 "Codex binary not found at explicit path: {}",
3299 p.display()
3300 )));
3301 }
3302
3303 if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
3304 let p = PathBuf::from(&env_path);
3305 if p.exists() {
3306 return Ok(p);
3307 }
3308 }
3309
3310 let name = if cfg!(windows) { "codex.exe" } else { "codex" };
3311 if let Some(path_var) = std::env::var_os("PATH") {
3312 for dir in std::env::split_paths(&path_var) {
3313 let candidate = dir.join(name);
3314 if candidate.exists() {
3315 return Ok(crate::extract::llm_embedding::resolve_real_binary(
3316 &candidate,
3317 ));
3318 }
3319 }
3320 }
3321
3322 Err(AppError::Validation(
3323 "Codex CLI binary not found in PATH. Install it or specify --codex-binary".to_string(),
3324 ))
3325}
3326
3327fn call_weight_calibrate(
3329 conn: &Connection,
3330 _namespace: &str,
3331 item_key: &str,
3332 binary: &Path,
3333 model: Option<&str>,
3334 timeout: u64,
3335 mode: &EnrichMode,
3336) -> Result<EnrichItemResult, AppError> {
3337 let rel_id: i64 = item_key
3338 .parse()
3339 .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
3340 let (source_name, target_name, relation, current_weight): (String, String, String, f64) = conn
3341 .query_row(
3342 "SELECT e1.name, e2.name, r.relation, r.weight \
3343 FROM relationships r \
3344 JOIN entities e1 ON e1.id = r.source_id \
3345 JOIN entities e2 ON e2.id = r.target_id \
3346 WHERE r.id = ?1",
3347 rusqlite::params![rel_id],
3348 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
3349 )
3350 .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
3351
3352 let input_text = format!(
3353 "Source: {source_name}\nTarget: {target_name}\nRelation: {relation}\nCurrent weight: {current_weight}"
3354 );
3355 let (value, cost, is_oauth) = match mode {
3356 EnrichMode::ClaudeCode => call_claude(
3357 binary,
3358 WEIGHT_CALIBRATE_PROMPT,
3359 WEIGHT_CALIBRATE_SCHEMA,
3360 &input_text,
3361 model,
3362 timeout,
3363 )?,
3364 EnrichMode::Codex => call_codex(
3365 binary,
3366 WEIGHT_CALIBRATE_PROMPT,
3367 WEIGHT_CALIBRATE_SCHEMA,
3368 &input_text,
3369 model,
3370 timeout,
3371 )?,
3372 EnrichMode::Opencode => call_opencode(
3373 binary,
3374 WEIGHT_CALIBRATE_PROMPT,
3375 WEIGHT_CALIBRATE_SCHEMA,
3376 &input_text,
3377 model,
3378 timeout,
3379 )?,
3380 EnrichMode::OpenRouter => call_openrouter(
3381 WEIGHT_CALIBRATE_PROMPT,
3382 WEIGHT_CALIBRATE_SCHEMA,
3383 &input_text,
3384 model,
3385 timeout,
3386 )?,
3387 };
3388
3389 let calibrated = value
3390 .get("calibrated_weight")
3391 .and_then(|v| v.as_f64())
3392 .ok_or_else(|| AppError::Validation("LLM result missing 'calibrated_weight'".into()))?;
3393
3394 conn.execute(
3395 "UPDATE relationships SET weight = ?1 WHERE id = ?2",
3396 rusqlite::params![calibrated, rel_id],
3397 )?;
3398
3399 Ok(EnrichItemResult::Done {
3400 memory_id: None,
3401 entity_id: None,
3402 entities: 0,
3403 rels: 1,
3404 chars_before: None,
3405 chars_after: None,
3406 cost,
3407 is_oauth,
3408 })
3409}
3410
3411fn call_relation_reclassify(
3413 conn: &Connection,
3414 _namespace: &str,
3415 item_key: &str,
3416 binary: &Path,
3417 model: Option<&str>,
3418 timeout: u64,
3419 mode: &EnrichMode,
3420) -> Result<EnrichItemResult, AppError> {
3421 let rel_id: i64 = item_key
3422 .parse()
3423 .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
3424 let (source_name, target_name, current_relation): (String, String, String) = conn
3425 .query_row(
3426 "SELECT e1.name, e2.name, r.relation \
3427 FROM relationships r \
3428 JOIN entities e1 ON e1.id = r.source_id \
3429 JOIN entities e2 ON e2.id = r.target_id \
3430 WHERE r.id = ?1",
3431 rusqlite::params![rel_id],
3432 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3433 )
3434 .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
3435
3436 let input_text = format!(
3437 "Source entity: {source_name}\nTarget entity: {target_name}\nCurrent relation: {current_relation}"
3438 );
3439 let (value, cost, is_oauth) = match mode {
3440 EnrichMode::ClaudeCode => call_claude(
3441 binary,
3442 RELATION_RECLASSIFY_PROMPT,
3443 RELATION_RECLASSIFY_SCHEMA,
3444 &input_text,
3445 model,
3446 timeout,
3447 )?,
3448 EnrichMode::Codex => call_codex(
3449 binary,
3450 RELATION_RECLASSIFY_PROMPT,
3451 RELATION_RECLASSIFY_SCHEMA,
3452 &input_text,
3453 model,
3454 timeout,
3455 )?,
3456 EnrichMode::Opencode => call_opencode(
3457 binary,
3458 RELATION_RECLASSIFY_PROMPT,
3459 RELATION_RECLASSIFY_SCHEMA,
3460 &input_text,
3461 model,
3462 timeout,
3463 )?,
3464 EnrichMode::OpenRouter => call_openrouter(
3465 RELATION_RECLASSIFY_PROMPT,
3466 RELATION_RECLASSIFY_SCHEMA,
3467 &input_text,
3468 model,
3469 timeout,
3470 )?,
3471 };
3472
3473 let new_relation = value
3474 .get("relation")
3475 .and_then(|v| v.as_str())
3476 .ok_or_else(|| AppError::Validation("LLM result missing 'relation'".into()))?;
3477 let new_strength = value
3478 .get("strength")
3479 .and_then(|v| v.as_f64())
3480 .unwrap_or(0.5);
3481
3482 conn.execute(
3483 "UPDATE relationships SET relation = ?1, weight = ?2 WHERE id = ?3",
3484 rusqlite::params![new_relation, new_strength, rel_id],
3485 )?;
3486
3487 Ok(EnrichItemResult::Done {
3488 memory_id: None,
3489 entity_id: None,
3490 entities: 0,
3491 rels: 1,
3492 chars_before: None,
3493 chars_after: None,
3494 cost,
3495 is_oauth,
3496 })
3497}
3498
3499fn call_entity_connect(
3501 conn: &Connection,
3502 namespace: &str,
3503 item_key: &str,
3504 binary: &Path,
3505 model: Option<&str>,
3506 timeout: u64,
3507 mode: &EnrichMode,
3508) -> Result<EnrichItemResult, AppError> {
3509 let pairs = scan_isolated_entity_pairs(conn, namespace, Some(1))?;
3510 let (e1_id, e1_name, e2_id, e2_name) =
3511 match pairs.into_iter().find(|(_, n, _, _)| n == item_key) {
3512 Some(p) => p,
3513 None => {
3514 return Ok(EnrichItemResult::Skipped {
3515 reason: "pair no longer isolated".into(),
3516 })
3517 }
3518 };
3519 let input_text = format!("Entity A: {e1_name}\nEntity B: {e2_name}");
3520 let (value, cost, is_oauth) = match mode {
3521 EnrichMode::ClaudeCode => call_claude(
3522 binary,
3523 ENTITY_CONNECT_PROMPT,
3524 ENTITY_CONNECT_SCHEMA,
3525 &input_text,
3526 model,
3527 timeout,
3528 )?,
3529 EnrichMode::Codex => call_codex(
3530 binary,
3531 ENTITY_CONNECT_PROMPT,
3532 ENTITY_CONNECT_SCHEMA,
3533 &input_text,
3534 model,
3535 timeout,
3536 )?,
3537 EnrichMode::Opencode => call_opencode(
3538 binary,
3539 ENTITY_CONNECT_PROMPT,
3540 ENTITY_CONNECT_SCHEMA,
3541 &input_text,
3542 model,
3543 timeout,
3544 )?,
3545 EnrichMode::OpenRouter => call_openrouter(
3546 ENTITY_CONNECT_PROMPT,
3547 ENTITY_CONNECT_SCHEMA,
3548 &input_text,
3549 model,
3550 timeout,
3551 )?,
3552 };
3553 let relation = value
3554 .get("relation")
3555 .and_then(|v| v.as_str())
3556 .unwrap_or("none");
3557 if relation == "none" {
3558 return Ok(EnrichItemResult::Skipped {
3559 reason: "LLM determined no relationship".into(),
3560 });
3561 }
3562 let strength = value
3563 .get("strength")
3564 .and_then(|v| v.as_f64())
3565 .unwrap_or(0.5);
3566 conn.execute(
3567 "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
3568 rusqlite::params![namespace, e1_id, e2_id, relation, strength],
3569 )?;
3570 Ok(EnrichItemResult::Done {
3571 memory_id: None,
3572 entity_id: None,
3573 entities: 0,
3574 rels: 1,
3575 chars_before: None,
3576 chars_after: None,
3577 cost,
3578 is_oauth,
3579 })
3580}
3581
3582fn call_entity_type_validate(
3584 conn: &Connection,
3585 _namespace: &str,
3586 item_key: &str,
3587 binary: &Path,
3588 model: Option<&str>,
3589 timeout: u64,
3590 mode: &EnrichMode,
3591) -> Result<EnrichItemResult, AppError> {
3592 let (ent_id, ent_name, ent_type): (i64, String, String) = conn
3593 .query_row(
3594 "SELECT id, name, type FROM entities WHERE name = ?1",
3595 rusqlite::params![item_key],
3596 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3597 )
3598 .map_err(|_| AppError::NotFound(format!("entity '{item_key}' not found")))?;
3599 let input_text = format!("Entity: {ent_name}\nCurrent type: {ent_type}");
3600 let (value, cost, is_oauth) = match mode {
3601 EnrichMode::ClaudeCode => call_claude(
3602 binary,
3603 ENTITY_TYPE_VALIDATE_PROMPT,
3604 ENTITY_TYPE_VALIDATE_SCHEMA,
3605 &input_text,
3606 model,
3607 timeout,
3608 )?,
3609 EnrichMode::Codex => call_codex(
3610 binary,
3611 ENTITY_TYPE_VALIDATE_PROMPT,
3612 ENTITY_TYPE_VALIDATE_SCHEMA,
3613 &input_text,
3614 model,
3615 timeout,
3616 )?,
3617 EnrichMode::Opencode => call_opencode(
3618 binary,
3619 ENTITY_TYPE_VALIDATE_PROMPT,
3620 ENTITY_TYPE_VALIDATE_SCHEMA,
3621 &input_text,
3622 model,
3623 timeout,
3624 )?,
3625 EnrichMode::OpenRouter => call_openrouter(
3626 ENTITY_TYPE_VALIDATE_PROMPT,
3627 ENTITY_TYPE_VALIDATE_SCHEMA,
3628 &input_text,
3629 model,
3630 timeout,
3631 )?,
3632 };
3633 let validated_type = value
3634 .get("validated_type")
3635 .and_then(|v| v.as_str())
3636 .unwrap_or(&ent_type);
3637 let was_correct = value
3638 .get("was_correct")
3639 .and_then(|v| v.as_bool())
3640 .unwrap_or(true);
3641 if !was_correct {
3642 conn.execute(
3643 "UPDATE entities SET type = ?1 WHERE id = ?2",
3644 rusqlite::params![validated_type, ent_id],
3645 )?;
3646 }
3647 Ok(EnrichItemResult::Done {
3648 memory_id: None,
3649 entity_id: Some(ent_id),
3650 entities: 1,
3651 rels: 0,
3652 chars_before: None,
3653 chars_after: None,
3654 cost,
3655 is_oauth,
3656 })
3657}
3658
3659fn call_description_enrich(
3661 conn: &Connection,
3662 _namespace: &str,
3663 item_key: &str,
3664 binary: &Path,
3665 model: Option<&str>,
3666 timeout: u64,
3667 mode: &EnrichMode,
3668) -> Result<EnrichItemResult, AppError> {
3669 let (mem_id, body, old_desc): (i64, String, String) = conn
3670 .query_row(
3671 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3672 rusqlite::params![item_key],
3673 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3674 )
3675 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3676 let snippet: String = body.chars().take(500).collect();
3677 let input_text = format!(
3678 "Memory name: {item_key}\nCurrent description: {old_desc}\nBody preview: {snippet}"
3679 );
3680 let (value, cost, is_oauth) = match mode {
3681 EnrichMode::ClaudeCode => call_claude(
3682 binary,
3683 DESCRIPTION_ENRICH_PROMPT,
3684 DESCRIPTION_ENRICH_SCHEMA,
3685 &input_text,
3686 model,
3687 timeout,
3688 )?,
3689 EnrichMode::Codex => call_codex(
3690 binary,
3691 DESCRIPTION_ENRICH_PROMPT,
3692 DESCRIPTION_ENRICH_SCHEMA,
3693 &input_text,
3694 model,
3695 timeout,
3696 )?,
3697 EnrichMode::Opencode => call_opencode(
3698 binary,
3699 DESCRIPTION_ENRICH_PROMPT,
3700 DESCRIPTION_ENRICH_SCHEMA,
3701 &input_text,
3702 model,
3703 timeout,
3704 )?,
3705 EnrichMode::OpenRouter => call_openrouter(
3706 DESCRIPTION_ENRICH_PROMPT,
3707 DESCRIPTION_ENRICH_SCHEMA,
3708 &input_text,
3709 model,
3710 timeout,
3711 )?,
3712 };
3713 let new_desc = value
3714 .get("description")
3715 .and_then(|v| v.as_str())
3716 .unwrap_or(&old_desc);
3717 let old_name: String = conn.query_row(
3718 "SELECT name FROM memories WHERE id = ?1",
3719 rusqlite::params![mem_id],
3720 |r| r.get(0),
3721 )?;
3722 conn.execute(
3723 "UPDATE memories SET description = ?1 WHERE id = ?2",
3724 rusqlite::params![new_desc, mem_id],
3725 )?;
3726 memories::sync_fts_after_update(
3727 conn, mem_id, &old_name, &old_desc, &body, &old_name, new_desc, &body,
3728 )?;
3729 Ok(EnrichItemResult::Done {
3730 memory_id: Some(mem_id),
3731 entity_id: None,
3732 entities: 0,
3733 rels: 0,
3734 chars_before: Some(old_desc.len()),
3735 chars_after: Some(new_desc.len()),
3736 cost,
3737 is_oauth,
3738 })
3739}
3740
3741fn call_domain_classify(
3743 conn: &Connection,
3744 _namespace: &str,
3745 item_key: &str,
3746 binary: &Path,
3747 model: Option<&str>,
3748 timeout: u64,
3749 mode: &EnrichMode,
3750) -> Result<EnrichItemResult, AppError> {
3751 let (mem_id, body, desc): (i64, String, String) = conn
3752 .query_row(
3753 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3754 rusqlite::params![item_key],
3755 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3756 )
3757 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3758 let snippet: String = body.chars().take(500).collect();
3759 let input_text = format!("Memory: {item_key}\nDescription: {desc}\nBody preview: {snippet}");
3760 let (value, cost, is_oauth) = match mode {
3761 EnrichMode::ClaudeCode => call_claude(
3762 binary,
3763 DOMAIN_CLASSIFY_PROMPT,
3764 DOMAIN_CLASSIFY_SCHEMA,
3765 &input_text,
3766 model,
3767 timeout,
3768 )?,
3769 EnrichMode::Codex => call_codex(
3770 binary,
3771 DOMAIN_CLASSIFY_PROMPT,
3772 DOMAIN_CLASSIFY_SCHEMA,
3773 &input_text,
3774 model,
3775 timeout,
3776 )?,
3777 EnrichMode::Opencode => call_opencode(
3778 binary,
3779 DOMAIN_CLASSIFY_PROMPT,
3780 DOMAIN_CLASSIFY_SCHEMA,
3781 &input_text,
3782 model,
3783 timeout,
3784 )?,
3785 EnrichMode::OpenRouter => call_openrouter(
3786 DOMAIN_CLASSIFY_PROMPT,
3787 DOMAIN_CLASSIFY_SCHEMA,
3788 &input_text,
3789 model,
3790 timeout,
3791 )?,
3792 };
3793 let domain = value
3794 .get("domain")
3795 .and_then(|v| v.as_str())
3796 .unwrap_or("uncategorized");
3797 let metadata = format!(r#"{{"domain":"{}"}}"#, domain.replace('"', "\\\""));
3798 conn.execute(
3799 "UPDATE memories SET metadata = ?1 WHERE id = ?2",
3800 rusqlite::params![metadata, mem_id],
3801 )?;
3802 Ok(EnrichItemResult::Done {
3803 memory_id: Some(mem_id),
3804 entity_id: None,
3805 entities: 0,
3806 rels: 0,
3807 chars_before: None,
3808 chars_after: None,
3809 cost,
3810 is_oauth,
3811 })
3812}
3813
3814fn call_graph_audit(
3816 conn: &Connection,
3817 _namespace: &str,
3818 item_key: &str,
3819 binary: &Path,
3820 model: Option<&str>,
3821 timeout: u64,
3822 mode: &EnrichMode,
3823) -> Result<EnrichItemResult, AppError> {
3824 let (mem_id, body, desc): (i64, String, String) = conn
3825 .query_row(
3826 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3827 rusqlite::params![item_key],
3828 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3829 )
3830 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3831 let snippet: String = body.chars().take(500).collect();
3832 let ent_count: i64 = conn
3833 .query_row(
3834 "SELECT COUNT(*) FROM memory_entities WHERE memory_id = ?1",
3835 rusqlite::params![mem_id],
3836 |r| r.get(0),
3837 )
3838 .unwrap_or(0);
3839 let input_text = format!("Memory: {item_key}\nDescription: {desc}\nEntity bindings: {ent_count}\nBody preview: {snippet}");
3840 let (value, cost, is_oauth) = match mode {
3841 EnrichMode::ClaudeCode => call_claude(
3842 binary,
3843 GRAPH_AUDIT_PROMPT,
3844 GRAPH_AUDIT_SCHEMA,
3845 &input_text,
3846 model,
3847 timeout,
3848 )?,
3849 EnrichMode::Codex => call_codex(
3850 binary,
3851 GRAPH_AUDIT_PROMPT,
3852 GRAPH_AUDIT_SCHEMA,
3853 &input_text,
3854 model,
3855 timeout,
3856 )?,
3857 EnrichMode::Opencode => call_opencode(
3858 binary,
3859 GRAPH_AUDIT_PROMPT,
3860 GRAPH_AUDIT_SCHEMA,
3861 &input_text,
3862 model,
3863 timeout,
3864 )?,
3865 EnrichMode::OpenRouter => call_openrouter(
3866 GRAPH_AUDIT_PROMPT,
3867 GRAPH_AUDIT_SCHEMA,
3868 &input_text,
3869 model,
3870 timeout,
3871 )?,
3872 };
3873 let issues = value
3874 .get("issues")
3875 .and_then(|v| v.as_array())
3876 .map(|a| a.len())
3877 .unwrap_or(0);
3878 Ok(EnrichItemResult::Done {
3879 memory_id: Some(mem_id),
3880 entity_id: None,
3881 entities: 0,
3882 rels: issues,
3883 chars_before: None,
3884 chars_after: None,
3885 cost,
3886 is_oauth,
3887 })
3888}
3889
3890fn call_deep_research_synth(
3892 conn: &Connection,
3893 namespace: &str,
3894 item_key: &str,
3895 binary: &Path,
3896 model: Option<&str>,
3897 timeout: u64,
3898 mode: &EnrichMode,
3899) -> Result<EnrichItemResult, AppError> {
3900 let (mem_id, body): (i64, String) = conn
3901 .query_row(
3902 "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3903 rusqlite::params![item_key],
3904 |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
3905 )
3906 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3907 let snippet: String = body.chars().take(2000).collect();
3908 let input_text = format!("Memory: {item_key}\nBody:\n{snippet}");
3909 let (value, cost, is_oauth) = match mode {
3910 EnrichMode::ClaudeCode => call_claude(
3911 binary,
3912 DEEP_RESEARCH_SYNTH_PROMPT,
3913 DEEP_RESEARCH_SYNTH_SCHEMA,
3914 &input_text,
3915 model,
3916 timeout,
3917 )?,
3918 EnrichMode::Codex => call_codex(
3919 binary,
3920 DEEP_RESEARCH_SYNTH_PROMPT,
3921 DEEP_RESEARCH_SYNTH_SCHEMA,
3922 &input_text,
3923 model,
3924 timeout,
3925 )?,
3926 EnrichMode::Opencode => call_opencode(
3927 binary,
3928 DEEP_RESEARCH_SYNTH_PROMPT,
3929 DEEP_RESEARCH_SYNTH_SCHEMA,
3930 &input_text,
3931 model,
3932 timeout,
3933 )?,
3934 EnrichMode::OpenRouter => call_openrouter(
3935 DEEP_RESEARCH_SYNTH_PROMPT,
3936 DEEP_RESEARCH_SYNTH_SCHEMA,
3937 &input_text,
3938 model,
3939 timeout,
3940 )?,
3941 };
3942 let mut ent_count = 0usize;
3943 let mut rel_count = 0usize;
3944 if let Some(ents) = value.get("entities").and_then(|v| v.as_array()) {
3945 for e in ents {
3946 let name = e.get("name").and_then(|v| v.as_str()).unwrap_or_default();
3947 let etype_str = e
3948 .get("entity_type")
3949 .and_then(|v| v.as_str())
3950 .unwrap_or("concept");
3951 let etype: EntityType = etype_str.parse().unwrap_or(EntityType::Concept);
3952 if name.len() >= 2 {
3953 let ne = NewEntity {
3954 name: name.to_string(),
3955 entity_type: etype,
3956 description: None,
3957 };
3958 let _ = entities::upsert_entity(conn, namespace, &ne);
3959 ent_count += 1;
3960 }
3961 }
3962 }
3963 if let Some(rels) = value.get("relationships").and_then(|v| v.as_array()) {
3964 for r in rels {
3965 let src = r.get("source").and_then(|v| v.as_str()).unwrap_or_default();
3966 let tgt = r.get("target").and_then(|v| v.as_str()).unwrap_or_default();
3967 if src.is_empty() || tgt.is_empty() {
3968 continue;
3969 }
3970 let rel = r
3971 .get("relation")
3972 .and_then(|v| v.as_str())
3973 .unwrap_or("related");
3974 let str_ = r.get("strength").and_then(|v| v.as_f64()).unwrap_or(0.5);
3975 if let (Some(sid), Some(tid)) = (
3976 entities::find_entity_id(conn, namespace, src)?,
3977 entities::find_entity_id(conn, namespace, tgt)?,
3978 ) {
3979 let _ = entities::create_or_fetch_relationship(
3980 conn, namespace, sid, tid, rel, str_, None,
3981 );
3982 rel_count += 1;
3983 }
3984 }
3985 }
3986 Ok(EnrichItemResult::Done {
3987 memory_id: Some(mem_id),
3988 entity_id: None,
3989 entities: ent_count,
3990 rels: rel_count,
3991 chars_before: None,
3992 chars_after: None,
3993 cost,
3994 is_oauth,
3995 })
3996}
3997
3998fn call_body_extract(
4000 conn: &Connection,
4001 _namespace: &str,
4002 item_key: &str,
4003 binary: &Path,
4004 model: Option<&str>,
4005 timeout: u64,
4006 mode: &EnrichMode,
4007) -> Result<EnrichItemResult, AppError> {
4008 let (mem_id, body, old_desc): (i64, String, String) = conn
4009 .query_row(
4010 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
4011 rusqlite::params![item_key],
4012 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
4013 )
4014 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
4015 let old_name: String = conn.query_row(
4016 "SELECT name FROM memories WHERE id = ?1",
4017 rusqlite::params![mem_id],
4018 |r| r.get(0),
4019 )?;
4020 let input_text = format!("Memory: {item_key}\nBody:\n{body}");
4021 let (value, cost, is_oauth) = match mode {
4022 EnrichMode::ClaudeCode => call_claude(
4023 binary,
4024 BODY_EXTRACT_PROMPT,
4025 BODY_EXTRACT_SCHEMA,
4026 &input_text,
4027 model,
4028 timeout,
4029 )?,
4030 EnrichMode::Codex => call_codex(
4031 binary,
4032 BODY_EXTRACT_PROMPT,
4033 BODY_EXTRACT_SCHEMA,
4034 &input_text,
4035 model,
4036 timeout,
4037 )?,
4038 EnrichMode::Opencode => call_opencode(
4039 binary,
4040 BODY_EXTRACT_PROMPT,
4041 BODY_EXTRACT_SCHEMA,
4042 &input_text,
4043 model,
4044 timeout,
4045 )?,
4046 EnrichMode::OpenRouter => call_openrouter(
4047 BODY_EXTRACT_PROMPT,
4048 BODY_EXTRACT_SCHEMA,
4049 &input_text,
4050 model,
4051 timeout,
4052 )?,
4053 };
4054 let restructured = value
4055 .get("restructured_body")
4056 .and_then(|v| v.as_str())
4057 .unwrap_or(&body);
4058 let chars_before = body.len();
4059 let chars_after = restructured.len();
4060 let new_hash = blake3::hash(restructured.as_bytes()).to_hex().to_string();
4061 conn.execute(
4062 "UPDATE memories SET body = ?1, body_hash = ?2, updated_at = unixepoch() WHERE id = ?3",
4063 rusqlite::params![restructured, new_hash, mem_id],
4064 )?;
4065 memories::sync_fts_after_update(
4066 conn,
4067 mem_id,
4068 &old_name,
4069 &old_desc,
4070 &body,
4071 &old_name,
4072 &old_desc,
4073 restructured,
4074 )?;
4075 Ok(EnrichItemResult::Done {
4076 memory_id: Some(mem_id),
4077 entity_id: None,
4078 entities: 0,
4079 rels: 0,
4080 chars_before: Some(chars_before),
4081 chars_after: Some(chars_after),
4082 cost,
4083 is_oauth,
4084 })
4085}
4086
4087#[allow(clippy::type_complexity)]
4089fn scan_isolated_entity_pairs(
4090 conn: &Connection,
4091 namespace: &str,
4092 limit: Option<usize>,
4093) -> Result<Vec<(i64, String, i64, String)>, AppError> {
4094 let limit_val = limit.unwrap_or(50) as i64;
4095 let mut stmt = conn.prepare_cached(
4096 "SELECT e1.id, e1.name, e2.id, e2.name FROM entities e1, entities e2 \
4097 WHERE e1.namespace = ?1 AND e2.namespace = ?1 AND e1.id < e2.id \
4098 AND NOT EXISTS (SELECT 1 FROM relationships r WHERE \
4099 (r.source_id = e1.id AND r.target_id = e2.id) OR \
4100 (r.source_id = e2.id AND r.target_id = e1.id)) \
4101 LIMIT ?2",
4102 )?;
4103 let rows = stmt
4104 .query_map(rusqlite::params![namespace, limit_val], |r| {
4105 Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?))
4106 })?
4107 .collect::<Result<Vec<_>, _>>()?;
4108 Ok(rows)
4109}
4110
4111fn scan_entities_for_type_validation(
4113 conn: &Connection,
4114 namespace: &str,
4115 limit: Option<usize>,
4116) -> Result<Vec<(i64, String, String)>, AppError> {
4117 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
4118 let sql = format!(
4119 "SELECT id, name, type FROM entities WHERE namespace = ?1 ORDER BY id {limit_clause}"
4120 );
4121 let mut stmt = conn.prepare(&sql)?;
4122 let rows = stmt
4123 .query_map(rusqlite::params![namespace], |r| {
4124 Ok((r.get(0)?, r.get(1)?, r.get(2)?))
4125 })?
4126 .collect::<Result<Vec<_>, _>>()?;
4127 Ok(rows)
4128}
4129
4130fn scan_generic_descriptions(
4132 conn: &Connection,
4133 namespace: &str,
4134 limit: Option<usize>,
4135) -> Result<Vec<(i64, String, String)>, AppError> {
4136 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
4137 let sql = format!(
4138 "SELECT id, name, description FROM memories WHERE namespace = ?1 AND deleted_at IS NULL \
4139 AND (description LIKE '%ingested%' OR description LIKE '%imported%' OR description LIKE '%added%' OR length(description) < 30) \
4140 ORDER BY id {limit_clause}"
4141 );
4142 let mut stmt = conn.prepare(&sql)?;
4143 let rows = stmt
4144 .query_map(rusqlite::params![namespace], |r| {
4145 Ok((r.get(0)?, r.get(1)?, r.get(2)?))
4146 })?
4147 .collect::<Result<Vec<_>, _>>()?;
4148 Ok(rows)
4149}
4150
4151fn call_codex(
4155 binary: &Path,
4156 prompt: &str,
4157 json_schema: &str,
4158 input_text: &str,
4159 model: Option<&str>,
4160 timeout_secs: u64,
4161) -> Result<(serde_json::Value, f64, bool), AppError> {
4162 use wait_timeout::ChildExt;
4163
4164 super::codex_spawn::validate_codex_model(model)?;
4169 let schema_file = super::codex_spawn::trusted_schema_path()?;
4170
4171 let args = super::codex_spawn::CodexSpawnArgs {
4172 binary,
4173 prompt,
4174 json_schema,
4175 input_text,
4176 model,
4177 timeout_secs,
4178 schema_path: schema_file.clone(),
4179 };
4180 let mut cmd = super::codex_spawn::build_codex_command(&args)?;
4181
4182 let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
4183 AppError::Io(std::io::Error::new(
4184 e.kind(),
4185 format!("failed to spawn codex: {e}"),
4186 ))
4187 })?;
4188
4189 let full_prompt = format!("{prompt}\n\n{input_text}");
4190 let stdin_bytes = full_prompt.into_bytes();
4191 let mut child_stdin = child
4192 .stdin
4193 .take()
4194 .ok_or_else(|| AppError::Validation("failed to open codex stdin".into()))?;
4195 let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
4196 child_stdin.write_all(&stdin_bytes)?;
4197 drop(child_stdin);
4198 Ok(())
4199 });
4200
4201 let start = std::time::Instant::now();
4202 let timeout = std::time::Duration::from_secs(timeout_secs);
4203 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
4204 let _ = std::fs::remove_file(&schema_file);
4205
4206 match status {
4207 Some(exit_status) => {
4208 stdin_thread
4209 .join()
4210 .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
4211 .map_err(AppError::Io)?;
4212
4213 tracing::debug!(
4214 target: "process",
4215 exit_code = ?exit_status.code(),
4216 elapsed_ms = start.elapsed().as_millis() as u64,
4217 "external process completed"
4218 );
4219
4220 let mut stdout_buf = Vec::new();
4221 if let Some(mut out) = child.stdout.take() {
4222 std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
4223 }
4224 if !exit_status.success() {
4225 let mut stderr_buf = Vec::new();
4226 if let Some(mut err) = child.stderr.take() {
4227 std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
4228 }
4229 let stderr_str = String::from_utf8_lossy(&stderr_buf);
4230 tracing::warn!(
4231 target: "enrich",
4232 exit_code = ?exit_status.code(),
4233 stderr = %stderr_str.trim(),
4234 "codex process failed"
4235 );
4236 return Err(AppError::Validation(format!(
4237 "codex exited with code {:?}: {}",
4238 exit_status.code(),
4239 stderr_str.trim()
4240 )));
4241 }
4242 let stdout_str = String::from_utf8(stdout_buf)
4243 .map_err(|_| AppError::Validation("codex stdout is not valid UTF-8".into()))?;
4244 let result = super::codex_spawn::parse_codex_jsonl(&stdout_str)?;
4247 let value: serde_json::Value =
4253 serde_json::from_str(&result.last_agent_text).map_err(|e| {
4254 AppError::Validation(format!(
4255 "codex agent_message is not valid JSON: {e}; raw={}",
4256 result.last_agent_text
4257 ))
4258 })?;
4259 Ok((value, 0.0, false))
4260 }
4261 None => {
4262 let _ = child.kill();
4263 let _ = child.wait();
4264 let _ = stdin_thread.join();
4265 Err(AppError::Validation(format!(
4266 "codex timed out after {timeout_secs} seconds"
4267 )))
4268 }
4269 }
4270}
4271
4272fn call_opencode(
4273 binary: &Path,
4274 prompt: &str,
4275 json_schema: &str,
4276 input_text: &str,
4277 model: Option<&str>,
4278 timeout_secs: u64,
4279) -> Result<(serde_json::Value, f64, bool), AppError> {
4280 use wait_timeout::ChildExt;
4281
4282 let resolved_model = super::opencode_runner::resolve_opencode_model(model);
4283
4284 let augmented_prompt = if json_schema.is_empty() {
4285 prompt.to_string()
4286 } else {
4287 format!(
4288 "{prompt}\n\nIMPORTANT: You MUST respond with ONLY valid JSON (no markdown, no explanation, no code fences). \
4289 The JSON MUST match this schema:\n{json_schema}"
4290 )
4291 };
4292
4293 let mut cmd = super::opencode_runner::build_opencode_command_sync(
4294 binary,
4295 &resolved_model,
4296 &augmented_prompt,
4297 input_text,
4298 )?;
4299
4300 let mut child = super::opencode_runner::spawn_opencode(&mut cmd).map_err(|e| {
4301 AppError::Io(std::io::Error::new(
4302 e.kind(),
4303 format!("failed to spawn opencode: {e}"),
4304 ))
4305 })?;
4306
4307 let start = std::time::Instant::now();
4308 let timeout = std::time::Duration::from_secs(timeout_secs);
4309 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
4310
4311 match status {
4312 Some(exit_status) => {
4313 tracing::debug!(
4314 target: "process",
4315 exit_code = ?exit_status.code(),
4316 elapsed_ms = start.elapsed().as_millis() as u64,
4317 "opencode process completed"
4318 );
4319
4320 let mut stdout_buf = Vec::new();
4321 if let Some(mut out) = child.stdout.take() {
4322 std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
4323 }
4324 if !exit_status.success() {
4325 let mut stderr_buf = Vec::new();
4326 if let Some(mut err) = child.stderr.take() {
4327 std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
4328 }
4329 let stderr_str = String::from_utf8_lossy(&stderr_buf);
4330 tracing::warn!(
4331 target: "enrich",
4332 exit_code = ?exit_status.code(),
4333 stderr = %stderr_str.trim(),
4334 "opencode process failed"
4335 );
4336 return Err(AppError::Validation(format!(
4337 "opencode exited with code {:?}: {}",
4338 exit_status.code(),
4339 stderr_str.trim()
4340 )));
4341 }
4342 let stdout_str = String::from_utf8(stdout_buf)
4343 .map_err(|_| AppError::Validation("opencode stdout is not valid UTF-8".into()))?;
4344 let (text, cost, _tokens) = super::opencode_runner::parse_opencode_output(&stdout_str)?;
4345 let value: serde_json::Value =
4346 super::opencode_runner::parse_json_from_opencode_text(&text).map_err(|e| {
4347 AppError::Validation(format!("opencode response is not valid JSON: {e}"))
4348 })?;
4349 Ok((value, cost, false))
4350 }
4351 None => {
4352 let _ = child.kill();
4353 let _ = child.wait();
4354 Err(AppError::Validation(format!(
4355 "opencode timed out after {timeout_secs} seconds"
4356 )))
4357 }
4358 }
4359}
4360
4361#[cfg(test)]
4366mod tests {
4367 use super::*;
4368 use rusqlite::Connection;
4369 #[cfg(unix)]
4370 use std::os::unix::fs::PermissionsExt;
4371
4372 fn open_test_db() -> Connection {
4374 let conn = Connection::open_in_memory().expect("in-memory db");
4375 conn.execute_batch(
4376 "CREATE TABLE memories (
4377 id INTEGER PRIMARY KEY AUTOINCREMENT,
4378 namespace TEXT NOT NULL DEFAULT 'global',
4379 name TEXT NOT NULL,
4380 type TEXT NOT NULL DEFAULT 'note',
4381 description TEXT NOT NULL DEFAULT '',
4382 body TEXT NOT NULL DEFAULT '',
4383 body_hash TEXT NOT NULL DEFAULT '',
4384 session_id TEXT,
4385 source TEXT NOT NULL DEFAULT 'agent',
4386 metadata TEXT NOT NULL DEFAULT '{}',
4387 created_at INTEGER NOT NULL DEFAULT (unixepoch()),
4388 updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
4389 deleted_at INTEGER,
4390 UNIQUE(namespace, name)
4391 );
4392 CREATE TABLE entities (
4393 id INTEGER PRIMARY KEY AUTOINCREMENT,
4394 namespace TEXT NOT NULL DEFAULT 'global',
4395 name TEXT NOT NULL,
4396 type TEXT NOT NULL DEFAULT 'concept',
4397 description TEXT,
4398 degree INTEGER NOT NULL DEFAULT 0,
4399 created_at INTEGER NOT NULL DEFAULT (unixepoch()),
4400 updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
4401 UNIQUE(namespace, name)
4402 );
4403 CREATE TABLE memory_entities (
4404 memory_id INTEGER NOT NULL,
4405 entity_id INTEGER NOT NULL,
4406 PRIMARY KEY (memory_id, entity_id)
4407 );
4408 CREATE TABLE relationships (
4409 id INTEGER PRIMARY KEY AUTOINCREMENT,
4410 namespace TEXT NOT NULL DEFAULT 'global',
4411 source_id INTEGER NOT NULL,
4412 target_id INTEGER NOT NULL,
4413 relation TEXT NOT NULL,
4414 weight REAL NOT NULL DEFAULT 0.5,
4415 description TEXT,
4416 UNIQUE(source_id, target_id, relation)
4417 );
4418 CREATE TABLE memory_embeddings (
4419 memory_id INTEGER PRIMARY KEY,
4420 namespace TEXT NOT NULL,
4421 embedding BLOB NOT NULL,
4422 source TEXT NOT NULL,
4423 model TEXT NOT NULL DEFAULT '',
4424 dim INTEGER NOT NULL DEFAULT 384,
4425 created_at INTEGER NOT NULL DEFAULT (unixepoch())
4426 );",
4427 )
4428 .expect("schema creation must succeed");
4429 conn
4430 }
4431
4432 #[test]
4433 fn scan_unbound_memories_finds_memories_without_bindings() {
4434 let conn = open_test_db();
4435 conn.execute(
4436 "INSERT INTO memories (namespace, name, body) VALUES ('global', 'test-mem', 'some body content')",
4437 [],
4438 )
4439 .unwrap();
4440
4441 let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
4442 assert_eq!(results.len(), 1);
4443 assert_eq!(results[0].1, "test-mem");
4444 }
4445
4446 #[test]
4447 fn scan_unbound_memories_excludes_bound_memories() {
4448 let conn = open_test_db();
4449 conn.execute(
4450 "INSERT INTO memories (namespace, name, body) VALUES ('global', 'bound-mem', 'body')",
4451 [],
4452 )
4453 .unwrap();
4454 let mem_id: i64 = conn
4455 .query_row("SELECT id FROM memories WHERE name='bound-mem'", [], |r| {
4456 r.get(0)
4457 })
4458 .unwrap();
4459 conn.execute(
4460 "INSERT INTO entities (namespace, name) VALUES ('global', 'some-entity')",
4461 [],
4462 )
4463 .unwrap();
4464 let ent_id: i64 = conn
4465 .query_row(
4466 "SELECT id FROM entities WHERE name='some-entity'",
4467 [],
4468 |r| r.get(0),
4469 )
4470 .unwrap();
4471 conn.execute(
4472 "INSERT INTO memory_entities (memory_id, entity_id) VALUES (?1, ?2)",
4473 rusqlite::params![mem_id, ent_id],
4474 )
4475 .unwrap();
4476
4477 let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
4478 assert!(results.is_empty(), "bound memory must not appear in scan");
4479 }
4480
4481 #[test]
4482 fn scan_entities_without_description_finds_null_description() {
4483 let conn = open_test_db();
4484 conn.execute(
4485 "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'my-tool', 'tool', NULL)",
4486 [],
4487 )
4488 .unwrap();
4489
4490 let results = scan_entities_without_description(&conn, "global", None, &[]).unwrap();
4491 assert_eq!(results.len(), 1);
4492 assert_eq!(results[0].1, "my-tool");
4493 }
4494
4495 #[test]
4496 fn scan_entities_without_description_excludes_entities_with_description() {
4497 let conn = open_test_db();
4498 conn.execute(
4499 "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'described-tool', 'tool', 'Has a description already')",
4500 [],
4501 )
4502 .unwrap();
4503
4504 let results = scan_entities_without_description(&conn, "global", None, &[]).unwrap();
4505 assert!(
4506 results.is_empty(),
4507 "entity with description must not appear"
4508 );
4509 }
4510
4511 #[test]
4512 fn scan_short_body_memories_finds_short_bodies() {
4513 let conn = open_test_db();
4514 conn.execute(
4515 "INSERT INTO memories (namespace, name, body) VALUES ('global', 'short-mem', 'hi')",
4516 [],
4517 )
4518 .unwrap();
4519
4520 let results = scan_short_body_memories(&conn, "global", 100, None, &[]).unwrap();
4521 assert_eq!(results.len(), 1);
4522 assert_eq!(results[0].1, "short-mem");
4523 }
4524
4525 #[test]
4526 fn scan_short_body_memories_excludes_long_bodies() {
4527 let conn = open_test_db();
4528 let long_body = "a".repeat(1000);
4529 conn.execute(
4530 "INSERT INTO memories (namespace, name, body) VALUES ('global', 'long-mem', ?1)",
4531 rusqlite::params![long_body],
4532 )
4533 .unwrap();
4534
4535 let results = scan_short_body_memories(&conn, "global", 100, None, &[]).unwrap();
4536 assert!(results.is_empty(), "long memory must not appear in scan");
4537 }
4538
/// With five matching memories and `Some(3)` as the limit, the scan returns
/// exactly three rows.
#[test]
fn scan_respects_limit() {
    let db = open_test_db();
    for idx in 0..5 {
        // Same rows as before (`mem-0` .. `mem-4`), with the name bound as a
        // parameter rather than spliced into the SQL text.
        let name = format!("mem-{idx}");
        db.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', ?1, 'short')",
            rusqlite::params![name],
        )
        .unwrap();
    }

    let limited = scan_short_body_memories(&db, "global", 1000, Some(3), &[]).unwrap();

    assert_eq!(limited.len(), 3, "limit must be respected");
}
4553
/// Of two memories, only the one that never had a vector stored through
/// `memories::upsert_vec` is returned by `scan_memories_without_embeddings`.
#[test]
fn scan_memories_without_embeddings_finds_only_missing_rows() {
    let db = open_test_db();
    for (name, body) in [("missing-vec", "body one"), ("has-vec", "body two")] {
        db.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', ?1, ?2)",
            rusqlite::params![name, body],
        )
        .unwrap();
    }

    // Attach an all-zero embedding to `has-vec` only.
    let with_vec_id: i64 = db
        .query_row(
            "SELECT id FROM memories WHERE namespace='global' AND name='has-vec'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    let zeros = vec![0.0_f32; crate::constants::embedding_dim()];
    memories::upsert_vec(
        &db,
        with_vec_id,
        "global",
        "note",
        &zeros,
        "has-vec",
        "body two",
    )
    .unwrap();

    let missing = scan_memories_without_embeddings(&db, "global", None, &[]).unwrap();

    assert_eq!(missing.len(), 1);
    let only = &missing[0];
    assert_eq!(only.1, "missing-vec");
}
4584
/// When a name list is supplied, only memories whose name appears in it are
/// returned, even if other memories are also missing embeddings.
#[test]
fn scan_memories_without_embeddings_respects_name_filter() {
    let db = open_test_db();
    for (name, body) in [("match-me", "body one"), ("skip-me", "body two")] {
        db.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', ?1, ?2)",
            rusqlite::params![name, body],
        )
        .unwrap();
    }

    let wanted = ["match-me".to_string()];
    let filtered = scan_memories_without_embeddings(&db, "global", None, &wanted).unwrap();

    assert_eq!(filtered.len(), 1);
    let only = &filtered[0];
    assert_eq!(only.1, "match-me");
}
4605
/// Opening a fresh queue database must create an empty `queue` table.
///
/// The database file lives inside a `tempfile::tempdir()` rather than a
/// hard-coded `/tmp/...` path, so the test is portable and the file (plus any
/// sidecar files SQLite may create next to it) is removed even when an
/// assertion panics.
#[test]
fn queue_db_schema_creates_correctly() {
    let scratch = tempfile::tempdir().expect("tempdir");
    // Keep handing `open_queue_db` a `&String`, exactly as before.
    let db_path = scratch
        .path()
        .join("test-enrich-queue.sqlite")
        .to_string_lossy()
        .into_owned();

    let conn = open_queue_db(&db_path).expect("queue db must open");
    let count: i64 = conn
        .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
        .unwrap();
    assert_eq!(count, 0);

    // `conn` was declared after `scratch`, so it is dropped first: the SQLite
    // handle is closed before the temporary directory is deleted.
}
4616
/// A well-formed, non-error `result` event yields its `structured_output`,
/// the reported cost, and `is_oauth == false`.
#[test]
fn parse_claude_output_valid_bindings() {
    use crate::commands::claude_runner::parse_claude_output;

    let raw = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.01,
             "structured_output":{"entities":[{"name":"rust-lang","entity_type":"tool"}],"relationships":[]}}
        ]"#;

    let parsed = parse_claude_output(raw).expect("must parse successfully");

    let entities = parsed.value.get("entities");
    assert!(entities.is_some());
    let cost_delta = (parsed.cost_usd - 0.01).abs();
    assert!(cost_delta < f64::EPSILON);
    assert!(!parsed.is_oauth);
}
4630
/// An init event carrying `"apiKeySource":"none"` makes the parsed result
/// report `is_oauth == true`.
#[test]
fn parse_claude_output_detects_oauth() {
    use crate::commands::claude_runner::parse_claude_output;

    let raw = r#"[
            {"type":"system","subtype":"init","apiKeySource":"none"},
            {"type":"result","is_error":false,"total_cost_usd":0.0,
             "structured_output":{"entities":[],"relationships":[]}}
        ]"#;

    let parsed = parse_claude_output(raw).unwrap();

    assert!(parsed.is_oauth);
}
4641
/// An error `result` event whose message is `rate_limit exceeded` is
/// surfaced as `AppError::RateLimited`.
#[test]
fn parse_claude_output_rate_limit_returns_error() {
    use crate::commands::claude_runner::parse_claude_output;

    let raw = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;

    let failure = parse_claude_output(raw).unwrap_err();

    assert!(matches!(failure, AppError::RateLimited { .. }));
}
4651
/// An error `result` event produces an error whose `Display` text contains
/// the original `authentication failed` message.
#[test]
fn parse_claude_output_auth_error() {
    use crate::commands::claude_runner::parse_claude_output;

    let raw = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;

    let failure = parse_claude_output(raw).unwrap_err();

    let rendered = format!("{failure}");
    assert!(rendered.contains("authentication failed"));
}
4661
/// Runs `call_codex` against a mock executable that prints a fixed JSONL
/// transcript, and checks that the JSON carried in the `agent_message` text
/// is returned as the value, with a cost of `0.0` and `is_oauth == false`.
#[cfg(unix)]
#[test]
fn call_codex_returns_raw_json_for_body_enrich_schema() {
    // Bash stand-in for the codex binary; its stdout is the canned transcript.
    let mock_script = r#"#!/usr/bin/env bash
set -euo pipefail
cat <<'JSONL'
{"type":"thread.started","thread_id":"mock-thread-0"}
{"type":"item.completed","item":{"type":"agent_message","text":"{\"enriched_body\":\"expanded body\"}"}}
{"type":"turn.completed","usage":{"input_tokens":1,"output_tokens":1}}
JSONL
"#;

    let scratch = tempfile::tempdir().expect("tempdir");
    let mock_bin = scratch.path().join("codex-mock");
    std::fs::write(&mock_bin, mock_script).expect("mock codex write");

    // Mark the script executable (rwxr-xr-x) so it can be spawned.
    let mut mode = std::fs::metadata(&mock_bin)
        .expect("metadata")
        .permissions();
    mode.set_mode(0o755);
    std::fs::set_permissions(&mock_bin, mode).expect("chmod");

    let outcome = call_codex(&mock_bin, "prompt", BODY_ENRICH_SCHEMA, "body", None, 5);
    let (value, cost, is_oauth) = outcome.expect("call_codex must accept body-enrich payload");

    assert_eq!(value["enriched_body"], "expanded body");
    assert_eq!(cost, 0.0);
    assert!(!is_oauth);
}
4691
/// Checks that a short memory is picked up by `scan_short_body_memories`.
///
/// NOTE(review): despite the name, this test only exercises the scan step; it
/// does not invoke a dry-run code path or assert that no LLM call happens.
#[test]
fn dry_run_emits_preview_without_calling_llm() {
    let db = open_test_db();
    let insert_tiny =
        "INSERT INTO memories (namespace, name, body) VALUES ('global', 'dry-mem', 'tiny')";
    db.execute(insert_tiny, []).unwrap();

    let candidates = scan_short_body_memories(&db, "global", 1000, None, &[]).unwrap();

    assert_eq!(candidates.len(), 1);
    let only = &candidates[0];
    assert_eq!(only.1, "dry-mem");
}
4711
/// `persist_entity_description` must write the given text into the
/// `description` column of the entity with the given id.
#[test]
fn persist_entity_description_updates_db() {
    let db = open_test_db();
    db.execute(
        "INSERT INTO entities (namespace, name, type) VALUES ('global', 'tokio-runtime', 'tool')",
        [],
    )
    .unwrap();
    let entity_id: i64 = db
        .query_row(
            "SELECT id FROM entities WHERE name='tokio-runtime'",
            [],
            |row| row.get(0),
        )
        .unwrap();

    persist_entity_description(&db, entity_id, "Async runtime for Rust applications").unwrap();

    // Read the column back and compare it with what was persisted.
    let stored: String = db
        .query_row(
            "SELECT description FROM entities WHERE id=?1",
            rusqlite::params![entity_id],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(stored, "Async runtime for Rust applications");
}
4739
/// `BINDINGS_SCHEMA` must deserialize as a JSON document.
#[test]
fn bindings_schema_is_valid_json() {
    serde_json::from_str::<serde_json::Value>(BINDINGS_SCHEMA)
        .expect("BINDINGS_SCHEMA must be valid JSON");
}
4745
/// `ENTITY_DESCRIPTION_SCHEMA` must deserialize as a JSON document.
#[test]
fn entity_description_schema_is_valid_json() {
    serde_json::from_str::<serde_json::Value>(ENTITY_DESCRIPTION_SCHEMA)
        .expect("ENTITY_DESCRIPTION_SCHEMA must be valid JSON");
}
4751
/// `BODY_ENRICH_SCHEMA` must deserialize as a JSON document.
#[test]
fn body_enrich_schema_is_valid_json() {
    serde_json::from_str::<serde_json::Value>(BODY_ENRICH_SCHEMA)
        .expect("BODY_ENRICH_SCHEMA must be valid JSON");
}
4757}