1use crate::commands::ingest_claude::find_claude_binary;
28use crate::constants::MAX_MEMORY_BODY_LEN;
29use crate::entity_type::EntityType;
30use crate::errors::AppError;
31use crate::paths::AppPaths;
32use crate::storage::connection::{ensure_db_ready, open_rw};
33use crate::storage::entities::{self, NewEntity, NewRelationship};
34use crate::storage::memories;
35
36use rusqlite::Connection;
37use serde::{Deserialize, Serialize};
38use std::io::Write;
39use std::path::{Path, PathBuf};
40use std::time::Instant;
41
/// Default file name for the enrichment work-queue SQLite database.
/// NOTE(review): presumably the path handed to `open_queue_db` when none is given — confirm at the call site.
const DEFAULT_QUEUE_DB: &str = ".enrich-queue.sqlite";
/// Default wait after a rate-limit hit (units assumed to be seconds — TODO confirm at the use site).
const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
/// Default for `--min-output-chars`; also the threshold below which a body counts as "short".
const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;
/// Default for `--max-output-chars` (upper bound on an enriched body's length).
const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2000;
50
/// JSON Schema for `memory-bindings` responses: `{ entities: [...], relationships: [...] }`.
///
/// The schema enforces the following:
/// - `entity_type` is restricted to the ten canonical entity types.
/// - `relation` is restricted to the eleven canonical relations listed in `BINDINGS_PROMPT`.
/// - `strength` must lie in `[0, 1]`.
/// - Every object level forbids extra properties.
const BINDINGS_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "entities": {
 "type": "array",
 "items": {
 "type": "object",
 "properties": {
 "name": { "type": "string" },
 "entity_type": {
 "type": "string",
 "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
 }
 },
 "required": ["name", "entity_type"],
 "additionalProperties": false
 }
 },
 "relationships": {
 "type": "array",
 "items": {
 "type": "object",
 "properties": {
 "source": { "type": "string" },
 "target": { "type": "string" },
 "relation": {
 "type": "string",
 "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
 },
 "strength": { "type": "number", "minimum": 0, "maximum": 1 }
 },
 "required": ["source","target","relation","strength"],
 "additionalProperties": false
 }
 }
 },
 "required": ["entities","relationships"],
 "additionalProperties": false
}"#;
94
/// JSON Schema for `entity-descriptions` responses: a single required `description` string.
const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "description": { "type": "string" }
 },
 "required": ["description"],
 "additionalProperties": false
}"#;
103
/// JSON Schema for `body-enrich` responses: a single required `enriched_body` string.
const BODY_ENRICH_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "enriched_body": { "type": "string" }
 },
 "required": ["enriched_body"],
 "additionalProperties": false
}"#;
112
/// Prompt for the `weight-calibrate` audit.
///
/// Gives the model a 0.3 / 0.5 / 0.7 / 0.9 strength scale. It asks for exactly the two fields
/// `WEIGHT_CALIBRATE_SCHEMA` requires.
const WEIGHT_CALIBRATE_PROMPT: &str = "You are a knowledge graph quality auditor. Evaluate whether this relationship weight is correctly calibrated.\n\n\
Scale:\n\
- 0.9 = vital hard dependency (A cannot function without B)\n\
- 0.7 = important design relationship (A strongly supports/enables B)\n\
- 0.5 = useful contextual link (A and B share relevant context)\n\
- 0.3 = weak reference (A mentions B without strong coupling)\n\n\
Respond with the calibrated weight and brief reasoning.";

/// JSON Schema for `weight-calibrate` responses.
///
/// Requires `calibrated_weight` in `[0, 1]` plus a free-text `reasoning`.
const WEIGHT_CALIBRATE_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "calibrated_weight": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
 "reasoning": { "type": "string" }
 },
 "required": ["calibrated_weight", "reasoning"],
 "additionalProperties": false
}"#;
131
/// Prompt for the `relation-reclassify` audit.
///
/// Asks the model to replace a generic relation type with one canonical relation. Each canonical
/// relation is listed with a one-line meaning.
const RELATION_RECLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. The relationship between these entities uses a generic type. Determine the REAL semantic relationship.\n\n\
Valid canonical relations (pick exactly one):\n\
- depends-on: A cannot function without B\n\
- uses: A utilizes B but could substitute it\n\
- supports: A reinforces or enables B\n\
- causes: A triggers or produces B\n\
- fixes: A resolves a problem in B\n\
- contradicts: A conflicts with or invalidates B\n\
- applies-to: A is relevant to or scoped within B\n\
- follows: A comes after B in sequence\n\
- replaces: A substitutes B\n\
- tracked-in: A is monitored in B\n\
- related: A and B share context (use sparingly)\n\n\
Respond with the correct relation, strength, and reasoning.";

/// JSON Schema for `relation-reclassify` responses.
///
/// Unlike `BINDINGS_SCHEMA`, `relation` is an unconstrained string here. The canonical list is
/// enforced only by the prompt, so consumers should validate the returned value.
const RELATION_RECLASSIFY_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "relation": { "type": "string" },
 "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
 "reasoning": { "type": "string" }
 },
 "required": ["relation", "strength", "reasoning"],
 "additionalProperties": false
}"#;
158
/// Prompt for the `entity-connect` audit.
///
/// Asks whether two unconnected entities should be linked. The model answers with `"none"` as the
/// relation when no link is warranted.
const ENTITY_CONNECT_PROMPT: &str = "You are a knowledge graph quality auditor. Two entities exist in the same graph but have no relationship between them. Determine if a meaningful relationship exists.\n\n\
Valid canonical relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, replaces, tracked-in, related.\n\n\
If NO meaningful relationship exists, set relation to \"none\".\n\
Respond with the relation (or \"none\"), strength, and reasoning.";

/// JSON Schema for `entity-connect` responses.
///
/// `relation` is free-form. Per the prompt it is either a canonical relation or the sentinel
/// `"none"`, so callers must treat `"none"` as "do not create an edge".
const ENTITY_CONNECT_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "relation": { "type": "string" },
 "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
 "reasoning": { "type": "string" }
 },
 "required": ["relation", "strength", "reasoning"],
 "additionalProperties": false
}"#;
175
/// Prompt for the `entity-type-validate` audit.
///
/// Lists the ten valid entity types (the same set as `BINDINGS_SCHEMA`'s `entity_type` enum).
const ENTITY_TYPE_VALIDATE_PROMPT: &str = "You are a knowledge graph quality auditor. Verify whether this entity's type is correct.\n\n\
Valid entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\n\
If the current type is correct, keep it. If wrong, suggest the correct type.\n\
Respond with the validated type and reasoning.";

/// JSON Schema for `entity-type-validate` responses.
///
/// The response carries `validated_type`, a `was_correct` flag and `reasoning`. `validated_type`
/// is not enum-constrained, so callers should check it against the valid types.
const ENTITY_TYPE_VALIDATE_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "validated_type": { "type": "string" },
 "was_correct": { "type": "boolean" },
 "reasoning": { "type": "string" }
 },
 "required": ["validated_type", "was_correct", "reasoning"],
 "additionalProperties": false
}"#;
192
/// Prompt for the `description-enrich` audit.
///
/// Rewrites a generic or auto-generated memory description into a 10-20 word semantic summary,
/// with a BAD/GOOD example pair.
const DESCRIPTION_ENRICH_PROMPT: &str = "You are a knowledge graph quality auditor. This memory has a generic or auto-generated description. Write a concise, semantic description (10-20 words) that captures WHAT this memory is about and WHY it matters.\n\n\
BAD: 'ingested from docs/auth.md'\n\
GOOD: 'JWT token rotation strategy with 15-min expiry and refresh flow'\n\n\
Respond with the improved description and reasoning.";

/// JSON Schema for `description-enrich` responses: the new `description` plus `reasoning`.
const DESCRIPTION_ENRICH_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "description": { "type": "string" },
 "reasoning": { "type": "string" }
 },
 "required": ["description", "reasoning"],
 "additionalProperties": false
}"#;
208
/// Prompt for the `domain-classify` operation: a kebab-case, 2-4 word domain name per memory.
const DOMAIN_CLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. Classify this memory into its primary domain category.\n\n\
Respond with the domain name (kebab-case, 2-4 words) and reasoning.";

/// JSON Schema for `domain-classify` responses.
///
/// Besides `domain` and `reasoning`, the schema also requires a `confidence` in `[0, 1]`, which
/// the prompt does not mention. The model learns about it only from this schema.
const DOMAIN_CLASSIFY_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "domain": { "type": "string" },
 "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
 "reasoning": { "type": "string" }
 },
 "required": ["domain", "confidence", "reasoning"],
 "additionalProperties": false
}"#;
223
/// Prompt for the `graph-audit` operation.
///
/// Asks for a list of quality issues in a memory's entity bindings plus an overall quality score.
const GRAPH_AUDIT_PROMPT: &str = "You are a knowledge graph quality auditor. Analyze this memory and its entity bindings for quality issues.\n\n\
Check for: missing entities, wrong entity types, redundant relationships, orphaned entities, generic descriptions, low-signal relationships.\n\n\
Respond with a list of issues found (or empty if none) and an overall quality score.";
228
/// JSON Schema for `graph-audit` responses.
///
/// The response has the following fields:
/// - `quality_score` in `[0, 1]`.
/// - `issues`, a list of `{ kind, detail }` objects (`kind` is free-form).
/// - `reasoning`.
///
/// Every object level — including each `issues` item — sets `"additionalProperties": false`,
/// matching the other schemas in this file. Strict structured-output backends (e.g. OpenAI strict
/// JSON-schema mode) reject nested objects that omit it.
const GRAPH_AUDIT_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "quality_score": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
 "issues": {
 "type": "array",
 "items": {
 "type": "object",
 "properties": {
 "kind": { "type": "string" },
 "detail": { "type": "string" }
 },
 "required": ["kind", "detail"],
 "additionalProperties": false
 }
 },
 "reasoning": { "type": "string" }
 },
 "required": ["quality_score", "issues", "reasoning"],
 "additionalProperties": false
}"#;
239
/// Prompt for the `deep-research-synth` operation.
///
/// Extracts entities and relationships from a memory body, plus a synthesis summary. Its relation
/// list is the same eleven canonical relations used by `BINDINGS_PROMPT`.
const DEEP_RESEARCH_SYNTH_PROMPT: &str = "You are a knowledge graph synthesizer. Given this memory body, extract key findings and synthesize them into structured entities and relationships.\n\n\
Entity names: lowercase kebab-case, domain-specific.\n\
Relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, related, replaces, tracked-in.\n\n\
Respond with extracted entities, relationships, and a synthesis summary.";
245
/// JSON Schema for `deep-research-synth` responses: `entities`, `relationships` and a `summary`.
///
/// The entity and relationship items use the same constraints as `BINDINGS_SCHEMA`:
/// - `entity_type` and `relation` are restricted to the canonical lists (the relation list matches
///   `DEEP_RESEARCH_SYNTH_PROMPT`).
/// - `strength` must lie in `[0, 1]`.
/// - Every nested object sets `"additionalProperties": false`, which strict structured-output
///   backends require.
const DEEP_RESEARCH_SYNTH_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "entities": {
 "type": "array",
 "items": {
 "type": "object",
 "properties": {
 "name": { "type": "string" },
 "entity_type": {
 "type": "string",
 "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
 }
 },
 "required": ["name", "entity_type"],
 "additionalProperties": false
 }
 },
 "relationships": {
 "type": "array",
 "items": {
 "type": "object",
 "properties": {
 "source": { "type": "string" },
 "target": { "type": "string" },
 "relation": {
 "type": "string",
 "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
 },
 "strength": { "type": "number", "minimum": 0, "maximum": 1 }
 },
 "required": ["source", "target", "relation", "strength"],
 "additionalProperties": false
 }
 },
 "summary": { "type": "string" }
 },
 "required": ["entities", "relationships", "summary"],
 "additionalProperties": false
}"#;
256
/// Prompt for the `body-extract` operation: restructure a raw memory body into clean markdown
/// without dropping facts.
const BODY_EXTRACT_PROMPT: &str = "You are a structured data extractor. Given this memory body (which may be unstructured text, raw notes, or a transcript), extract and restructure the content into a clean, well-organized markdown body.\n\n\
Preserve all factual content. Remove noise, fix formatting, add section headers where appropriate.\n\
Respond with the restructured body and a brief summary of changes.";

/// JSON Schema for `body-extract` responses: `restructured_body` plus a `changes_summary`.
const BODY_EXTRACT_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "restructured_body": { "type": "string" },
 "changes_summary": { "type": "string" }
 },
 "required": ["restructured_body", "changes_summary"],
 "additionalProperties": false
}"#;
271
/// Prompt for the `memory-bindings` operation.
///
/// Sets entity-name rules, bans `mentions` as a relation, and uses the 0.3 / 0.5 / 0.7 / 0.9
/// strength scale. The relation list matches the `relation` enum in `BINDINGS_SCHEMA`
/// (same values, same order).
const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. Given a memory body, extract:\n\
1. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
2. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones";
286
/// Leading text of the `entity-descriptions` prompt.
///
/// It ends with `"Entity name: "`, so the caller is expected to append the entity name
/// (and presumably its type — confirm at the call site).
const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. Given an entity name and type, write a concise one-sentence description (10-20 words) that explains what this entity IS and WHY it matters in the context of software/system design.\n\nEntity name: ";

/// Leading text of the `body-enrich` prompt.
///
/// It ends with `"Memory body to enrich:\n\n"`, so the body to expand is appended after it. The
/// target length is said to come from the system context, not from this string.
const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. Given a short or sparse memory body, expand it into a richer, more complete and useful description. Preserve all existing facts. Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\nConstraints:\n- Output only the enriched body text (no metadata, no headers)\n- Preserve the original meaning exactly\n- Target length is provided in the system context\n\nMemory body to enrich:\n\n";
290
/// The enrichment pass selected by `enrich --operation`.
///
/// Parsed from the CLI through `clap::ValueEnum` and (de)serialized by serde in kebab-case, e.g.
/// `MemoryBindings` <-> `memory-bindings`.
// Per-variant notes below are plain `//` comments on purpose: `///` on a `ValueEnum` variant
// becomes user-visible `--help` text.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum EnrichOperation {
    // Add entity/relationship bindings to memories that have none (BINDINGS_PROMPT/BINDINGS_SCHEMA).
    MemoryBindings,
    // Fill missing entity descriptions (ENTITY_DESCRIPTION_PROMPT_PREFIX/ENTITY_DESCRIPTION_SCHEMA).
    EntityDescriptions,
    // Expand short memory bodies (BODY_ENRICH_PROMPT_PREFIX/BODY_ENRICH_SCHEMA).
    BodyEnrich,
    // Rebuild only missing memory embeddings without rewriting bodies.
    ReEmbed,
    // Re-check relationship strengths (WEIGHT_CALIBRATE_PROMPT/WEIGHT_CALIBRATE_SCHEMA).
    WeightCalibrate,
    // Replace generic relation types with canonical ones (RELATION_RECLASSIFY_*).
    RelationReclassify,
    // Propose edges between unconnected entities (ENTITY_CONNECT_*).
    EntityConnect,
    // Verify entity types (ENTITY_TYPE_VALIDATE_*).
    EntityTypeValidate,
    // Rewrite generic memory descriptions (DESCRIPTION_ENRICH_*).
    DescriptionEnrich,
    // NOTE(review): no prompt/schema constant for this operation is visible in this section.
    CrossDomainBridges,
    // Assign a primary domain to each memory (DOMAIN_CLASSIFY_*).
    DomainClassify,
    // Score a memory's bindings and list quality issues (GRAPH_AUDIT_*).
    GraphAudit,
    // Extract entities/relationships plus a summary from a body (DEEP_RESEARCH_SYNTH_*).
    DeepResearchSynth,
    // Restructure a raw body into clean markdown (BODY_EXTRACT_*).
    BodyExtract,
}
328
/// LLM backend that executes enrichment calls (`--mode`, and `--fallback-mode`).
// Variant notes are `//` rather than `///` so they do not become `--help` text.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
pub enum EnrichMode {
    // Claude CLI, located via `find_claude_binary`; CLI spelling `claude-code` (the default).
    ClaudeCode,
    // Codex CLI, located via `find_codex_binary`; CLI spelling `codex`.
    Codex,
    // opencode CLI, located via `opencode_runner`; CLI spelling pinned to `opencode`.
    #[value(name = "opencode")]
    Opencode,
}
340
341impl std::fmt::Display for EnrichMode {
342 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
343 match self {
344 EnrichMode::ClaudeCode => write!(f, "claude-code"),
345 EnrichMode::Codex => write!(f, "codex"),
346 EnrichMode::Opencode => write!(f, "opencode"),
347 }
348 }
349}
350
351#[derive(clap::Args)]
353#[command(
354 about = "Enrich graph memories and entities using an LLM provider",
355 after_long_help = "EXAMPLES:\n \
356 # Add missing entity bindings to all unbound memories\n \
357 sqlite-graphrag enrich --operation memory-bindings --mode claude-code\n\n \
358 # Fill entity descriptions (dry-run preview, no tokens spent)\n \
359 sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n \
360 # Expand short memory bodies (GAP-18)\n \
361 sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n \
362 # Rebuild only missing memory embeddings without rewriting bodies\n \
363 sqlite-graphrag enrich --operation re-embed --limit 100\n\n \
364 # Resume an interrupted body-enrich run\n \
365 sqlite-graphrag enrich --operation body-enrich --resume --json\n\n \
366 # Retry only failed items from a previous run\n \
367 sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n\
368 EXIT CODES:\n \
369 0 success\n \
370 1 validation error (bad args, binary not found)\n \
371 14 I/O error"
372)]
373pub struct EnrichArgs {
374 #[arg(long, short = 'o', value_enum, value_name = "OPERATION")]
376 pub operation: EnrichOperation,
377
378 #[arg(long, value_enum, default_value = "claude-code")]
380 pub mode: EnrichMode,
381
382 #[arg(long, value_name = "N")]
384 pub limit: Option<usize>,
385
386 #[arg(long)]
388 pub dry_run: bool,
389
390 #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
392 pub namespace: Option<String>,
393
394 #[arg(long, value_name = "PATH")]
397 pub claude_binary: Option<PathBuf>,
398
399 #[arg(long, value_name = "MODEL")]
401 pub claude_model: Option<String>,
402
403 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
405 pub claude_timeout: u64,
406
407 #[arg(long, value_name = "PATH")]
410 pub codex_binary: Option<PathBuf>,
411
412 #[arg(long, value_name = "MODEL")]
414 pub codex_model: Option<String>,
415
416 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
418 pub codex_timeout: u64,
419
420 #[arg(long, value_name = "PATH", env = "SQLITE_GRAPHRAG_OPENCODE_BINARY")]
423 pub opencode_binary: Option<PathBuf>,
424
425 #[arg(long, value_name = "MODEL", env = "SQLITE_GRAPHRAG_OPENCODE_MODEL")]
427 pub opencode_model: Option<String>,
428
429 #[arg(
431 long,
432 value_name = "SECONDS",
433 env = "SQLITE_GRAPHRAG_OPENCODE_TIMEOUT",
434 default_value_t = 300
435 )]
436 pub opencode_timeout: u64,
437
438 #[arg(long, value_name = "USD")]
441 pub max_cost_usd: Option<f64>,
442
443 #[arg(long)]
446 pub resume: bool,
447
448 #[arg(long)]
450 pub retry_failed: bool,
451
452 #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
455 pub min_output_chars: usize,
456
457 #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
459 pub max_output_chars: usize,
460
461 #[arg(long, default_value_t = true)]
463 pub preserve_check: bool,
464
465 #[arg(long, value_name = "PATH")]
467 pub prompt_template: Option<PathBuf>,
468
469 #[arg(long, default_value_t = 1, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=32))]
473 pub llm_parallelism: u32,
474
475 #[arg(long)]
478 pub json: bool,
479
480 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
482 pub db: Option<String>,
483
484 #[arg(long, value_name = "SECONDS")]
487 pub wait_job_singleton: Option<u64>,
488
489 #[arg(long, default_value_t = false)]
493 pub force_job_singleton: bool,
494
495 #[arg(long, value_name = "NAMES", value_delimiter = ',')]
499 pub names: Vec<String>,
500
501 #[arg(long, value_name = "PATH")]
505 pub names_file: Option<PathBuf>,
506
507 #[arg(long, default_value_t = false)]
511 pub preflight_check: bool,
512
513 #[arg(long, value_enum)]
517 pub fallback_mode: Option<EnrichMode>,
518
519 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
522 pub rate_limit_buffer: u64,
523
524 #[arg(long, default_value_t = true)]
528 pub max_load_check: bool,
529
530 #[arg(long, value_name = "N", default_value_t = 5)]
533 pub circuit_breaker_threshold: u32,
534
535 #[arg(long, value_name = "FLOAT", default_value_t = 0.7)]
542 pub preserve_threshold: f64,
543
544 #[arg(long, default_value_t = true)]
549 pub codex_model_validate: bool,
550
551 #[arg(long, value_name = "MODEL")]
556 pub codex_model_fallback: Option<String>,
557}
558
/// JSON progress event announcing a phase of an enrich run.
///
/// Every `Option` field that is `None` is omitted from the serialized object.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Phase name (free-form label).
    phase: &'a str,
    /// Path of the backend binary, when relevant to the phase.
    #[serde(skip_serializing_if = "Option::is_none")]
    binary_path: Option<&'a str>,
    /// Backend version string, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Total number of items in scope.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_total: Option<usize>,
    /// Items still pending.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_pending: Option<usize>,
    /// Effective `--llm-parallelism`.
    #[serde(skip_serializing_if = "Option::is_none")]
    llm_parallelism: Option<u32>,
}
582
/// JSON progress event for one processed item (memory or entity).
///
/// `index`/`total` are always emitted. Every `Option` field that is `None` is omitted.
#[derive(Debug, Serialize)]
struct ItemEvent<'a> {
    /// Item key or name.
    item: &'a str,
    /// Outcome label (allowed values are defined by the emitting code, not visible here).
    status: &'a str,
    /// Memory row id, for memory-level operations.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Entity row id, for entity-level operations.
    #[serde(skip_serializing_if = "Option::is_none")]
    entity_id: Option<i64>,
    /// Number of entities produced.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Number of relationships produced.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Body length before rewriting.
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_before: Option<usize>,
    /// Body length after rewriting.
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_after: Option<usize>,
    /// Cost reported for this item, in USD.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    /// Wall-clock time spent on this item.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Error message when the item failed.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Position of this item in the run.
    index: usize,
    /// Number of items in the run.
    total: usize,
}
609
/// Final JSON summary record for an enrich run.
#[derive(Debug, Serialize)]
struct EnrichSummary {
    /// Marker field. NOTE(review): presumably always `true` so consumers can tell the summary
    /// line apart from item/phase events — confirm at the construction site.
    summary: bool,
    /// Operation name.
    operation: String,
    /// Items in scope.
    items_total: usize,
    /// Items that succeeded.
    completed: usize,
    /// Items that failed.
    failed: usize,
    /// Items that were skipped.
    skipped: usize,
    /// Accumulated cost in USD.
    cost_usd: f64,
    /// Total wall-clock time.
    elapsed_ms: u64,
    /// Name of the backend that actually ran, when one was invoked; omitted otherwise.
    #[serde(skip_serializing_if = "Option::is_none")]
    backend_invoked: Option<&'static str>,
}
627
628use crate::output::emit_json_line as emit_json;
629
630fn open_queue_db(path: &str) -> Result<Connection, AppError> {
645 let conn = Connection::open(path)?;
646 conn.pragma_update(None, "journal_mode", "wal")?;
647 conn.execute_batch(
648 "CREATE TABLE IF NOT EXISTS queue (
649 id INTEGER PRIMARY KEY AUTOINCREMENT,
650 item_key TEXT NOT NULL UNIQUE,
651 item_type TEXT NOT NULL DEFAULT 'memory',
652 status TEXT NOT NULL DEFAULT 'pending',
653 memory_id INTEGER,
654 entity_id INTEGER,
655 entities INTEGER DEFAULT 0,
656 rels INTEGER DEFAULT 0,
657 error TEXT,
658 cost_usd REAL DEFAULT 0.0,
659 attempt INTEGER DEFAULT 0,
660 elapsed_ms INTEGER,
661 created_at TEXT DEFAULT (datetime('now')),
662 done_at TEXT
663 );
664 CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
665 )?;
666 Ok(conn)
667}
668
669fn call_claude(
677 binary: &Path,
678 prompt: &str,
679 json_schema: &str,
680 input_text: &str,
681 model: Option<&str>,
682 timeout_secs: u64,
683) -> Result<(serde_json::Value, f64, bool), AppError> {
684 let result = crate::commands::claude_runner::run_claude(
685 binary,
686 prompt,
687 json_schema,
688 input_text,
689 model,
690 timeout_secs,
691 7,
692 )?;
693 Ok((result.value, result.cost_usd, result.is_oauth))
694}
695
/// Result of `run_preflight_probe`: whether the selected backend can accept work right now.
enum PreflightOutcome {
    /// The probe command exited successfully.
    Healthy,
    /// The probe failed and its stderr contained a known rate-limit marker.
    RateLimited {
        /// Trimmed stderr of the failed probe.
        reason: String,
        /// Fixed, backend-specific advice shown to the user.
        suggestion: &'static str,
    },
    /// Any other failure: binary lookup, spawn, timeout, or non-zero exit without a
    /// rate-limit marker.
    Error(AppError),
}
714
715fn run_preflight_probe(args: &EnrichArgs) -> PreflightOutcome {
723 let timeout = std::time::Duration::from_secs(args.rate_limit_buffer.max(60));
724
725 match args.mode {
726 EnrichMode::ClaudeCode => {
727 let bin = match find_claude_binary(args.claude_binary.as_deref()) {
728 Ok(b) => b,
729 Err(e) => return PreflightOutcome::Error(e),
730 };
731 let mcp_config_path = match crate::spawn::preflight::write_empty_mcp_config_tempfile() {
736 Ok(p) => p,
737 Err(e) => {
738 return PreflightOutcome::Error(AppError::Io(e));
739 }
740 };
741 let mut cmd = std::process::Command::new(&bin);
742 crate::spawn::env_whitelist::apply_env_whitelist(
743 &mut cmd,
744 crate::spawn::env_whitelist::is_strict_env_clear(),
745 );
746 if let Err(e) = crate::spawn::apply_cwd_isolation(&mut cmd) {
747 return PreflightOutcome::Error(e);
748 }
749 cmd.arg("-p")
750 .arg("ping")
751 .arg("--max-turns")
752 .arg("1")
753 .arg("--strict-mcp-config")
754 .arg("--mcp-config")
755 .arg(mcp_config_path.as_os_str())
756 .arg("--dangerously-skip-permissions")
757 .arg("--settings")
758 .arg("{\"hooks\":{}}")
759 .arg("--output-format")
760 .arg("json")
761 .stdin(std::process::Stdio::null())
762 .stdout(std::process::Stdio::piped())
763 .stderr(std::process::Stdio::piped());
764
765 let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
766 Ok(c) => c,
767 Err(e) => {
768 return PreflightOutcome::Error(AppError::Io(e));
769 }
770 };
771 let output = match wait_with_timeout(child, timeout) {
772 Ok(out) => out,
773 Err(e) => return PreflightOutcome::Error(e),
774 };
775 if !output.status.success() {
776 let stderr = String::from_utf8_lossy(&output.stderr);
777 if stderr.contains("hit your session limit")
778 || stderr.contains("rate_limit")
779 || stderr.contains("429")
780 {
781 return PreflightOutcome::RateLimited {
782 reason: stderr.trim().to_string(),
783 suggestion:
784 "wait for the OAuth window to reset or use --fallback-mode codex",
785 };
786 }
787 return PreflightOutcome::Error(AppError::Validation(format!(
788 "preflight probe failed: {stderr}",
789 stderr = stderr.trim()
790 )));
791 }
792 PreflightOutcome::Healthy
793 }
794 EnrichMode::Codex => {
795 let bin = match find_codex_binary(args.codex_binary.as_deref()) {
796 Ok(b) => b,
797 Err(e) => return PreflightOutcome::Error(e),
798 };
799 super::codex_spawn::validate_codex_model(args.codex_model.as_deref())
800 .map_err(PreflightOutcome::Error)
801 .ok();
802 let schema = "{}";
803 let schema_path = match super::codex_spawn::trusted_schema_path() {
804 Ok(p) => p,
805 Err(e) => return PreflightOutcome::Error(e),
806 };
807 let spawn_args = super::codex_spawn::CodexSpawnArgs {
808 binary: &bin,
809 prompt: "ping",
810 json_schema: schema,
811 input_text: "",
812 model: args.codex_model.as_deref(),
813 timeout_secs: args.rate_limit_buffer.max(60),
814 schema_path: schema_path.clone(),
815 };
816 let mut cmd = match super::codex_spawn::build_codex_command(&spawn_args) {
817 Ok(c) => c,
818 Err(e) => return PreflightOutcome::Error(e),
819 };
820 let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
821 Ok(c) => c,
822 Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
823 };
824 let output = match wait_with_timeout(child, timeout) {
825 Ok(out) => out,
826 Err(e) => return PreflightOutcome::Error(e),
827 };
828 let _ = std::fs::remove_file(&schema_path);
829 if !output.status.success() {
830 let stderr = String::from_utf8_lossy(&output.stderr);
831 if stderr.contains("rate_limit")
832 || stderr.contains("429")
833 || stderr.contains("Too Many Requests")
834 {
835 return PreflightOutcome::RateLimited {
836 reason: stderr.trim().to_string(),
837 suggestion: "wait for the rate-limit window to reset",
838 };
839 }
840 return PreflightOutcome::Error(AppError::Validation(format!(
841 "preflight probe failed: {stderr}",
842 stderr = stderr.trim()
843 )));
844 }
845 PreflightOutcome::Healthy
846 }
847 EnrichMode::Opencode => {
848 let bin = match super::opencode_runner::find_opencode_binary_with_override(
849 args.opencode_binary.as_deref(),
850 ) {
851 Ok(b) => b,
852 Err(e) => return PreflightOutcome::Error(e),
853 };
854 let model =
855 super::opencode_runner::resolve_opencode_model(args.opencode_model.as_deref());
856 let mut cmd =
857 match super::opencode_runner::build_opencode_command_sync(&bin, &model, "ping", "")
858 {
859 Ok(c) => c,
860 Err(e) => return PreflightOutcome::Error(e),
861 };
862 let child = match super::opencode_runner::spawn_opencode(&mut cmd) {
863 Ok(c) => c,
864 Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
865 };
866 let output = match wait_with_timeout(child, timeout) {
867 Ok(out) => out,
868 Err(e) => return PreflightOutcome::Error(e),
869 };
870 if !output.status.success() {
871 let stderr = String::from_utf8_lossy(&output.stderr);
872 if stderr.contains("rate_limit")
873 || stderr.contains("429")
874 || stderr.contains("Too Many Requests")
875 {
876 return PreflightOutcome::RateLimited {
877 reason: stderr.trim().to_string(),
878 suggestion: "wait for the rate-limit window to reset",
879 };
880 }
881 return PreflightOutcome::Error(AppError::Validation(format!(
882 "preflight probe failed: {stderr}",
883 stderr = stderr.trim()
884 )));
885 }
886 PreflightOutcome::Healthy
887 }
888 }
889}
890
891fn wait_with_timeout(
893 mut child: std::process::Child,
894 timeout: std::time::Duration,
895) -> Result<std::process::Output, AppError> {
896 use wait_timeout::ChildExt;
897 let start = std::time::Instant::now();
898 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
899 if status.is_none() {
900 let _ = child.kill();
901 let _ = child.wait();
902 return Err(AppError::Validation(format!(
903 "preflight probe timed out after {}s",
904 start.elapsed().as_secs()
905 )));
906 }
907 let mut stdout = Vec::new();
908 if let Some(mut out) = child.stdout.take() {
909 std::io::Read::read_to_end(&mut out, &mut stdout).map_err(AppError::Io)?;
910 }
911 let mut stderr = Vec::new();
912 if let Some(mut err) = child.stderr.take() {
913 std::io::Read::read_to_end(&mut err, &mut stderr).map_err(AppError::Io)?;
914 }
915 let exit = status.unwrap();
916 Ok(std::process::Output {
917 status: exit,
918 stdout,
919 stderr,
920 })
921}
922
923fn scan_unbound_memories(
934 conn: &Connection,
935 namespace: &str,
936 limit: Option<usize>,
937 name_filter: &[String],
938) -> Result<Vec<(i64, String, String)>, AppError> {
939 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
940
941 if name_filter.is_empty() {
942 let sql = format!(
943 "SELECT m.id, m.name, m.body
944 FROM memories m
945 WHERE m.namespace = ?1
946 AND m.deleted_at IS NULL
947 AND NOT EXISTS (
948 SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
949 )
950 ORDER BY m.id
951 {limit_clause}"
952 );
953 let mut stmt = conn.prepare(&sql)?;
954 let rows = stmt
955 .query_map(rusqlite::params![namespace], |r| {
956 Ok((
957 r.get::<_, i64>(0)?,
958 r.get::<_, String>(1)?,
959 r.get::<_, String>(2)?,
960 ))
961 })?
962 .collect::<Result<Vec<_>, _>>()?;
963 Ok(rows)
964 } else {
965 let placeholders: Vec<String> = (2..=name_filter.len() + 1)
967 .map(|i| format!("?{i}"))
968 .collect();
969 let in_clause = placeholders.join(", ");
970 let sql = format!(
971 "SELECT m.id, m.name, m.body
972 FROM memories m
973 WHERE m.namespace = ?1
974 AND m.deleted_at IS NULL
975 AND m.name IN ({in_clause})
976 AND NOT EXISTS (
977 SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
978 )
979 ORDER BY m.id
980 {limit_clause}"
981 );
982 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
983 params_vec.push(&namespace);
984 for n in name_filter {
985 params_vec.push(n);
986 }
987 let mut stmt = conn.prepare(&sql)?;
988 let rows = stmt
989 .query_map(
990 rusqlite::params_from_iter(params_vec.iter().copied()),
991 |r| {
992 Ok((
993 r.get::<_, i64>(0)?,
994 r.get::<_, String>(1)?,
995 r.get::<_, String>(2)?,
996 ))
997 },
998 )?
999 .collect::<Result<Vec<_>, _>>()?;
1000 Ok(rows)
1001 }
1002}
1003
1004fn read_names_file(path: &Path) -> Result<Vec<String>, AppError> {
1009 let content = std::fs::read_to_string(path).map_err(|e| {
1010 AppError::Validation(format!("failed to read names file {}: {e}", path.display()))
1011 })?;
1012 let mut seen = std::collections::HashSet::new();
1013 let mut out = Vec::new();
1014 for line in content.lines() {
1015 let trimmed = line.trim();
1016 if trimmed.is_empty() || trimmed.starts_with('#') {
1017 continue;
1018 }
1019 if seen.insert(trimmed.to_string()) {
1020 out.push(trimmed.to_string());
1021 }
1022 }
1023 Ok(out)
1024}
1025
1026fn resolve_name_filter(args: &EnrichArgs) -> Result<Vec<String>, AppError> {
1028 let mut combined: Vec<String> = args.names.clone();
1029 if let Some(p) = &args.names_file {
1030 let from_file = read_names_file(p)?;
1031 for n in from_file {
1032 if !combined.contains(&n) {
1033 combined.push(n);
1034 }
1035 }
1036 }
1037 Ok(combined)
1038}
1039
1040fn scan_entities_without_description(
1044 conn: &Connection,
1045 namespace: &str,
1046 limit: Option<usize>,
1047 name_filter: &[String],
1048) -> Result<Vec<(i64, String, String)>, AppError> {
1049 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1050
1051 if name_filter.is_empty() {
1052 let sql = format!(
1053 "SELECT id, name, type
1054 FROM entities
1055 WHERE namespace = ?1
1056 AND (description IS NULL OR description = '')
1057 ORDER BY id
1058 {limit_clause}"
1059 );
1060 let mut stmt = conn.prepare(&sql)?;
1061 let rows = stmt
1062 .query_map(rusqlite::params![namespace], |r| {
1063 Ok((
1064 r.get::<_, i64>(0)?,
1065 r.get::<_, String>(1)?,
1066 r.get::<_, String>(2)?,
1067 ))
1068 })?
1069 .collect::<Result<Vec<_>, _>>()?;
1070 Ok(rows)
1071 } else {
1072 let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1073 .map(|i| format!("?{i}"))
1074 .collect();
1075 let in_clause = placeholders.join(", ");
1076 let sql = format!(
1077 "SELECT id, name, type
1078 FROM entities
1079 WHERE namespace = ?1
1080 AND name IN ({in_clause})
1081 AND (description IS NULL OR description = '')
1082 ORDER BY id
1083 {limit_clause}"
1084 );
1085 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1086 params_vec.push(&namespace);
1087 for n in name_filter {
1088 params_vec.push(n);
1089 }
1090 let mut stmt = conn.prepare(&sql)?;
1091 let rows = stmt
1092 .query_map(
1093 rusqlite::params_from_iter(params_vec.iter().copied()),
1094 |r| {
1095 Ok((
1096 r.get::<_, i64>(0)?,
1097 r.get::<_, String>(1)?,
1098 r.get::<_, String>(2)?,
1099 ))
1100 },
1101 )?
1102 .collect::<Result<Vec<_>, _>>()?;
1103 Ok(rows)
1104 }
1105}
1106
1107fn scan_short_body_memories(
1111 conn: &Connection,
1112 namespace: &str,
1113 min_chars: usize,
1114 limit: Option<usize>,
1115 name_filter: &[String],
1116) -> Result<Vec<(i64, String, String)>, AppError> {
1117 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1118
1119 if name_filter.is_empty() {
1120 let sql = format!(
1121 "SELECT m.id, m.name, m.body
1122 FROM memories m
1123 WHERE m.namespace = ?1
1124 AND m.deleted_at IS NULL
1125 AND LENGTH(COALESCE(m.body,'')) < ?2
1126 ORDER BY m.id
1127 {limit_clause}"
1128 );
1129 let mut stmt = conn.prepare(&sql)?;
1130 let rows = stmt
1131 .query_map(rusqlite::params![namespace, min_chars as i64], |r| {
1132 Ok((
1133 r.get::<_, i64>(0)?,
1134 r.get::<_, String>(1)?,
1135 r.get::<_, String>(2)?,
1136 ))
1137 })?
1138 .collect::<Result<Vec<_>, _>>()?;
1139 Ok(rows)
1140 } else {
1141 let placeholders: Vec<String> = (3..=name_filter.len() + 2)
1142 .map(|i| format!("?{i}"))
1143 .collect();
1144 let in_clause = placeholders.join(", ");
1145 let sql = format!(
1146 "SELECT m.id, m.name, m.body
1147 FROM memories m
1148 WHERE m.namespace = ?1
1149 AND m.deleted_at IS NULL
1150 AND m.name IN ({in_clause})
1151 AND LENGTH(COALESCE(m.body,'')) < ?2
1152 ORDER BY m.id
1153 {limit_clause}"
1154 );
1155 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(2 + name_filter.len());
1156 let min_chars_i64 = min_chars as i64;
1157 params_vec.push(&namespace);
1158 params_vec.push(&min_chars_i64);
1159 for n in name_filter {
1160 params_vec.push(n);
1161 }
1162 let mut stmt = conn.prepare(&sql)?;
1163 let rows = stmt
1164 .query_map(
1165 rusqlite::params_from_iter(params_vec.iter().copied()),
1166 |r| {
1167 Ok((
1168 r.get::<_, i64>(0)?,
1169 r.get::<_, String>(1)?,
1170 r.get::<_, String>(2)?,
1171 ))
1172 },
1173 )?
1174 .collect::<Result<Vec<_>, _>>()?;
1175 Ok(rows)
1176 }
1177}
1178
1179fn scan_memories_without_embeddings(
1183 conn: &Connection,
1184 namespace: &str,
1185 limit: Option<usize>,
1186 name_filter: &[String],
1187) -> Result<Vec<(i64, String, String)>, AppError> {
1188 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1189
1190 if name_filter.is_empty() {
1191 let sql = format!(
1192 "SELECT m.id, m.name, COALESCE(m.body,'')
1193 FROM memories m
1194 LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1195 WHERE m.namespace = ?1
1196 AND m.deleted_at IS NULL
1197 AND me.memory_id IS NULL
1198 ORDER BY m.id
1199 {limit_clause}"
1200 );
1201 let mut stmt = conn.prepare(&sql)?;
1202 let rows = stmt
1203 .query_map(rusqlite::params![namespace], |r| {
1204 Ok((
1205 r.get::<_, i64>(0)?,
1206 r.get::<_, String>(1)?,
1207 r.get::<_, String>(2)?,
1208 ))
1209 })?
1210 .collect::<Result<Vec<_>, _>>()?;
1211 Ok(rows)
1212 } else {
1213 let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1214 .map(|i| format!("?{i}"))
1215 .collect();
1216 let in_clause = placeholders.join(", ");
1217 let sql = format!(
1218 "SELECT m.id, m.name, COALESCE(m.body,'')
1219 FROM memories m
1220 LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1221 WHERE m.namespace = ?1
1222 AND m.deleted_at IS NULL
1223 AND m.name IN ({in_clause})
1224 AND me.memory_id IS NULL
1225 ORDER BY m.id
1226 {limit_clause}"
1227 );
1228 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1229 params_vec.push(&namespace);
1230 for n in name_filter {
1231 params_vec.push(n);
1232 }
1233 let mut stmt = conn.prepare(&sql)?;
1234 let rows = stmt
1235 .query_map(
1236 rusqlite::params_from_iter(params_vec.iter().copied()),
1237 |r| {
1238 Ok((
1239 r.get::<_, i64>(0)?,
1240 r.get::<_, String>(1)?,
1241 r.get::<_, String>(2)?,
1242 ))
1243 },
1244 )?
1245 .collect::<Result<Vec<_>, _>>()?;
1246 Ok(rows)
1247 }
1248}
1249
1250#[allow(clippy::type_complexity)]
1252fn scan_weight_candidates(
1253 conn: &Connection,
1254 namespace: &str,
1255 limit: Option<usize>,
1256) -> Result<Vec<(i64, String, String, String, f64)>, AppError> {
1257 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1258 let sql = format!(
1259 "SELECT r.id, e1.name, e2.name, r.relation, r.weight \
1260 FROM relationships r \
1261 JOIN entities e1 ON e1.id = r.source_id \
1262 JOIN entities e2 ON e2.id = r.target_id \
1263 WHERE r.weight >= 0.7 AND e1.namespace = ?1 \
1264 ORDER BY r.weight DESC {limit_clause}"
1265 );
1266 let mut stmt = conn.prepare(&sql)?;
1267 let rows = stmt
1268 .query_map(rusqlite::params![namespace], |r| {
1269 Ok((
1270 r.get::<_, i64>(0)?,
1271 r.get::<_, String>(1)?,
1272 r.get::<_, String>(2)?,
1273 r.get::<_, String>(3)?,
1274 r.get::<_, f64>(4)?,
1275 ))
1276 })?
1277 .collect::<Result<Vec<_>, _>>()?;
1278 Ok(rows)
1279}
1280
1281fn scan_generic_relations(
1283 conn: &Connection,
1284 namespace: &str,
1285 limit: Option<usize>,
1286) -> Result<Vec<(i64, String, String, String)>, AppError> {
1287 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1288 let sql = format!(
1289 "SELECT r.id, e1.name, e2.name, r.relation \
1290 FROM relationships r \
1291 JOIN entities e1 ON e1.id = r.source_id \
1292 JOIN entities e2 ON e2.id = r.target_id \
1293 WHERE r.relation = 'applies_to' AND e1.namespace = ?1 \
1294 ORDER BY r.id {limit_clause}"
1295 );
1296 let mut stmt = conn.prepare(&sql)?;
1297 let rows = stmt
1298 .query_map(rusqlite::params![namespace], |r| {
1299 Ok((
1300 r.get::<_, i64>(0)?,
1301 r.get::<_, String>(1)?,
1302 r.get::<_, String>(2)?,
1303 r.get::<_, String>(3)?,
1304 ))
1305 })?
1306 .collect::<Result<Vec<_>, _>>()?;
1307 Ok(rows)
1308}
1309
1310fn persist_memory_bindings(
1319 conn: &Connection,
1320 namespace: &str,
1321 memory_id: i64,
1322 entities_json: &serde_json::Value,
1323 rels_json: &serde_json::Value,
1324) -> Result<(usize, usize), AppError> {
1325 #[derive(Deserialize)]
1326 struct EntityItem {
1327 name: String,
1328 entity_type: String,
1329 }
1330 #[derive(Deserialize)]
1331 struct RelItem {
1332 source: String,
1333 target: String,
1334 relation: String,
1335 strength: f64,
1336 }
1337
1338 let extracted_entities: Vec<EntityItem> = serde_json::from_value(entities_json.clone())
1339 .map_err(|e| AppError::Validation(format!("failed to parse entities array: {e}")))?;
1340
1341 let extracted_rels: Vec<RelItem> = serde_json::from_value(rels_json.clone())
1342 .map_err(|e| AppError::Validation(format!("failed to parse relationships array: {e}")))?;
1343
1344 let mut ent_count = 0usize;
1345 let mut rel_count = 0usize;
1346
1347 for item in &extracted_entities {
1348 let entity_type = match item.entity_type.parse::<EntityType>() {
1349 Ok(et) => et,
1350 Err(_) => {
1351 tracing::warn!(
1352 target: "enrich",
1353 entity = %item.name,
1354 entity_type = %item.entity_type,
1355 "entity type not recognized, skipping"
1356 );
1357 continue;
1358 }
1359 };
1360 match entities::upsert_entity(
1361 conn,
1362 namespace,
1363 &NewEntity {
1364 name: item.name.clone(),
1365 entity_type,
1366 description: None,
1367 },
1368 ) {
1369 Ok(eid) => {
1370 let _ = entities::link_memory_entity(conn, memory_id, eid);
1371 ent_count += 1;
1372 }
1373 Err(e) => {
1374 tracing::warn!(
1375 target: "enrich",
1376 entity = %item.name,
1377 error = %e,
1378 "entity upsert skipped"
1379 );
1380 }
1381 }
1382 }
1383
1384 for rel in &extracted_rels {
1385 let normalized = crate::parsers::normalize_relation(&rel.relation);
1386 crate::parsers::warn_if_non_canonical(&normalized);
1387
1388 let src_name = crate::parsers::normalize_entity_name(&rel.source);
1391 let tgt_name = crate::parsers::normalize_entity_name(&rel.target);
1392 let src_id = entities::find_entity_id(conn, namespace, &src_name);
1393 let tgt_id = entities::find_entity_id(conn, namespace, &tgt_name);
1394 if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1395 let new_rel = NewRelationship {
1396 source: rel.source.clone(),
1397 target: rel.target.clone(),
1398 relation: normalized,
1399 strength: rel.strength,
1400 description: None,
1401 };
1402 if entities::upsert_relationship(conn, namespace, sid, tid, &new_rel).is_ok() {
1403 rel_count += 1;
1404 }
1405 }
1406 }
1407
1408 Ok((ent_count, rel_count))
1409}
1410
1411fn persist_entity_description(
1413 conn: &Connection,
1414 entity_id: i64,
1415 description: &str,
1416) -> Result<(), AppError> {
1417 conn.execute(
1418 "UPDATE entities SET description = ?1, updated_at = unixepoch() WHERE id = ?2",
1419 rusqlite::params![description, entity_id],
1420 )?;
1421 Ok(())
1422}
1423
1424#[allow(clippy::too_many_arguments)]
1430fn reembed_memory_vector(
1431 conn: &Connection,
1432 namespace: &str,
1433 memory_id: i64,
1434 memory_name: &str,
1435 memory_type: &str,
1436 body: &str,
1437 paths: &crate::paths::AppPaths,
1438 llm_backend: crate::cli::LlmBackendChoice,
1439) -> Result<(), AppError> {
1440 let snippet: String = body.chars().take(200).collect();
1441 let (embedding, backend_kind) =
1446 crate::embedder::embed_passage_with_choice(&paths.models, body, Some(llm_backend))?;
1447 record_enrich_backend(backend_kind.as_str());
1448 memories::upsert_vec(
1449 conn,
1450 memory_id,
1451 namespace,
1452 memory_type,
1453 &embedding,
1454 memory_name,
1455 &snippet,
1456 )?;
1457 Ok(())
1458}
1459
/// Name of the embedding backend most recently used by an enrich operation.
///
/// Written by `record_enrich_backend` and drained by `take_enrich_backend`.
static ENRICH_LAST_BACKEND: std::sync::Mutex<Option<&'static str>> = std::sync::Mutex::new(None);

/// Stores `backend` as the last backend used, replacing any previous value.
/// Does nothing if the mutex is poisoned.
fn record_enrich_backend(backend: &'static str) {
    if let Ok(mut slot) = ENRICH_LAST_BACKEND.lock() {
        *slot = Some(backend);
    }
}

/// Returns the last recorded backend and clears the slot, so each value is
/// handed out at most once. Yields `None` if nothing was recorded since the
/// last take, or if the mutex is poisoned.
fn take_enrich_backend() -> Option<&'static str> {
    match ENRICH_LAST_BACKEND.lock() {
        Ok(mut slot) => slot.take(),
        Err(_) => None,
    }
}
1476
1477fn persist_enriched_body(
1482 conn: &Connection,
1483 namespace: &str,
1484 memory_id: i64,
1485 memory_name: &str,
1486 new_body: &str,
1487 paths: &crate::paths::AppPaths,
1488 llm_backend: crate::cli::LlmBackendChoice,
1489) -> Result<(), AppError> {
1490 let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
1492 "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
1493 rusqlite::params![memory_id],
1494 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1495 )?;
1496
1497 let memory_type: String = conn.query_row(
1498 "SELECT type FROM memories WHERE id=?1",
1499 rusqlite::params![memory_id],
1500 |r| r.get(0),
1501 )?;
1502
1503 let description: String = conn.query_row(
1504 "SELECT COALESCE(description,'') FROM memories WHERE id=?1",
1505 rusqlite::params![memory_id],
1506 |r| r.get(0),
1507 )?;
1508
1509 let body_hash = blake3::hash(new_body.as_bytes()).to_hex().to_string();
1510
1511 let new_memory = memories::NewMemory {
1512 namespace: namespace.to_string(),
1513 name: memory_name.to_string(),
1514 memory_type: memory_type.clone(),
1515 description: description.clone(),
1516 body: new_body.to_string(),
1517 body_hash,
1518 session_id: None,
1519 source: "agent".to_string(),
1520 metadata: serde_json::json!({
1521 "operation": "body-enrich",
1522 "orig_chars": old_body.chars().count(),
1523 "new_chars": new_body.chars().count(),
1524 }),
1525 };
1526
1527 let next_version = crate::storage::versions::next_version(conn, memory_id)?;
1531 let version_metadata = serde_json::json!({
1532 "operation": "body-enrich",
1533 "orig_chars": old_body.chars().count(),
1534 "new_chars": new_body.chars().count(),
1535 })
1536 .to_string();
1537 crate::storage::versions::insert_version(
1538 conn,
1539 memory_id,
1540 next_version,
1541 memory_name,
1542 &memory_type,
1543 &description,
1544 new_body,
1545 &version_metadata,
1546 Some("enrich"),
1547 "edit",
1548 )?;
1549
1550 memories::update(conn, memory_id, &new_memory, None)?;
1551 memories::sync_fts_after_update(
1552 conn,
1553 memory_id,
1554 &old_name,
1555 &old_desc,
1556 &old_body,
1557 &new_memory.name,
1558 &new_memory.description,
1559 &new_memory.body,
1560 )?;
1561
1562 if let Err(e) = reembed_memory_vector(
1564 conn,
1565 namespace,
1566 memory_id,
1567 memory_name,
1568 &memory_type,
1569 new_body,
1570 paths,
1571 llm_backend,
1572 ) {
1573 tracing::warn!(target: "enrich", memory = %memory_name, error = %e, "vec upsert failed after body-enrich");
1574 }
1575
1576 Ok(())
1577}
1578
/// Returns `true` when `value` equals `default`.
///
/// Used to decide whether a flag was left at its default. It cannot
/// distinguish an omitted flag from one explicitly set to the default value.
fn is_at_default<T: PartialEq>(value: T, default: T) -> bool {
    PartialEq::eq(&value, &default)
}
1593
1594fn validate_mode_conditional_flags_enrich(args: &EnrichArgs) -> Result<(), AppError> {
1609 const DEFAULT_TIMEOUT: u64 = 300;
1610
1611 let mut conflicts: Vec<String> = Vec::new();
1612
1613 match args.mode {
1614 EnrichMode::ClaudeCode => {
1615 if args.codex_binary.is_some() {
1616 conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1617 }
1618 if args.codex_model.is_some() {
1619 conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1620 }
1621 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1622 conflicts.push(format!(
1623 "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1624 args.codex_timeout
1625 ));
1626 }
1627 }
1628 EnrichMode::Codex => {
1629 if args.claude_binary.is_some() {
1630 conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1631 }
1632 if args.claude_model.is_some() {
1633 conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1634 }
1635 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1636 conflicts.push(format!(
1637 "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1638 args.claude_timeout
1639 ));
1640 }
1641 if args.max_cost_usd.is_some() {
1642 conflicts.push(
1643 "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription, not the call)"
1644 .to_string(),
1645 );
1646 }
1647 }
1648 EnrichMode::Opencode => {
1649 if args.claude_binary.is_some() {
1650 conflicts.push("--claude-binary is ignored when --mode=opencode".to_string());
1651 }
1652 if args.claude_model.is_some() {
1653 conflicts.push("--claude-model is ignored when --mode=opencode".to_string());
1654 }
1655 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1656 conflicts.push(format!(
1657 "--claude-timeout={} is ignored when --mode=opencode (remove the flag to use the default 300s)",
1658 args.claude_timeout
1659 ));
1660 }
1661 if args.max_cost_usd.is_some() {
1662 conflicts.push(
1663 "--max-cost-usd is ignored when --mode=opencode (OAuth-first; cost is metered by your subscription, not the call)"
1664 .to_string(),
1665 );
1666 }
1667 }
1668 }
1669
1670 if !conflicts.is_empty() {
1671 return Err(AppError::Validation(format!(
1672 "G20: mode-conditional flag conflicts detected for --mode={}:\n - {}",
1673 args.mode,
1674 conflicts.join("\n - ")
1675 )));
1676 }
1677
1678 Ok(())
1679}
1680
1681pub fn run(args: &EnrichArgs, llm_backend: crate::cli::LlmBackendChoice) -> Result<(), AppError> {
1685 validate_mode_conditional_flags_enrich(args)?;
1688 let started = Instant::now();
1689
1690 let paths = AppPaths::resolve(args.db.as_deref())?;
1691 ensure_db_ready(&paths)?;
1692 let conn = open_rw(&paths.db)?;
1693 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1694
1695 let wait_secs = args.wait_job_singleton;
1701 let force_flag = args.force_job_singleton;
1702 let _singleton = crate::lock::acquire_job_singleton(
1703 crate::lock::JobType::Enrich,
1704 &namespace,
1705 &paths.db,
1706 wait_secs,
1707 force_flag,
1708 )?;
1709
1710 let provider_binary = if matches!(args.operation, EnrichOperation::ReEmbed) {
1712 None
1713 } else {
1714 Some(match args.mode {
1715 EnrichMode::ClaudeCode => {
1716 let bin = find_claude_binary(args.claude_binary.as_deref())?;
1717 let version = super::claude_runner::validate_claude_version(&bin)?;
1718 tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
1719 emit_json(&PhaseEvent {
1720 phase: "validate",
1721 binary_path: bin.to_str(),
1722 version: Some(&version),
1723 items_total: None,
1724 items_pending: None,
1725 llm_parallelism: None,
1726 });
1727 bin
1728 }
1729 EnrichMode::Codex => {
1730 let bin = find_codex_binary(args.codex_binary.as_deref())?;
1731 emit_json(&PhaseEvent {
1732 phase: "validate",
1733 binary_path: bin.to_str(),
1734 version: None,
1735 items_total: None,
1736 items_pending: None,
1737 llm_parallelism: None,
1738 });
1739 bin
1740 }
1741 EnrichMode::Opencode => {
1742 let bin = super::opencode_runner::find_opencode_binary_with_override(
1743 args.opencode_binary.as_deref(),
1744 )?;
1745 emit_json(&PhaseEvent {
1746 phase: "validate",
1747 binary_path: bin.to_str(),
1748 version: None,
1749 items_total: None,
1750 items_pending: None,
1751 llm_parallelism: None,
1752 });
1753 bin
1754 }
1755 })
1756 };
1757
1758 if args.max_load_check && !args.dry_run && crate::system_load::is_system_saturated() {
1762 let load = crate::system_load::load_average_one();
1763 let n = crate::system_load::ncpus();
1764 return Err(AppError::Validation(format!(
1765 "system load average {load:.2} exceeds 2x ncpus ({n}); \
1766 pass --no-max-load-check to override (not recommended)"
1767 )));
1768 }
1769
1770 if args.preflight_check && !args.dry_run && !matches!(args.operation, EnrichOperation::ReEmbed)
1777 {
1778 let preflight_result = run_preflight_probe(args);
1779 match preflight_result {
1780 PreflightOutcome::Healthy => {
1781 tracing::info!(target: "enrich", mode = ?args.mode, "preflight probe healthy");
1782 }
1783 PreflightOutcome::RateLimited { reason, suggestion } => {
1784 if let Some(fallback) = args.fallback_mode.clone() {
1785 if fallback != args.mode {
1786 return Err(AppError::Validation(format!(
1796 "preflight detected rate limit on {mode:?}: {reason}; \
1797 re-invoke with `--mode {fallback:?}` to use the fallback provider",
1798 mode = args.mode
1799 )));
1800 }
1801 return Err(AppError::Validation(format!(
1802 "preflight detected rate limit on {mode:?}: {reason}; \
1803 --fallback-mode matches --mode, no recovery possible",
1804 mode = args.mode
1805 )));
1806 }
1807 return Err(AppError::Validation(format!(
1808 "preflight detected rate limit on {mode:?}: {reason}; \
1809 {suggestion}; pass --fallback-mode codex to recover",
1810 mode = args.mode
1811 )));
1812 }
1813 PreflightOutcome::Error(e) => {
1814 return Err(e);
1815 }
1816 }
1817 }
1818
1819 let scan_result = scan_operation(&conn, &namespace, args)?;
1821 let total = scan_result.len();
1822
1823 emit_json(&PhaseEvent {
1824 phase: "scan",
1825 binary_path: None,
1826 version: None,
1827 items_total: Some(total),
1828 items_pending: Some(total),
1829 llm_parallelism: Some(args.llm_parallelism),
1830 });
1831
1832 if args.dry_run {
1834 for (idx, key) in scan_result.iter().enumerate() {
1835 emit_json(&ItemEvent {
1836 item: key,
1837 status: "preview",
1838 memory_id: None,
1839 entity_id: None,
1840 entities: None,
1841 rels: None,
1842 chars_before: None,
1843 chars_after: None,
1844 cost_usd: None,
1845 elapsed_ms: None,
1846 error: None,
1847 index: idx,
1848 total,
1849 });
1850 }
1851 emit_json(&EnrichSummary {
1852 summary: true,
1853 operation: format!("{:?}", args.operation),
1854 items_total: total,
1855 completed: 0,
1856 failed: 0,
1857 skipped: 0,
1858 cost_usd: 0.0,
1859 elapsed_ms: started.elapsed().as_millis() as u64,
1860 backend_invoked: take_enrich_backend(),
1861 });
1862 return Ok(());
1863 }
1864
1865 let queue_conn = open_queue_db(DEFAULT_QUEUE_DB)?;
1869
1870 if args.resume {
1871 let reset = queue_conn
1872 .execute(
1873 "UPDATE queue SET status='pending' WHERE status='processing'",
1874 [],
1875 )
1876 .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
1877 if reset > 0 {
1878 tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
1879 }
1880 }
1881
1882 if args.retry_failed {
1883 let count = queue_conn
1884 .execute(
1885 "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
1886 [],
1887 )
1888 .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
1889 tracing::info!(target: "enrich", count, "retrying failed items");
1890 }
1891
1892 if !args.resume && !args.retry_failed {
1893 queue_conn
1894 .execute("DELETE FROM queue", [])
1895 .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
1896 }
1897
1898 for (idx, key) in scan_result.iter().enumerate() {
1900 let item_type = match args.operation {
1901 EnrichOperation::EntityDescriptions => "entity",
1902 _ => "memory",
1903 };
1904 if let Err(e) = queue_conn.execute(
1905 "INSERT OR IGNORE INTO queue (item_key, item_type, status) VALUES (?1, ?2, 'pending')",
1906 rusqlite::params![key, item_type],
1907 ) {
1908 tracing::warn!(target: "enrich", error = %e, "queue insert failed");
1909 }
1910 let _ = idx; }
1912
1913 let parallelism = args.llm_parallelism.clamp(1, 32) as usize;
1916 if parallelism > 1 {
1917 tracing::info!(
1918 target: "enrich",
1919 llm_parallelism = parallelism,
1920 "parallel LLM processing with bounded thread pool"
1921 );
1922 }
1923 if parallelism > 4 {
1927 match args.mode {
1928 EnrichMode::ClaudeCode => {
1929 tracing::warn!(
1930 target: "enrich",
1931 llm_parallelism = parallelism,
1932 recommended_max = 4,
1933 mode = "claude-code",
1934 "llm_parallelism above 4 multiplies Claude Code subprocess fan-out; \
1935 consider combining with SQLITE_GRAPHRAG_CLAUDE_EMPTY_CONFIG_DIR \
1936 to cut MCP children (G28-A)"
1937 );
1938 }
1939 EnrichMode::Codex if parallelism > 16 => {
1940 tracing::warn!(
1941 target: "enrich",
1942 llm_parallelism = parallelism,
1943 recommended_max = 16,
1944 mode = "codex",
1945 "llm_parallelism above 16 risks OAuth rate-limit on Codex; \
1946 consider --llm-parallelism 8 for safer concurrency"
1947 );
1948 }
1949 EnrichMode::Codex => {
1950 }
1954 EnrichMode::Opencode if parallelism > 16 => {
1955 tracing::warn!(
1956 target: "enrich",
1957 llm_parallelism = parallelism,
1958 recommended_max = 16,
1959 mode = "opencode",
1960 "llm_parallelism above 16 risks OAuth rate-limit on OpenCode; \
1961 consider --llm-parallelism 8 for safer concurrency"
1962 );
1963 }
1964 EnrichMode::Opencode => {
1965 }
1967 }
1968 }
1969
1970 let mut completed = 0usize;
1971 let mut failed = 0usize;
1972 let mut skipped = 0usize;
1973 let mut cost_total = 0.0f64;
1974 let mut oauth_detected = false;
1975 let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1976 let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1977 let enrich_started = std::time::Instant::now();
1978
1979 let provider_timeout = match args.mode {
1980 EnrichMode::ClaudeCode => args.claude_timeout,
1981 EnrichMode::Codex => args.codex_timeout,
1982 EnrichMode::Opencode => args.opencode_timeout,
1983 };
1984
1985 let provider_model: Option<&str> = match args.mode {
1986 EnrichMode::ClaudeCode => args.claude_model.as_deref(),
1987 EnrichMode::Codex => args.codex_model.as_deref(),
1988 EnrichMode::Opencode => args.opencode_model.as_deref(),
1989 };
1990
1991 if parallelism > 1 {
1995 let stdout_mu = parking_lot::Mutex::new(());
1996 let budget = args.max_cost_usd;
1997 let operation = args.operation.clone();
1998 let mode = args.mode.clone();
1999 let min_oc = args.min_output_chars;
2000 let max_oc = args.max_output_chars;
2001 let prompt_tpl = args.prompt_template.as_deref().map(|p| p.to_path_buf());
2002
2003 struct WorkerResult {
2004 completed: usize,
2005 failed: usize,
2006 skipped: usize,
2007 cost: f64,
2008 oauth: bool,
2009 }
2010
2011 let results: Vec<WorkerResult> = std::thread::scope(|s| {
2012 let handles: Vec<_> = (0..parallelism)
2013 .map(|worker_id| {
2014 let stdout_mu = &stdout_mu;
2015 let paths = &paths;
2016 let namespace = &namespace;
2017 let provider_binary = provider_binary.as_deref();
2018 let operation = &operation;
2019 let mode = &mode;
2020 let prompt_tpl = prompt_tpl.as_deref();
2021 s.spawn(move || {
2022 let w_conn = match open_rw(&paths.db) {
2023 Ok(c) => c,
2024 Err(e) => {
2025 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open DB");
2026 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
2027 }
2028 };
2029 let w_queue = match open_queue_db(DEFAULT_QUEUE_DB) {
2030 Ok(c) => c,
2031 Err(e) => {
2032 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open queue DB");
2033 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
2034 }
2035 };
2036 let mut w_completed = 0usize;
2037 let mut w_failed = 0usize;
2038 let mut w_skipped = 0usize;
2039 let mut w_cost = 0.0f64;
2040 let mut w_oauth = false;
2041 let mut w_backoff = DEFAULT_RATE_LIMIT_WAIT;
2042 let w_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
2043 let mut w_breaker = crate::retry::CircuitBreaker::new(
2049 args.circuit_breaker_threshold.max(1),
2050 std::time::Duration::from_secs(60),
2051 );
2052
2053 loop {
2054 if crate::shutdown_requested() {
2055 tracing::info!(target: "enrich", "shutdown requested, worker stopping");
2056 break;
2057 }
2058 if let Some(b) = budget {
2059 if !w_oauth && w_cost >= b {
2060 break;
2061 }
2062 }
2063 let pending: Option<(i64, String, String)> = w_queue
2064 .query_row(
2065 "UPDATE queue SET status='processing', attempt=attempt+1 \
2066 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
2067 RETURNING id, item_key, item_type",
2068 [],
2069 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2070 )
2071 .ok();
2072 let (queue_id, item_key, _item_type) = match pending {
2073 Some(p) => p,
2074 None => break,
2075 };
2076 let item_started = Instant::now();
2077 let current_index = w_completed + w_failed + w_skipped;
2078
2079 let call_result = match operation {
2080 EnrichOperation::MemoryBindings => call_memory_bindings(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2081 EnrichOperation::EntityDescriptions => call_entity_description(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2082 EnrichOperation::BodyEnrich => call_body_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode, min_oc, max_oc, prompt_tpl, args.preserve_threshold, paths, llm_backend),
2083 EnrichOperation::ReEmbed => call_reembed(&w_conn, namespace, &item_key, paths, llm_backend),
2084 EnrichOperation::WeightCalibrate => call_weight_calibrate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2085 EnrichOperation::RelationReclassify => call_relation_reclassify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2086 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => call_entity_connect(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2087 EnrichOperation::EntityTypeValidate => call_entity_type_validate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2088 EnrichOperation::DescriptionEnrich => call_description_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2089 EnrichOperation::DomainClassify => call_domain_classify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2090 EnrichOperation::GraphAudit => call_graph_audit(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2091 EnrichOperation::DeepResearchSynth => call_deep_research_synth(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2092 EnrichOperation::BodyExtract => call_body_extract(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2093 };
2094
2095 match call_result {
2096 Ok(EnrichItemResult::Done { cost, is_oauth, memory_id, entity_id, entities, rels, chars_before, chars_after }) => {
2097 if is_oauth { w_oauth = true; }
2098 w_backoff = DEFAULT_RATE_LIMIT_WAIT;
2099 let _ = w_queue.execute(
2100 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2101 rusqlite::params![memory_id, entity_id, entities as i64, rels as i64, cost, item_started.elapsed().as_millis() as i64, queue_id],
2102 );
2103 w_completed += 1;
2104 if !is_oauth { w_cost += cost; }
2105 let _ = w_breaker
2107 .record(crate::retry::AttemptOutcome::Success);
2108 let _guard = stdout_mu.lock();
2109 emit_json(&ItemEvent { item: &item_key, status: "done", memory_id, entity_id, entities: Some(entities), rels: Some(rels), chars_before, chars_after, cost_usd: if is_oauth { None } else { Some(cost) }, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
2110 }
2111 Ok(EnrichItemResult::Skipped { reason }) => {
2112 w_skipped += 1;
2113 let _ = w_queue.execute("UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![reason, queue_id]);
2114 let _guard = stdout_mu.lock();
2115 emit_json(&ItemEvent { item: &item_key, status: "skipped", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
2116 }
2117 Ok(EnrichItemResult::PreservationFailed { score, threshold, chars_before, chars_after }) => {
2118 w_skipped += 1;
2124 let reason = format!(
2125 "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2126 );
2127 let _ = w_queue.execute(
2128 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2129 rusqlite::params![reason, queue_id],
2130 );
2131 let _guard = stdout_mu.lock();
2132 emit_json(&ItemEvent {
2133 item: &item_key,
2134 status: "preservation_failed",
2135 memory_id: None,
2136 entity_id: None,
2137 entities: None,
2138 rels: None,
2139 chars_before: Some(chars_before),
2140 chars_after: Some(chars_after),
2141 cost_usd: None,
2142 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2143 error: Some(reason),
2144 index: current_index,
2145 total,
2146 });
2147 }
2148 Err(e) => {
2149 let err_str = format!("{e}");
2150 if matches!(e, AppError::RateLimited { .. }) {
2151 if crate::retry::is_kill_switch_active() {
2152 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2153 } else if std::time::Instant::now() >= w_deadline {
2154 tracing::error!(target: "enrich", "rate-limit retry deadline (1h) exhausted in worker");
2155 } else {
2156 let half = w_backoff / 2;
2157 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2158 let actual_wait = half + jitter;
2159 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited in worker, backing off");
2160 let _ = w_queue.execute("UPDATE queue SET status='pending' WHERE id=?1", rusqlite::params![queue_id]);
2161 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2162 w_backoff = (w_backoff * 2).min(900);
2163 continue;
2164 }
2165 }
2166 w_failed += 1;
2167 let _ = w_queue.execute("UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![err_str, queue_id]);
2168 let _guard = stdout_mu.lock();
2169 emit_json(&ItemEvent { item: &item_key, status: "failed", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: Some(err_str), index: current_index, total });
2170 let breaker_opened = w_breaker
2172 .record(crate::retry::AttemptOutcome::HardFailure);
2173 if breaker_opened {
2174 tracing::error!(target: "enrich",
2175 consecutive_failures = w_breaker.consecutive_failures(),
2176 "circuit breaker opened — aborting worker"
2177 );
2178 break;
2179 }
2180 }
2181 }
2182 }
2183 WorkerResult { completed: w_completed, failed: w_failed, skipped: w_skipped, cost: w_cost, oauth: w_oauth }
2184 })
2185 })
2186 .collect();
2187 handles
2188 .into_iter()
2189 .map(|h| {
2190 h.join().unwrap_or(WorkerResult {
2191 completed: 0,
2192 failed: 0,
2193 skipped: 0,
2194 cost: 0.0,
2195 oauth: false,
2196 })
2197 })
2198 .collect()
2199 });
2200
2201 for r in &results {
2202 completed += r.completed;
2203 failed += r.failed;
2204 skipped += r.skipped;
2205 cost_total += r.cost;
2206 if r.oauth && !oauth_detected {
2207 oauth_detected = true;
2208 }
2209 }
2210 } else {
2211 loop {
2213 if crate::shutdown_requested() {
2214 tracing::info!(target: "enrich", "shutdown requested, stopping enrichment");
2215 break;
2216 }
2217
2218 if let Some(budget) = args.max_cost_usd {
2220 if !oauth_detected && cost_total >= budget {
2221 tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
2222 break;
2223 }
2224 }
2225
2226 let pending: Option<(i64, String, String)> = queue_conn
2228 .query_row(
2229 "UPDATE queue SET status='processing', attempt=attempt+1 \
2230 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
2231 RETURNING id, item_key, item_type",
2232 [],
2233 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2234 )
2235 .ok();
2236
2237 let (queue_id, item_key, item_type) = match pending {
2238 Some(p) => p,
2239 None => break,
2240 };
2241
2242 let item_started = Instant::now();
2243 let current_index = completed + failed + skipped;
2244
2245 let call_result = match args.operation {
2246 EnrichOperation::MemoryBindings => call_memory_bindings(
2247 &conn,
2248 &namespace,
2249 &item_key,
2250 provider_binary
2251 .as_deref()
2252 .expect("provider binary required"),
2253 provider_model,
2254 provider_timeout,
2255 &args.mode,
2256 ),
2257 EnrichOperation::EntityDescriptions => call_entity_description(
2258 &conn,
2259 &namespace,
2260 &item_key,
2261 provider_binary
2262 .as_deref()
2263 .expect("provider binary required"),
2264 provider_model,
2265 provider_timeout,
2266 &args.mode,
2267 ),
2268 EnrichOperation::BodyEnrich => call_body_enrich(
2269 &conn,
2270 &namespace,
2271 &item_key,
2272 provider_binary
2273 .as_deref()
2274 .expect("provider binary required"),
2275 provider_model,
2276 provider_timeout,
2277 &args.mode,
2278 args.min_output_chars,
2279 args.max_output_chars,
2280 args.prompt_template.as_deref(),
2281 args.preserve_threshold,
2282 &paths,
2283 llm_backend,
2284 ),
2285 EnrichOperation::ReEmbed => {
2286 call_reembed(&conn, &namespace, &item_key, &paths, llm_backend)
2287 }
2288 EnrichOperation::WeightCalibrate => call_weight_calibrate(
2289 &conn,
2290 &namespace,
2291 &item_key,
2292 provider_binary
2293 .as_deref()
2294 .expect("provider binary required"),
2295 provider_model,
2296 provider_timeout,
2297 &args.mode,
2298 ),
2299 EnrichOperation::RelationReclassify => call_relation_reclassify(
2300 &conn,
2301 &namespace,
2302 &item_key,
2303 provider_binary
2304 .as_deref()
2305 .expect("provider binary required"),
2306 provider_model,
2307 provider_timeout,
2308 &args.mode,
2309 ),
2310 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2311 call_entity_connect(
2312 &conn,
2313 &namespace,
2314 &item_key,
2315 provider_binary
2316 .as_deref()
2317 .expect("provider binary required"),
2318 provider_model,
2319 provider_timeout,
2320 &args.mode,
2321 )
2322 }
2323 EnrichOperation::EntityTypeValidate => call_entity_type_validate(
2324 &conn,
2325 &namespace,
2326 &item_key,
2327 provider_binary
2328 .as_deref()
2329 .expect("provider binary required"),
2330 provider_model,
2331 provider_timeout,
2332 &args.mode,
2333 ),
2334 EnrichOperation::DescriptionEnrich => call_description_enrich(
2335 &conn,
2336 &namespace,
2337 &item_key,
2338 provider_binary
2339 .as_deref()
2340 .expect("provider binary required"),
2341 provider_model,
2342 provider_timeout,
2343 &args.mode,
2344 ),
2345 EnrichOperation::DomainClassify => call_domain_classify(
2346 &conn,
2347 &namespace,
2348 &item_key,
2349 provider_binary
2350 .as_deref()
2351 .expect("provider binary required"),
2352 provider_model,
2353 provider_timeout,
2354 &args.mode,
2355 ),
2356 EnrichOperation::GraphAudit => call_graph_audit(
2357 &conn,
2358 &namespace,
2359 &item_key,
2360 provider_binary
2361 .as_deref()
2362 .expect("provider binary required"),
2363 provider_model,
2364 provider_timeout,
2365 &args.mode,
2366 ),
2367 EnrichOperation::DeepResearchSynth => call_deep_research_synth(
2368 &conn,
2369 &namespace,
2370 &item_key,
2371 provider_binary
2372 .as_deref()
2373 .expect("provider binary required"),
2374 provider_model,
2375 provider_timeout,
2376 &args.mode,
2377 ),
2378 EnrichOperation::BodyExtract => call_body_extract(
2379 &conn,
2380 &namespace,
2381 &item_key,
2382 provider_binary
2383 .as_deref()
2384 .expect("provider binary required"),
2385 provider_model,
2386 provider_timeout,
2387 &args.mode,
2388 ),
2389 };
2390
2391 match call_result {
2392 Ok(EnrichItemResult::Done {
2393 memory_id,
2394 entity_id,
2395 entities,
2396 rels,
2397 chars_before,
2398 chars_after,
2399 cost,
2400 is_oauth,
2401 }) => {
2402 if is_oauth && !oauth_detected {
2403 oauth_detected = true;
2404 tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
2405 }
2406 backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
2407
2408 let persist_err: Option<String> = match args.operation {
2410 EnrichOperation::MemoryBindings => {
2411 None
2413 }
2414 EnrichOperation::EntityDescriptions => {
2415 None
2417 }
2418 EnrichOperation::BodyEnrich => {
2419 None
2421 }
2422 _ => {
2423 None
2425 }
2426 };
2427
2428 if let Err(e) = queue_conn.execute(
2429 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2430 rusqlite::params![
2431 memory_id,
2432 entity_id,
2433 entities as i64,
2434 rels as i64,
2435 cost,
2436 item_started.elapsed().as_millis() as i64,
2437 queue_id
2438 ],
2439 ) {
2440 tracing::warn!(target: "enrich", error = %e, "queue done update failed");
2441 }
2442
2443 if persist_err.is_none() {
2444 completed += 1;
2445 if !is_oauth {
2446 cost_total += cost;
2447 }
2448 emit_json(&ItemEvent {
2449 item: &item_key,
2450 status: "done",
2451 memory_id,
2452 entity_id,
2453 entities: Some(entities),
2454 rels: Some(rels),
2455 chars_before,
2456 chars_after,
2457 cost_usd: if is_oauth { None } else { Some(cost) },
2458 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2459 error: None,
2460 index: current_index,
2461 total,
2462 });
2463 } else {
2464 failed += 1;
2465 emit_json(&ItemEvent {
2466 item: &item_key,
2467 status: "failed",
2468 memory_id: None,
2469 entity_id: None,
2470 entities: None,
2471 rels: None,
2472 chars_before: None,
2473 chars_after: None,
2474 cost_usd: None,
2475 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2476 error: persist_err,
2477 index: current_index,
2478 total,
2479 });
2480 }
2481 }
2482 Ok(EnrichItemResult::Skipped { reason }) => {
2483 skipped += 1;
2484 if let Err(e) = queue_conn.execute(
2485 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2486 rusqlite::params![reason, queue_id],
2487 ) {
2488 tracing::warn!(target: "enrich", error = %e, "queue skipped update failed");
2489 }
2490 emit_json(&ItemEvent {
2491 item: &item_key,
2492 status: "skipped",
2493 memory_id: None,
2494 entity_id: None,
2495 entities: None,
2496 rels: None,
2497 chars_before: None,
2498 chars_after: None,
2499 cost_usd: None,
2500 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2501 error: None,
2502 index: current_index,
2503 total,
2504 });
2505 }
2506 Ok(EnrichItemResult::PreservationFailed {
2507 score,
2508 threshold,
2509 chars_before,
2510 chars_after,
2511 }) => {
2512 skipped += 1;
2519 let reason = format!(
2520 "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2521 );
2522 if let Err(qe) = queue_conn.execute(
2523 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2524 rusqlite::params![reason, queue_id],
2525 ) {
2526 tracing::warn!(target: "enrich", error = %qe, "queue preservation_failed update failed");
2527 }
2528 emit_json(&ItemEvent {
2529 item: &item_key,
2530 status: "preservation_failed",
2531 memory_id: None,
2532 entity_id: None,
2533 entities: None,
2534 rels: None,
2535 chars_before: Some(chars_before),
2536 chars_after: Some(chars_after),
2537 cost_usd: None,
2538 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2539 error: Some(reason),
2540 index: current_index,
2541 total,
2542 });
2543 }
2544 Err(e) => {
2545 let err_str = format!("{e}");
2546 if matches!(e, AppError::RateLimited { .. }) {
2547 if crate::retry::is_kill_switch_active() {
2548 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2549 } else if std::time::Instant::now() >= rate_limit_deadline {
2550 tracing::error!(target: "enrich", total_elapsed_secs = enrich_started.elapsed().as_secs(), "rate-limit retry deadline (1h) exhausted");
2551 } else {
2552 let half = backoff_secs / 2;
2553 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2554 let actual_wait = half + jitter;
2555 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
2556 if let Err(qe) = queue_conn.execute(
2557 "UPDATE queue SET status='pending' WHERE id=?1",
2558 rusqlite::params![queue_id],
2559 ) {
2560 tracing::warn!(target: "enrich", error = %qe, "queue pending update failed");
2561 }
2562 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2563 backoff_secs = (backoff_secs * 2).min(900);
2564 continue;
2565 }
2566 }
2567
2568 failed += 1;
2569 if let Err(qe) = queue_conn.execute(
2570 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
2571 rusqlite::params![err_str, queue_id],
2572 ) {
2573 tracing::warn!(target: "enrich", error = %qe, "queue failed update failed");
2574 }
2575 emit_json(&ItemEvent {
2576 item: &item_key,
2577 status: "failed",
2578 memory_id: None,
2579 entity_id: None,
2580 entities: None,
2581 rels: None,
2582 chars_before: None,
2583 chars_after: None,
2584 cost_usd: None,
2585 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2586 error: Some(err_str),
2587 index: current_index,
2588 total,
2589 });
2590 }
2591 }
2592
2593 let _ = item_type; }
2595 } let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2598 let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2599
2600 emit_json(&EnrichSummary {
2601 summary: true,
2602 operation: format!("{:?}", args.operation),
2603 items_total: total,
2604 completed,
2605 failed,
2606 skipped,
2607 cost_usd: cost_total,
2608 elapsed_ms: started.elapsed().as_millis() as u64,
2609 backend_invoked: take_enrich_backend(),
2610 });
2611
2612 if failed == 0 {
2613 let _ = std::fs::remove_file(DEFAULT_QUEUE_DB);
2614 }
2615
2616 Ok(())
2617}
2618
/// Outcome of enriching a single queued item.
///
/// Every `call_*` worker returns one of these. The run loop maps `Done` to
/// queue status `done`, and both `Skipped` and `PreservationFailed` to queue
/// status `skipped`. It then emits a matching JSON item event.
enum EnrichItemResult {
    /// The item was processed and its changes were written.
    Done {
        /// Cost reported by the provider call for this item.
        cost: f64,
        /// When true, the run loop neither adds `cost` to the running total
        /// nor emits a `cost_usd` value for this item.
        is_oauth: bool,
        /// Memory the item refers to, when it is memory-based.
        memory_id: Option<i64>,
        /// Entity the item refers to, when it is entity-based.
        entity_id: Option<i64>,
        /// Entity count reported for the item.
        entities: usize,
        /// Relationship count reported for the item.
        rels: usize,
        /// Body length in characters before the operation, when measured.
        chars_before: Option<usize>,
        /// Body length in characters after the operation, when measured.
        chars_after: Option<usize>,
    },
    /// Nothing was written. `reason` is stored in the queue's `error` column.
    Skipped { reason: String },
    /// A rewritten body was produced but rejected by the preservation check,
    /// so the original body was kept.
    PreservationFailed {
        /// Body length in characters of the original body.
        chars_before: usize,
        /// Body length in characters of the rejected rewrite.
        chars_after: usize,
        /// Similarity score produced by the preservation check.
        score: f64,
        /// Minimum score that would have been accepted.
        threshold: f64,
    },
}
2648
2649fn call_memory_bindings(
2654 conn: &Connection,
2655 namespace: &str,
2656 memory_name: &str,
2657 binary: &Path,
2658 model: Option<&str>,
2659 timeout: u64,
2660 mode: &EnrichMode,
2661) -> Result<EnrichItemResult, AppError> {
2662 let (memory_id, body): (i64, String) = conn.query_row(
2664 "SELECT id, COALESCE(body,'') FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2665 rusqlite::params![namespace, memory_name],
2666 |r| Ok((r.get(0)?, r.get(1)?)),
2667 ).map_err(|e| match e {
2668 rusqlite::Error::QueryReturnedNoRows => AppError::NotFound(format!("memory '{memory_name}' not found")),
2669 other => AppError::Database(other),
2670 })?;
2671
2672 if body.trim().is_empty() {
2673 return Ok(EnrichItemResult::Skipped {
2674 reason: "body is empty".to_string(),
2675 });
2676 }
2677
2678 let (value, cost, is_oauth) = match mode {
2679 EnrichMode::ClaudeCode => call_claude(
2680 binary,
2681 BINDINGS_PROMPT,
2682 BINDINGS_SCHEMA,
2683 &body,
2684 model,
2685 timeout,
2686 )?,
2687 EnrichMode::Codex => call_codex(
2688 binary,
2689 BINDINGS_PROMPT,
2690 BINDINGS_SCHEMA,
2691 &body,
2692 model,
2693 timeout,
2694 )?,
2695 EnrichMode::Opencode => call_opencode(
2696 binary,
2697 BINDINGS_PROMPT,
2698 BINDINGS_SCHEMA,
2699 &body,
2700 model,
2701 timeout,
2702 )?,
2703 };
2704
2705 let empty_arr = serde_json::Value::Array(vec![]);
2706 let entities_val = value.get("entities").unwrap_or(&empty_arr);
2707 let rels_val = value.get("relationships").unwrap_or(&empty_arr);
2708
2709 let (ent_count, rel_count) =
2710 persist_memory_bindings(conn, namespace, memory_id, entities_val, rels_val)?;
2711
2712 Ok(EnrichItemResult::Done {
2713 memory_id: Some(memory_id),
2714 entity_id: None,
2715 entities: ent_count,
2716 rels: rel_count,
2717 chars_before: None,
2718 chars_after: None,
2719 cost,
2720 is_oauth,
2721 })
2722}
2723
2724fn call_entity_description(
2725 conn: &Connection,
2726 namespace: &str,
2727 entity_name: &str,
2728 binary: &Path,
2729 model: Option<&str>,
2730 timeout: u64,
2731 mode: &EnrichMode,
2732) -> Result<EnrichItemResult, AppError> {
2733 let (entity_id, entity_type): (i64, String) = conn
2734 .query_row(
2735 "SELECT id, type FROM entities WHERE namespace=?1 AND name=?2",
2736 rusqlite::params![namespace, entity_name],
2737 |r| Ok((r.get(0)?, r.get(1)?)),
2738 )
2739 .map_err(|e| match e {
2740 rusqlite::Error::QueryReturnedNoRows => {
2741 AppError::NotFound(format!("entity '{entity_name}' not found"))
2742 }
2743 other => AppError::Database(other),
2744 })?;
2745
2746 let prompt = format!(
2747 "{ENTITY_DESCRIPTION_PROMPT_PREFIX}{entity_name}\nEntity type: {entity_type}\n\nGenerate a description:"
2748 );
2749
2750 let (value, cost, is_oauth) = match mode {
2751 EnrichMode::ClaudeCode => call_claude(
2752 binary,
2753 &prompt,
2754 ENTITY_DESCRIPTION_SCHEMA,
2755 "",
2756 model,
2757 timeout,
2758 )?,
2759 EnrichMode::Codex => call_codex(
2760 binary,
2761 &prompt,
2762 ENTITY_DESCRIPTION_SCHEMA,
2763 "",
2764 model,
2765 timeout,
2766 )?,
2767 EnrichMode::Opencode => call_opencode(
2768 binary,
2769 &prompt,
2770 ENTITY_DESCRIPTION_SCHEMA,
2771 "",
2772 model,
2773 timeout,
2774 )?,
2775 };
2776
2777 let description = value
2778 .get("description")
2779 .and_then(|v| v.as_str())
2780 .ok_or_else(|| AppError::Validation("LLM result missing 'description' field".into()))?;
2781
2782 persist_entity_description(conn, entity_id, description)?;
2783
2784 Ok(EnrichItemResult::Done {
2785 memory_id: None,
2786 entity_id: Some(entity_id),
2787 entities: 0,
2788 rels: 0,
2789 chars_before: None,
2790 chars_after: None,
2791 cost,
2792 is_oauth,
2793 })
2794}
2795
2796#[allow(clippy::too_many_arguments)]
2797fn call_body_enrich(
2798 conn: &Connection,
2799 namespace: &str,
2800 memory_name: &str,
2801 binary: &Path,
2802 model: Option<&str>,
2803 timeout: u64,
2804 mode: &EnrichMode,
2805 min_output_chars: usize,
2806 max_output_chars: usize,
2807 prompt_template: Option<&Path>,
2808 preserve_threshold: f64,
2809 paths: &crate::paths::AppPaths,
2810 llm_backend: crate::cli::LlmBackendChoice,
2811) -> Result<EnrichItemResult, AppError> {
2812 let (memory_id, body, description, memory_type): (i64, String, String, String) = conn
2813 .query_row(
2814 "SELECT id, COALESCE(body,''), COALESCE(description,''), COALESCE(type,'note') \
2815 FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2816 rusqlite::params![namespace, memory_name],
2817 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2818 )
2819 .map_err(|e| match e {
2820 rusqlite::Error::QueryReturnedNoRows => {
2821 AppError::NotFound(format!("memory '{memory_name}' not found"))
2822 }
2823 other => AppError::Database(other),
2824 })?;
2825
2826 let chars_before = body.chars().count();
2827
2828 let linked_entities: Vec<String> = {
2830 let mut stmt = conn.prepare_cached(
2831 "SELECT e.name FROM memory_entities me \
2832 JOIN entities e ON e.id = me.entity_id \
2833 WHERE me.memory_id = ?1 LIMIT 10",
2834 )?;
2835 let result: Vec<String> = stmt
2836 .query_map(rusqlite::params![memory_id], |r| r.get::<_, String>(0))?
2837 .filter_map(|r| r.ok())
2838 .collect();
2839 drop(stmt);
2840 result
2841 };
2842
2843 let prompt_prefix = if let Some(tmpl_path) = prompt_template {
2845 let file_size = std::fs::metadata(tmpl_path)
2846 .map_err(|e| {
2847 AppError::Io(std::io::Error::new(
2848 e.kind(),
2849 format!("failed to stat prompt template: {e}"),
2850 ))
2851 })?
2852 .len();
2853 if file_size > MAX_MEMORY_BODY_LEN as u64 {
2854 return Err(AppError::LimitExceeded(
2855 crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
2856 ));
2857 }
2858 std::fs::read_to_string(tmpl_path).map_err(|e| {
2859 AppError::Io(std::io::Error::new(
2860 e.kind(),
2861 format!("failed to read prompt template: {e}"),
2862 ))
2863 })?
2864 } else {
2865 BODY_ENRICH_PROMPT_PREFIX.to_string()
2866 };
2867
2868 let context_section = if !linked_entities.is_empty() || !description.is_empty() {
2870 let mut ctx = String::new();
2871 ctx.push_str(&format!(
2872 "\nContext:\n- Memory name: {memory_name}\n- Type: {memory_type}\n"
2873 ));
2874 if !description.is_empty() {
2875 ctx.push_str(&format!("- Description: {description}\n"));
2876 }
2877 ctx.push_str(&format!("- Domain: {namespace}\n"));
2878 if !linked_entities.is_empty() {
2879 ctx.push_str(&format!(
2880 "- Linked entities: {}\n",
2881 linked_entities.join(", ")
2882 ));
2883 }
2884 ctx
2885 } else {
2886 String::new()
2887 };
2888
2889 let prompt = format!(
2890 "{prompt_prefix}{context_section}\nTarget minimum length: {min_output_chars} characters. Maximum: {max_output_chars} characters."
2891 );
2892
2893 let (value, cost, is_oauth) = match mode {
2895 EnrichMode::ClaudeCode => {
2896 call_claude(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2897 }
2898 EnrichMode::Codex => {
2899 call_codex(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2900 }
2901 EnrichMode::Opencode => {
2902 call_opencode(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2903 }
2904 };
2905
2906 let enriched_body = value
2907 .get("enriched_body")
2908 .and_then(|v| v.as_str())
2909 .ok_or_else(|| AppError::Validation("LLM result missing 'enriched_body' field".into()))?;
2910
2911 let chars_after = enriched_body.chars().count();
2912
2913 let threshold = preserve_threshold;
2920 let verdict =
2921 crate::preservation::PreservationVerdict::evaluate(&body, enriched_body, threshold);
2922 if !verdict.is_accepted() {
2923 return Ok(EnrichItemResult::PreservationFailed {
2924 score: match verdict {
2925 crate::preservation::PreservationVerdict::Preserved { score, .. } => score,
2926 crate::preservation::PreservationVerdict::Rejected { score, .. } => score,
2927 crate::preservation::PreservationVerdict::Unchanged { .. } => 1.0,
2928 },
2929 threshold,
2930 chars_before,
2931 chars_after,
2932 });
2933 }
2934
2935 let old_hash = blake3::hash(body.as_bytes()).to_hex().to_string();
2941 let new_hash = blake3::hash(enriched_body.as_bytes()).to_hex().to_string();
2942 if old_hash == new_hash {
2943 return Ok(EnrichItemResult::Skipped {
2944 reason: format!(
2945 "enriched body hash matches original (blake3:{old_hash}); idempotency skip"
2946 ),
2947 });
2948 }
2949
2950 if chars_after <= chars_before {
2952 return Ok(EnrichItemResult::Skipped {
2953 reason: format!(
2954 "enriched body ({chars_after} chars) not longer than original ({chars_before} chars)"
2955 ),
2956 });
2957 }
2958
2959 persist_enriched_body(
2960 conn,
2961 namespace,
2962 memory_id,
2963 memory_name,
2964 enriched_body,
2965 paths,
2966 llm_backend,
2967 )?;
2968
2969 Ok(EnrichItemResult::Done {
2970 memory_id: Some(memory_id),
2971 entity_id: None,
2972 entities: 0,
2973 rels: 0,
2974 chars_before: Some(chars_before),
2975 chars_after: Some(chars_after),
2976 cost,
2977 is_oauth,
2978 })
2979}
2980
2981fn call_reembed(
2982 conn: &Connection,
2983 namespace: &str,
2984 memory_name: &str,
2985 paths: &crate::paths::AppPaths,
2986 llm_backend: crate::cli::LlmBackendChoice,
2987) -> Result<EnrichItemResult, AppError> {
2988 let (memory_id, body, memory_type): (i64, String, String) = conn
2989 .query_row(
2990 "SELECT id, COALESCE(body,''), COALESCE(type,'note')
2991 FROM memories
2992 WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2993 rusqlite::params![namespace, memory_name],
2994 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2995 )
2996 .map_err(|e| match e {
2997 rusqlite::Error::QueryReturnedNoRows => {
2998 AppError::NotFound(format!("memory '{memory_name}' not found"))
2999 }
3000 other => AppError::Database(other),
3001 })?;
3002
3003 if body.trim().is_empty() {
3004 return Ok(EnrichItemResult::Skipped {
3005 reason: "body is empty".to_string(),
3006 });
3007 }
3008
3009 reembed_memory_vector(
3010 conn,
3011 namespace,
3012 memory_id,
3013 memory_name,
3014 &memory_type,
3015 &body,
3016 paths,
3017 llm_backend,
3018 )?;
3019
3020 Ok(EnrichItemResult::Done {
3021 memory_id: Some(memory_id),
3022 entity_id: None,
3023 entities: 0,
3024 rels: 0,
3025 chars_before: Some(body.chars().count()),
3026 chars_after: Some(body.chars().count()),
3027 cost: 0.0,
3028 is_oauth: true,
3029 })
3030}
3031
3032fn scan_operation(
3037 conn: &Connection,
3038 namespace: &str,
3039 args: &EnrichArgs,
3040) -> Result<Vec<String>, AppError> {
3041 let name_filter = resolve_name_filter(args)?;
3043 match args.operation {
3044 EnrichOperation::MemoryBindings => {
3045 let rows = scan_unbound_memories(conn, namespace, args.limit, &name_filter)?;
3046 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3047 }
3048 EnrichOperation::EntityDescriptions => {
3049 let rows =
3050 scan_entities_without_description(conn, namespace, args.limit, &name_filter)?;
3051 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3052 }
3053 EnrichOperation::BodyEnrich => {
3054 let rows = scan_short_body_memories(
3055 conn,
3056 namespace,
3057 args.min_output_chars,
3058 args.limit,
3059 &name_filter,
3060 )?;
3061 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3062 }
3063 EnrichOperation::ReEmbed => {
3064 let rows = scan_memories_without_embeddings(conn, namespace, args.limit, &name_filter)?;
3065 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3066 }
3067 EnrichOperation::WeightCalibrate => {
3068 let rows = scan_weight_candidates(conn, namespace, args.limit)?;
3069 Ok(rows
3070 .into_iter()
3071 .map(|(id, _, _, _, _)| id.to_string())
3072 .collect())
3073 }
3074 EnrichOperation::RelationReclassify => {
3075 let rows = scan_generic_relations(conn, namespace, args.limit)?;
3076 Ok(rows
3077 .into_iter()
3078 .map(|(id, _, _, _)| id.to_string())
3079 .collect())
3080 }
3081 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
3082 let pairs = scan_isolated_entity_pairs(conn, namespace, args.limit)?;
3083 Ok(pairs.into_iter().map(|(_, name, _, _)| name).collect())
3084 }
3085 EnrichOperation::EntityTypeValidate => {
3086 let rows = scan_entities_for_type_validation(conn, namespace, args.limit)?;
3087 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3088 }
3089 EnrichOperation::DescriptionEnrich => {
3090 let rows = scan_generic_descriptions(conn, namespace, args.limit)?;
3091 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3092 }
3093 EnrichOperation::DomainClassify
3094 | EnrichOperation::GraphAudit
3095 | EnrichOperation::DeepResearchSynth
3096 | EnrichOperation::BodyExtract => {
3097 let limit_clause = args.limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3098 let sql = format!(
3099 "SELECT name FROM memories WHERE namespace=?1 AND deleted_at IS NULL ORDER BY id {limit_clause}"
3100 );
3101 let mut stmt = conn.prepare(&sql)?;
3102 let names = stmt
3103 .query_map(rusqlite::params![namespace], |r| r.get::<_, String>(0))?
3104 .collect::<Result<Vec<_>, _>>()?;
3105 Ok(names)
3106 }
3107 }
3108}
3109
3110fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
3116 if let Some(p) = explicit {
3117 if p.exists() {
3118 return Ok(p.to_path_buf());
3119 }
3120 return Err(AppError::Validation(format!(
3121 "Codex binary not found at explicit path: {}",
3122 p.display()
3123 )));
3124 }
3125
3126 if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
3127 let p = PathBuf::from(&env_path);
3128 if p.exists() {
3129 return Ok(p);
3130 }
3131 }
3132
3133 let name = if cfg!(windows) { "codex.exe" } else { "codex" };
3134 if let Some(path_var) = std::env::var_os("PATH") {
3135 for dir in std::env::split_paths(&path_var) {
3136 let candidate = dir.join(name);
3137 if candidate.exists() {
3138 return Ok(crate::extract::llm_embedding::resolve_real_binary(
3139 &candidate,
3140 ));
3141 }
3142 }
3143 }
3144
3145 Err(AppError::Validation(
3146 "Codex CLI binary not found in PATH. Install it or specify --codex-binary".to_string(),
3147 ))
3148}
3149
3150fn call_weight_calibrate(
3152 conn: &Connection,
3153 _namespace: &str,
3154 item_key: &str,
3155 binary: &Path,
3156 model: Option<&str>,
3157 timeout: u64,
3158 mode: &EnrichMode,
3159) -> Result<EnrichItemResult, AppError> {
3160 let rel_id: i64 = item_key
3161 .parse()
3162 .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
3163 let (source_name, target_name, relation, current_weight): (String, String, String, f64) = conn
3164 .query_row(
3165 "SELECT e1.name, e2.name, r.relation, r.weight \
3166 FROM relationships r \
3167 JOIN entities e1 ON e1.id = r.source_id \
3168 JOIN entities e2 ON e2.id = r.target_id \
3169 WHERE r.id = ?1",
3170 rusqlite::params![rel_id],
3171 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
3172 )
3173 .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
3174
3175 let input_text = format!(
3176 "Source: {source_name}\nTarget: {target_name}\nRelation: {relation}\nCurrent weight: {current_weight}"
3177 );
3178 let (value, cost, is_oauth) = match mode {
3179 EnrichMode::ClaudeCode => call_claude(
3180 binary,
3181 WEIGHT_CALIBRATE_PROMPT,
3182 WEIGHT_CALIBRATE_SCHEMA,
3183 &input_text,
3184 model,
3185 timeout,
3186 )?,
3187 EnrichMode::Codex => call_codex(
3188 binary,
3189 WEIGHT_CALIBRATE_PROMPT,
3190 WEIGHT_CALIBRATE_SCHEMA,
3191 &input_text,
3192 model,
3193 timeout,
3194 )?,
3195 EnrichMode::Opencode => call_opencode(
3196 binary,
3197 WEIGHT_CALIBRATE_PROMPT,
3198 WEIGHT_CALIBRATE_SCHEMA,
3199 &input_text,
3200 model,
3201 timeout,
3202 )?,
3203 };
3204
3205 let calibrated = value
3206 .get("calibrated_weight")
3207 .and_then(|v| v.as_f64())
3208 .ok_or_else(|| AppError::Validation("LLM result missing 'calibrated_weight'".into()))?;
3209
3210 conn.execute(
3211 "UPDATE relationships SET weight = ?1 WHERE id = ?2",
3212 rusqlite::params![calibrated, rel_id],
3213 )?;
3214
3215 Ok(EnrichItemResult::Done {
3216 memory_id: None,
3217 entity_id: None,
3218 entities: 0,
3219 rels: 1,
3220 chars_before: None,
3221 chars_after: None,
3222 cost,
3223 is_oauth,
3224 })
3225}
3226
3227fn call_relation_reclassify(
3229 conn: &Connection,
3230 _namespace: &str,
3231 item_key: &str,
3232 binary: &Path,
3233 model: Option<&str>,
3234 timeout: u64,
3235 mode: &EnrichMode,
3236) -> Result<EnrichItemResult, AppError> {
3237 let rel_id: i64 = item_key
3238 .parse()
3239 .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
3240 let (source_name, target_name, current_relation): (String, String, String) = conn
3241 .query_row(
3242 "SELECT e1.name, e2.name, r.relation \
3243 FROM relationships r \
3244 JOIN entities e1 ON e1.id = r.source_id \
3245 JOIN entities e2 ON e2.id = r.target_id \
3246 WHERE r.id = ?1",
3247 rusqlite::params![rel_id],
3248 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3249 )
3250 .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
3251
3252 let input_text = format!(
3253 "Source entity: {source_name}\nTarget entity: {target_name}\nCurrent relation: {current_relation}"
3254 );
3255 let (value, cost, is_oauth) = match mode {
3256 EnrichMode::ClaudeCode => call_claude(
3257 binary,
3258 RELATION_RECLASSIFY_PROMPT,
3259 RELATION_RECLASSIFY_SCHEMA,
3260 &input_text,
3261 model,
3262 timeout,
3263 )?,
3264 EnrichMode::Codex => call_codex(
3265 binary,
3266 RELATION_RECLASSIFY_PROMPT,
3267 RELATION_RECLASSIFY_SCHEMA,
3268 &input_text,
3269 model,
3270 timeout,
3271 )?,
3272 EnrichMode::Opencode => call_opencode(
3273 binary,
3274 RELATION_RECLASSIFY_PROMPT,
3275 RELATION_RECLASSIFY_SCHEMA,
3276 &input_text,
3277 model,
3278 timeout,
3279 )?,
3280 };
3281
3282 let new_relation = value
3283 .get("relation")
3284 .and_then(|v| v.as_str())
3285 .ok_or_else(|| AppError::Validation("LLM result missing 'relation'".into()))?;
3286 let new_strength = value
3287 .get("strength")
3288 .and_then(|v| v.as_f64())
3289 .unwrap_or(0.5);
3290
3291 conn.execute(
3292 "UPDATE relationships SET relation = ?1, weight = ?2 WHERE id = ?3",
3293 rusqlite::params![new_relation, new_strength, rel_id],
3294 )?;
3295
3296 Ok(EnrichItemResult::Done {
3297 memory_id: None,
3298 entity_id: None,
3299 entities: 0,
3300 rels: 1,
3301 chars_before: None,
3302 chars_after: None,
3303 cost,
3304 is_oauth,
3305 })
3306}
3307
3308fn call_entity_connect(
3310 conn: &Connection,
3311 namespace: &str,
3312 item_key: &str,
3313 binary: &Path,
3314 model: Option<&str>,
3315 timeout: u64,
3316 mode: &EnrichMode,
3317) -> Result<EnrichItemResult, AppError> {
3318 let pairs = scan_isolated_entity_pairs(conn, namespace, Some(1))?;
3319 let (e1_id, e1_name, e2_id, e2_name) =
3320 match pairs.into_iter().find(|(_, n, _, _)| n == item_key) {
3321 Some(p) => p,
3322 None => {
3323 return Ok(EnrichItemResult::Skipped {
3324 reason: "pair no longer isolated".into(),
3325 })
3326 }
3327 };
3328 let input_text = format!("Entity A: {e1_name}\nEntity B: {e2_name}");
3329 let (value, cost, is_oauth) = match mode {
3330 EnrichMode::ClaudeCode => call_claude(
3331 binary,
3332 ENTITY_CONNECT_PROMPT,
3333 ENTITY_CONNECT_SCHEMA,
3334 &input_text,
3335 model,
3336 timeout,
3337 )?,
3338 EnrichMode::Codex => call_codex(
3339 binary,
3340 ENTITY_CONNECT_PROMPT,
3341 ENTITY_CONNECT_SCHEMA,
3342 &input_text,
3343 model,
3344 timeout,
3345 )?,
3346 EnrichMode::Opencode => call_opencode(
3347 binary,
3348 ENTITY_CONNECT_PROMPT,
3349 ENTITY_CONNECT_SCHEMA,
3350 &input_text,
3351 model,
3352 timeout,
3353 )?,
3354 };
3355 let relation = value
3356 .get("relation")
3357 .and_then(|v| v.as_str())
3358 .unwrap_or("none");
3359 if relation == "none" {
3360 return Ok(EnrichItemResult::Skipped {
3361 reason: "LLM determined no relationship".into(),
3362 });
3363 }
3364 let strength = value
3365 .get("strength")
3366 .and_then(|v| v.as_f64())
3367 .unwrap_or(0.5);
3368 conn.execute(
3369 "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
3370 rusqlite::params![namespace, e1_id, e2_id, relation, strength],
3371 )?;
3372 Ok(EnrichItemResult::Done {
3373 memory_id: None,
3374 entity_id: None,
3375 entities: 0,
3376 rels: 1,
3377 chars_before: None,
3378 chars_after: None,
3379 cost,
3380 is_oauth,
3381 })
3382}
3383
3384fn call_entity_type_validate(
3386 conn: &Connection,
3387 _namespace: &str,
3388 item_key: &str,
3389 binary: &Path,
3390 model: Option<&str>,
3391 timeout: u64,
3392 mode: &EnrichMode,
3393) -> Result<EnrichItemResult, AppError> {
3394 let (ent_id, ent_name, ent_type): (i64, String, String) = conn
3395 .query_row(
3396 "SELECT id, name, type FROM entities WHERE name = ?1",
3397 rusqlite::params![item_key],
3398 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3399 )
3400 .map_err(|_| AppError::NotFound(format!("entity '{item_key}' not found")))?;
3401 let input_text = format!("Entity: {ent_name}\nCurrent type: {ent_type}");
3402 let (value, cost, is_oauth) = match mode {
3403 EnrichMode::ClaudeCode => call_claude(
3404 binary,
3405 ENTITY_TYPE_VALIDATE_PROMPT,
3406 ENTITY_TYPE_VALIDATE_SCHEMA,
3407 &input_text,
3408 model,
3409 timeout,
3410 )?,
3411 EnrichMode::Codex => call_codex(
3412 binary,
3413 ENTITY_TYPE_VALIDATE_PROMPT,
3414 ENTITY_TYPE_VALIDATE_SCHEMA,
3415 &input_text,
3416 model,
3417 timeout,
3418 )?,
3419 EnrichMode::Opencode => call_opencode(
3420 binary,
3421 ENTITY_TYPE_VALIDATE_PROMPT,
3422 ENTITY_TYPE_VALIDATE_SCHEMA,
3423 &input_text,
3424 model,
3425 timeout,
3426 )?,
3427 };
3428 let validated_type = value
3429 .get("validated_type")
3430 .and_then(|v| v.as_str())
3431 .unwrap_or(&ent_type);
3432 let was_correct = value
3433 .get("was_correct")
3434 .and_then(|v| v.as_bool())
3435 .unwrap_or(true);
3436 if !was_correct {
3437 conn.execute(
3438 "UPDATE entities SET type = ?1 WHERE id = ?2",
3439 rusqlite::params![validated_type, ent_id],
3440 )?;
3441 }
3442 Ok(EnrichItemResult::Done {
3443 memory_id: None,
3444 entity_id: Some(ent_id),
3445 entities: 1,
3446 rels: 0,
3447 chars_before: None,
3448 chars_after: None,
3449 cost,
3450 is_oauth,
3451 })
3452}
3453
3454fn call_description_enrich(
3456 conn: &Connection,
3457 _namespace: &str,
3458 item_key: &str,
3459 binary: &Path,
3460 model: Option<&str>,
3461 timeout: u64,
3462 mode: &EnrichMode,
3463) -> Result<EnrichItemResult, AppError> {
3464 let (mem_id, body, old_desc): (i64, String, String) = conn
3465 .query_row(
3466 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3467 rusqlite::params![item_key],
3468 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3469 )
3470 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3471 let snippet: String = body.chars().take(500).collect();
3472 let input_text = format!(
3473 "Memory name: {item_key}\nCurrent description: {old_desc}\nBody preview: {snippet}"
3474 );
3475 let (value, cost, is_oauth) = match mode {
3476 EnrichMode::ClaudeCode => call_claude(
3477 binary,
3478 DESCRIPTION_ENRICH_PROMPT,
3479 DESCRIPTION_ENRICH_SCHEMA,
3480 &input_text,
3481 model,
3482 timeout,
3483 )?,
3484 EnrichMode::Codex => call_codex(
3485 binary,
3486 DESCRIPTION_ENRICH_PROMPT,
3487 DESCRIPTION_ENRICH_SCHEMA,
3488 &input_text,
3489 model,
3490 timeout,
3491 )?,
3492 EnrichMode::Opencode => call_opencode(
3493 binary,
3494 DESCRIPTION_ENRICH_PROMPT,
3495 DESCRIPTION_ENRICH_SCHEMA,
3496 &input_text,
3497 model,
3498 timeout,
3499 )?,
3500 };
3501 let new_desc = value
3502 .get("description")
3503 .and_then(|v| v.as_str())
3504 .unwrap_or(&old_desc);
3505 let old_name: String = conn.query_row(
3506 "SELECT name FROM memories WHERE id = ?1",
3507 rusqlite::params![mem_id],
3508 |r| r.get(0),
3509 )?;
3510 conn.execute(
3511 "UPDATE memories SET description = ?1 WHERE id = ?2",
3512 rusqlite::params![new_desc, mem_id],
3513 )?;
3514 memories::sync_fts_after_update(
3515 conn, mem_id, &old_name, &old_desc, &body, &old_name, new_desc, &body,
3516 )?;
3517 Ok(EnrichItemResult::Done {
3518 memory_id: Some(mem_id),
3519 entity_id: None,
3520 entities: 0,
3521 rels: 0,
3522 chars_before: Some(old_desc.len()),
3523 chars_after: Some(new_desc.len()),
3524 cost,
3525 is_oauth,
3526 })
3527}
3528
3529fn call_domain_classify(
3531 conn: &Connection,
3532 _namespace: &str,
3533 item_key: &str,
3534 binary: &Path,
3535 model: Option<&str>,
3536 timeout: u64,
3537 mode: &EnrichMode,
3538) -> Result<EnrichItemResult, AppError> {
3539 let (mem_id, body, desc): (i64, String, String) = conn
3540 .query_row(
3541 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3542 rusqlite::params![item_key],
3543 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3544 )
3545 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3546 let snippet: String = body.chars().take(500).collect();
3547 let input_text = format!("Memory: {item_key}\nDescription: {desc}\nBody preview: {snippet}");
3548 let (value, cost, is_oauth) = match mode {
3549 EnrichMode::ClaudeCode => call_claude(
3550 binary,
3551 DOMAIN_CLASSIFY_PROMPT,
3552 DOMAIN_CLASSIFY_SCHEMA,
3553 &input_text,
3554 model,
3555 timeout,
3556 )?,
3557 EnrichMode::Codex => call_codex(
3558 binary,
3559 DOMAIN_CLASSIFY_PROMPT,
3560 DOMAIN_CLASSIFY_SCHEMA,
3561 &input_text,
3562 model,
3563 timeout,
3564 )?,
3565 EnrichMode::Opencode => call_opencode(
3566 binary,
3567 DOMAIN_CLASSIFY_PROMPT,
3568 DOMAIN_CLASSIFY_SCHEMA,
3569 &input_text,
3570 model,
3571 timeout,
3572 )?,
3573 };
3574 let domain = value
3575 .get("domain")
3576 .and_then(|v| v.as_str())
3577 .unwrap_or("uncategorized");
3578 let metadata = format!(r#"{{"domain":"{}"}}"#, domain.replace('"', "\\\""));
3579 conn.execute(
3580 "UPDATE memories SET metadata = ?1 WHERE id = ?2",
3581 rusqlite::params![metadata, mem_id],
3582 )?;
3583 Ok(EnrichItemResult::Done {
3584 memory_id: Some(mem_id),
3585 entity_id: None,
3586 entities: 0,
3587 rels: 0,
3588 chars_before: None,
3589 chars_after: None,
3590 cost,
3591 is_oauth,
3592 })
3593}
3594
3595fn call_graph_audit(
3597 conn: &Connection,
3598 _namespace: &str,
3599 item_key: &str,
3600 binary: &Path,
3601 model: Option<&str>,
3602 timeout: u64,
3603 mode: &EnrichMode,
3604) -> Result<EnrichItemResult, AppError> {
3605 let (mem_id, body, desc): (i64, String, String) = conn
3606 .query_row(
3607 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3608 rusqlite::params![item_key],
3609 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3610 )
3611 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3612 let snippet: String = body.chars().take(500).collect();
3613 let ent_count: i64 = conn
3614 .query_row(
3615 "SELECT COUNT(*) FROM memory_entities WHERE memory_id = ?1",
3616 rusqlite::params![mem_id],
3617 |r| r.get(0),
3618 )
3619 .unwrap_or(0);
3620 let input_text = format!("Memory: {item_key}\nDescription: {desc}\nEntity bindings: {ent_count}\nBody preview: {snippet}");
3621 let (value, cost, is_oauth) = match mode {
3622 EnrichMode::ClaudeCode => call_claude(
3623 binary,
3624 GRAPH_AUDIT_PROMPT,
3625 GRAPH_AUDIT_SCHEMA,
3626 &input_text,
3627 model,
3628 timeout,
3629 )?,
3630 EnrichMode::Codex => call_codex(
3631 binary,
3632 GRAPH_AUDIT_PROMPT,
3633 GRAPH_AUDIT_SCHEMA,
3634 &input_text,
3635 model,
3636 timeout,
3637 )?,
3638 EnrichMode::Opencode => call_opencode(
3639 binary,
3640 GRAPH_AUDIT_PROMPT,
3641 GRAPH_AUDIT_SCHEMA,
3642 &input_text,
3643 model,
3644 timeout,
3645 )?,
3646 };
3647 let issues = value
3648 .get("issues")
3649 .and_then(|v| v.as_array())
3650 .map(|a| a.len())
3651 .unwrap_or(0);
3652 Ok(EnrichItemResult::Done {
3653 memory_id: Some(mem_id),
3654 entity_id: None,
3655 entities: 0,
3656 rels: issues,
3657 chars_before: None,
3658 chars_after: None,
3659 cost,
3660 is_oauth,
3661 })
3662}
3663
3664fn call_deep_research_synth(
3666 conn: &Connection,
3667 namespace: &str,
3668 item_key: &str,
3669 binary: &Path,
3670 model: Option<&str>,
3671 timeout: u64,
3672 mode: &EnrichMode,
3673) -> Result<EnrichItemResult, AppError> {
3674 let (mem_id, body): (i64, String) = conn
3675 .query_row(
3676 "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3677 rusqlite::params![item_key],
3678 |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
3679 )
3680 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3681 let snippet: String = body.chars().take(2000).collect();
3682 let input_text = format!("Memory: {item_key}\nBody:\n{snippet}");
3683 let (value, cost, is_oauth) = match mode {
3684 EnrichMode::ClaudeCode => call_claude(
3685 binary,
3686 DEEP_RESEARCH_SYNTH_PROMPT,
3687 DEEP_RESEARCH_SYNTH_SCHEMA,
3688 &input_text,
3689 model,
3690 timeout,
3691 )?,
3692 EnrichMode::Codex => call_codex(
3693 binary,
3694 DEEP_RESEARCH_SYNTH_PROMPT,
3695 DEEP_RESEARCH_SYNTH_SCHEMA,
3696 &input_text,
3697 model,
3698 timeout,
3699 )?,
3700 EnrichMode::Opencode => call_opencode(
3701 binary,
3702 DEEP_RESEARCH_SYNTH_PROMPT,
3703 DEEP_RESEARCH_SYNTH_SCHEMA,
3704 &input_text,
3705 model,
3706 timeout,
3707 )?,
3708 };
3709 let mut ent_count = 0usize;
3710 let mut rel_count = 0usize;
3711 if let Some(ents) = value.get("entities").and_then(|v| v.as_array()) {
3712 for e in ents {
3713 let name = e.get("name").and_then(|v| v.as_str()).unwrap_or_default();
3714 let etype_str = e
3715 .get("entity_type")
3716 .and_then(|v| v.as_str())
3717 .unwrap_or("concept");
3718 let etype: EntityType = etype_str.parse().unwrap_or(EntityType::Concept);
3719 if name.len() >= 2 {
3720 let ne = NewEntity {
3721 name: name.to_string(),
3722 entity_type: etype,
3723 description: None,
3724 };
3725 let _ = entities::upsert_entity(conn, namespace, &ne);
3726 ent_count += 1;
3727 }
3728 }
3729 }
3730 if let Some(rels) = value.get("relationships").and_then(|v| v.as_array()) {
3731 for r in rels {
3732 let src = r.get("source").and_then(|v| v.as_str()).unwrap_or_default();
3733 let tgt = r.get("target").and_then(|v| v.as_str()).unwrap_or_default();
3734 if src.is_empty() || tgt.is_empty() {
3735 continue;
3736 }
3737 let rel = r
3738 .get("relation")
3739 .and_then(|v| v.as_str())
3740 .unwrap_or("related");
3741 let str_ = r.get("strength").and_then(|v| v.as_f64()).unwrap_or(0.5);
3742 if let (Some(sid), Some(tid)) = (
3743 entities::find_entity_id(conn, namespace, src)?,
3744 entities::find_entity_id(conn, namespace, tgt)?,
3745 ) {
3746 let _ = entities::create_or_fetch_relationship(
3747 conn, namespace, sid, tid, rel, str_, None,
3748 );
3749 rel_count += 1;
3750 }
3751 }
3752 }
3753 Ok(EnrichItemResult::Done {
3754 memory_id: Some(mem_id),
3755 entity_id: None,
3756 entities: ent_count,
3757 rels: rel_count,
3758 chars_before: None,
3759 chars_after: None,
3760 cost,
3761 is_oauth,
3762 })
3763}
3764
3765fn call_body_extract(
3767 conn: &Connection,
3768 _namespace: &str,
3769 item_key: &str,
3770 binary: &Path,
3771 model: Option<&str>,
3772 timeout: u64,
3773 mode: &EnrichMode,
3774) -> Result<EnrichItemResult, AppError> {
3775 let (mem_id, body, old_desc): (i64, String, String) = conn
3776 .query_row(
3777 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3778 rusqlite::params![item_key],
3779 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3780 )
3781 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3782 let old_name: String = conn.query_row(
3783 "SELECT name FROM memories WHERE id = ?1",
3784 rusqlite::params![mem_id],
3785 |r| r.get(0),
3786 )?;
3787 let input_text = format!("Memory: {item_key}\nBody:\n{body}");
3788 let (value, cost, is_oauth) = match mode {
3789 EnrichMode::ClaudeCode => call_claude(
3790 binary,
3791 BODY_EXTRACT_PROMPT,
3792 BODY_EXTRACT_SCHEMA,
3793 &input_text,
3794 model,
3795 timeout,
3796 )?,
3797 EnrichMode::Codex => call_codex(
3798 binary,
3799 BODY_EXTRACT_PROMPT,
3800 BODY_EXTRACT_SCHEMA,
3801 &input_text,
3802 model,
3803 timeout,
3804 )?,
3805 EnrichMode::Opencode => call_opencode(
3806 binary,
3807 BODY_EXTRACT_PROMPT,
3808 BODY_EXTRACT_SCHEMA,
3809 &input_text,
3810 model,
3811 timeout,
3812 )?,
3813 };
3814 let restructured = value
3815 .get("restructured_body")
3816 .and_then(|v| v.as_str())
3817 .unwrap_or(&body);
3818 let chars_before = body.len();
3819 let chars_after = restructured.len();
3820 let new_hash = blake3::hash(restructured.as_bytes()).to_hex().to_string();
3821 conn.execute(
3822 "UPDATE memories SET body = ?1, body_hash = ?2, updated_at = unixepoch() WHERE id = ?3",
3823 rusqlite::params![restructured, new_hash, mem_id],
3824 )?;
3825 memories::sync_fts_after_update(
3826 conn,
3827 mem_id,
3828 &old_name,
3829 &old_desc,
3830 &body,
3831 &old_name,
3832 &old_desc,
3833 restructured,
3834 )?;
3835 Ok(EnrichItemResult::Done {
3836 memory_id: Some(mem_id),
3837 entity_id: None,
3838 entities: 0,
3839 rels: 0,
3840 chars_before: Some(chars_before),
3841 chars_after: Some(chars_after),
3842 cost,
3843 is_oauth,
3844 })
3845}
3846
3847#[allow(clippy::type_complexity)]
3849fn scan_isolated_entity_pairs(
3850 conn: &Connection,
3851 namespace: &str,
3852 limit: Option<usize>,
3853) -> Result<Vec<(i64, String, i64, String)>, AppError> {
3854 let limit_val = limit.unwrap_or(50) as i64;
3855 let mut stmt = conn.prepare_cached(
3856 "SELECT e1.id, e1.name, e2.id, e2.name FROM entities e1, entities e2 \
3857 WHERE e1.namespace = ?1 AND e2.namespace = ?1 AND e1.id < e2.id \
3858 AND NOT EXISTS (SELECT 1 FROM relationships r WHERE \
3859 (r.source_id = e1.id AND r.target_id = e2.id) OR \
3860 (r.source_id = e2.id AND r.target_id = e1.id)) \
3861 LIMIT ?2",
3862 )?;
3863 let rows = stmt
3864 .query_map(rusqlite::params![namespace, limit_val], |r| {
3865 Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?))
3866 })?
3867 .collect::<Result<Vec<_>, _>>()?;
3868 Ok(rows)
3869}
3870
3871fn scan_entities_for_type_validation(
3873 conn: &Connection,
3874 namespace: &str,
3875 limit: Option<usize>,
3876) -> Result<Vec<(i64, String, String)>, AppError> {
3877 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3878 let sql = format!(
3879 "SELECT id, name, type FROM entities WHERE namespace = ?1 ORDER BY id {limit_clause}"
3880 );
3881 let mut stmt = conn.prepare(&sql)?;
3882 let rows = stmt
3883 .query_map(rusqlite::params![namespace], |r| {
3884 Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3885 })?
3886 .collect::<Result<Vec<_>, _>>()?;
3887 Ok(rows)
3888}
3889
3890fn scan_generic_descriptions(
3892 conn: &Connection,
3893 namespace: &str,
3894 limit: Option<usize>,
3895) -> Result<Vec<(i64, String, String)>, AppError> {
3896 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3897 let sql = format!(
3898 "SELECT id, name, description FROM memories WHERE namespace = ?1 AND deleted_at IS NULL \
3899 AND (description LIKE '%ingested%' OR description LIKE '%imported%' OR description LIKE '%added%' OR length(description) < 30) \
3900 ORDER BY id {limit_clause}"
3901 );
3902 let mut stmt = conn.prepare(&sql)?;
3903 let rows = stmt
3904 .query_map(rusqlite::params![namespace], |r| {
3905 Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3906 })?
3907 .collect::<Result<Vec<_>, _>>()?;
3908 Ok(rows)
3909}
3910
3911fn call_codex(
3915 binary: &Path,
3916 prompt: &str,
3917 json_schema: &str,
3918 input_text: &str,
3919 model: Option<&str>,
3920 timeout_secs: u64,
3921) -> Result<(serde_json::Value, f64, bool), AppError> {
3922 use wait_timeout::ChildExt;
3923
3924 super::codex_spawn::validate_codex_model(model)?;
3929 let schema_file = super::codex_spawn::trusted_schema_path()?;
3930
3931 let args = super::codex_spawn::CodexSpawnArgs {
3932 binary,
3933 prompt,
3934 json_schema,
3935 input_text,
3936 model,
3937 timeout_secs,
3938 schema_path: schema_file.clone(),
3939 };
3940 let mut cmd = super::codex_spawn::build_codex_command(&args)?;
3941
3942 let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
3943 AppError::Io(std::io::Error::new(
3944 e.kind(),
3945 format!("failed to spawn codex: {e}"),
3946 ))
3947 })?;
3948
3949 let full_prompt = format!("{prompt}\n\n{input_text}");
3950 let stdin_bytes = full_prompt.into_bytes();
3951 let mut child_stdin = child
3952 .stdin
3953 .take()
3954 .ok_or_else(|| AppError::Validation("failed to open codex stdin".into()))?;
3955 let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
3956 child_stdin.write_all(&stdin_bytes)?;
3957 drop(child_stdin);
3958 Ok(())
3959 });
3960
3961 let start = std::time::Instant::now();
3962 let timeout = std::time::Duration::from_secs(timeout_secs);
3963 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
3964 let _ = std::fs::remove_file(&schema_file);
3965
3966 match status {
3967 Some(exit_status) => {
3968 stdin_thread
3969 .join()
3970 .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
3971 .map_err(AppError::Io)?;
3972
3973 tracing::debug!(
3974 target: "process",
3975 exit_code = ?exit_status.code(),
3976 elapsed_ms = start.elapsed().as_millis() as u64,
3977 "external process completed"
3978 );
3979
3980 let mut stdout_buf = Vec::new();
3981 if let Some(mut out) = child.stdout.take() {
3982 std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
3983 }
3984 if !exit_status.success() {
3985 let mut stderr_buf = Vec::new();
3986 if let Some(mut err) = child.stderr.take() {
3987 std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
3988 }
3989 let stderr_str = String::from_utf8_lossy(&stderr_buf);
3990 tracing::warn!(
3991 target: "enrich",
3992 exit_code = ?exit_status.code(),
3993 stderr = %stderr_str.trim(),
3994 "codex process failed"
3995 );
3996 return Err(AppError::Validation(format!(
3997 "codex exited with code {:?}: {}",
3998 exit_status.code(),
3999 stderr_str.trim()
4000 )));
4001 }
4002 let stdout_str = String::from_utf8(stdout_buf)
4003 .map_err(|_| AppError::Validation("codex stdout is not valid UTF-8".into()))?;
4004 let result = super::codex_spawn::parse_codex_jsonl(&stdout_str)?;
4007 let value: serde_json::Value =
4013 serde_json::from_str(&result.last_agent_text).map_err(|e| {
4014 AppError::Validation(format!(
4015 "codex agent_message is not valid JSON: {e}; raw={}",
4016 result.last_agent_text
4017 ))
4018 })?;
4019 Ok((value, 0.0, false))
4020 }
4021 None => {
4022 let _ = child.kill();
4023 let _ = child.wait();
4024 let _ = stdin_thread.join();
4025 Err(AppError::Validation(format!(
4026 "codex timed out after {timeout_secs} seconds"
4027 )))
4028 }
4029 }
4030}
4031
4032fn call_opencode(
4033 binary: &Path,
4034 prompt: &str,
4035 json_schema: &str,
4036 input_text: &str,
4037 model: Option<&str>,
4038 timeout_secs: u64,
4039) -> Result<(serde_json::Value, f64, bool), AppError> {
4040 use wait_timeout::ChildExt;
4041
4042 let resolved_model = super::opencode_runner::resolve_opencode_model(model);
4043
4044 let augmented_prompt = if json_schema.is_empty() {
4045 prompt.to_string()
4046 } else {
4047 format!(
4048 "{prompt}\n\nIMPORTANT: You MUST respond with ONLY valid JSON (no markdown, no explanation, no code fences). \
4049 The JSON MUST match this schema:\n{json_schema}"
4050 )
4051 };
4052
4053 let mut cmd = super::opencode_runner::build_opencode_command_sync(
4054 binary,
4055 &resolved_model,
4056 &augmented_prompt,
4057 input_text,
4058 )?;
4059
4060 let mut child = super::opencode_runner::spawn_opencode(&mut cmd).map_err(|e| {
4061 AppError::Io(std::io::Error::new(
4062 e.kind(),
4063 format!("failed to spawn opencode: {e}"),
4064 ))
4065 })?;
4066
4067 let start = std::time::Instant::now();
4068 let timeout = std::time::Duration::from_secs(timeout_secs);
4069 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
4070
4071 match status {
4072 Some(exit_status) => {
4073 tracing::debug!(
4074 target: "process",
4075 exit_code = ?exit_status.code(),
4076 elapsed_ms = start.elapsed().as_millis() as u64,
4077 "opencode process completed"
4078 );
4079
4080 let mut stdout_buf = Vec::new();
4081 if let Some(mut out) = child.stdout.take() {
4082 std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
4083 }
4084 if !exit_status.success() {
4085 let mut stderr_buf = Vec::new();
4086 if let Some(mut err) = child.stderr.take() {
4087 std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
4088 }
4089 let stderr_str = String::from_utf8_lossy(&stderr_buf);
4090 tracing::warn!(
4091 target: "enrich",
4092 exit_code = ?exit_status.code(),
4093 stderr = %stderr_str.trim(),
4094 "opencode process failed"
4095 );
4096 return Err(AppError::Validation(format!(
4097 "opencode exited with code {:?}: {}",
4098 exit_status.code(),
4099 stderr_str.trim()
4100 )));
4101 }
4102 let stdout_str = String::from_utf8(stdout_buf)
4103 .map_err(|_| AppError::Validation("opencode stdout is not valid UTF-8".into()))?;
4104 let (text, cost, _tokens) = super::opencode_runner::parse_opencode_output(&stdout_str)?;
4105 let value: serde_json::Value =
4106 super::opencode_runner::parse_json_from_opencode_text(&text).map_err(|e| {
4107 AppError::Validation(format!("opencode response is not valid JSON: {e}"))
4108 })?;
4109 Ok((value, cost, false))
4110 }
4111 None => {
4112 let _ = child.kill();
4113 let _ = child.wait();
4114 Err(AppError::Validation(format!(
4115 "opencode timed out after {timeout_secs} seconds"
4116 )))
4117 }
4118 }
4119}
4120
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::Connection;
    #[cfg(unix)]
    use std::os::unix::fs::PermissionsExt;

    /// Opens an in-memory database containing the tables these tests touch
    /// (`memories`, `entities`, `memory_entities`, `relationships`,
    /// `memory_embeddings`). Presumably a subset of the production schema;
    /// keep the columns in sync if the real migrations change.
    fn open_test_db() -> Connection {
        let conn = Connection::open_in_memory().expect("in-memory db");
        conn.execute_batch(
            "CREATE TABLE memories (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                name TEXT NOT NULL,
                type TEXT NOT NULL DEFAULT 'note',
                description TEXT NOT NULL DEFAULT '',
                body TEXT NOT NULL DEFAULT '',
                body_hash TEXT NOT NULL DEFAULT '',
                session_id TEXT,
                source TEXT NOT NULL DEFAULT 'agent',
                metadata TEXT NOT NULL DEFAULT '{}',
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                deleted_at INTEGER,
                UNIQUE(namespace, name)
            );
            CREATE TABLE entities (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                name TEXT NOT NULL,
                type TEXT NOT NULL DEFAULT 'concept',
                description TEXT,
                degree INTEGER NOT NULL DEFAULT 0,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                UNIQUE(namespace, name)
            );
            CREATE TABLE memory_entities (
                memory_id INTEGER NOT NULL,
                entity_id INTEGER NOT NULL,
                PRIMARY KEY (memory_id, entity_id)
            );
            CREATE TABLE relationships (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                source_id INTEGER NOT NULL,
                target_id INTEGER NOT NULL,
                relation TEXT NOT NULL,
                weight REAL NOT NULL DEFAULT 0.5,
                description TEXT,
                UNIQUE(source_id, target_id, relation)
            );
            CREATE TABLE memory_embeddings (
                memory_id INTEGER PRIMARY KEY,
                namespace TEXT NOT NULL,
                embedding BLOB NOT NULL,
                source TEXT NOT NULL,
                model TEXT NOT NULL DEFAULT '',
                dim INTEGER NOT NULL DEFAULT 384,
                created_at INTEGER NOT NULL DEFAULT (unixepoch())
            );",
        )
        .expect("schema creation must succeed");
        conn
    }

    #[test]
    fn scan_unbound_memories_finds_memories_without_bindings() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'test-mem', 'some body content')",
            [],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "test-mem");
    }

    #[test]
    fn scan_unbound_memories_excludes_bound_memories() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'bound-mem', 'body')",
            [],
        )
        .unwrap();
        let mem_id: i64 = conn
            .query_row("SELECT id FROM memories WHERE name='bound-mem'", [], |r| {
                r.get(0)
            })
            .unwrap();
        conn.execute(
            "INSERT INTO entities (namespace, name) VALUES ('global', 'some-entity')",
            [],
        )
        .unwrap();
        let ent_id: i64 = conn
            .query_row(
                "SELECT id FROM entities WHERE name='some-entity'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        conn.execute(
            "INSERT INTO memory_entities (memory_id, entity_id) VALUES (?1, ?2)",
            rusqlite::params![mem_id, ent_id],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
        assert!(results.is_empty(), "bound memory must not appear in scan");
    }

    #[test]
    fn scan_entities_without_description_finds_null_description() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'my-tool', 'tool', NULL)",
            [],
        )
        .unwrap();

        let results = scan_entities_without_description(&conn, "global", None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "my-tool");
    }

    #[test]
    fn scan_entities_without_description_excludes_entities_with_description() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'described-tool', 'tool', 'Has a description already')",
            [],
        )
        .unwrap();

        let results = scan_entities_without_description(&conn, "global", None, &[]).unwrap();
        assert!(
            results.is_empty(),
            "entity with description must not appear"
        );
    }

    #[test]
    fn scan_short_body_memories_finds_short_bodies() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'short-mem', 'hi')",
            [],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 100, None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "short-mem");
    }

    #[test]
    fn scan_short_body_memories_excludes_long_bodies() {
        let conn = open_test_db();
        let long_body = "a".repeat(1000);
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'long-mem', ?1)",
            rusqlite::params![long_body],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 100, None, &[]).unwrap();
        assert!(results.is_empty(), "long memory must not appear in scan");
    }

    #[test]
    fn scan_respects_limit() {
        let conn = open_test_db();
        for i in 0..5 {
            conn.execute(
                &format!("INSERT INTO memories (namespace, name, body) VALUES ('global', 'mem-{i}', 'short')"),
                [],
            )
            .unwrap();
        }

        let results = scan_short_body_memories(&conn, "global", 1000, Some(3), &[]).unwrap();
        assert_eq!(results.len(), 3, "limit must be respected");
    }

    #[test]
    fn scan_memories_without_embeddings_finds_only_missing_rows() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'missing-vec', 'body one')",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'has-vec', 'body two')",
            [],
        )
        .unwrap();
        let memory_id: i64 = conn
            .query_row(
                "SELECT id FROM memories WHERE namespace='global' AND name='has-vec'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        let embedding = vec![0.0_f32; crate::constants::embedding_dim()];
        memories::upsert_vec(
            &conn, memory_id, "global", "note", &embedding, "has-vec", "body two",
        )
        .unwrap();

        let results = scan_memories_without_embeddings(&conn, "global", None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "missing-vec");
    }

    #[test]
    fn scan_memories_without_embeddings_respects_name_filter() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'match-me', 'body one')",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'skip-me', 'body two')",
            [],
        )
        .unwrap();

        let results =
            scan_memories_without_embeddings(&conn, "global", None, &["match-me".to_string()])
                .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "match-me");
    }

    #[test]
    fn scan_isolated_entity_pairs_skips_connected_pairs() {
        let conn = open_test_db();
        for name in ["alpha", "beta", "gamma"] {
            conn.execute(
                "INSERT INTO entities (namespace, name) VALUES ('global', ?1)",
                rusqlite::params![name],
            )
            .unwrap();
        }
        let id_of = |name: &str| -> i64 {
            conn.query_row(
                "SELECT id FROM entities WHERE name = ?1",
                rusqlite::params![name],
                |r| r.get(0),
            )
            .unwrap()
        };
        let (alpha, beta) = (id_of("alpha"), id_of("beta"));
        conn.execute(
            "INSERT INTO relationships (namespace, source_id, target_id, relation) VALUES ('global', ?1, ?2, 'related')",
            rusqlite::params![alpha, beta],
        )
        .unwrap();

        let pairs: Vec<(String, String)> = scan_isolated_entity_pairs(&conn, "global", None)
            .unwrap()
            .into_iter()
            .map(|(_, a, _, b)| (a, b))
            .collect();
        assert_eq!(pairs.len(), 2, "only alpha-gamma and beta-gamma are isolated");
        assert!(!pairs.contains(&("alpha".to_string(), "beta".to_string())));
    }

    #[test]
    fn scan_generic_descriptions_flags_boilerplate_only() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, description) VALUES \
             ('global', 'boilerplate', 'Memory ingested from a markdown file on disk'), \
             ('global', 'specific', 'Retry policy for outbound webhooks with exponential backoff')",
            [],
        )
        .unwrap();

        let results = scan_generic_descriptions(&conn, "global", None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "boilerplate");
    }

    #[test]
    fn queue_db_schema_creates_correctly() {
        // A private temp dir instead of a fixed /tmp path: portable, no collisions
        // between concurrent runs, and removed even if an assertion below fails.
        let tmp = tempfile::tempdir().expect("tempdir");
        let tmp_path = tmp
            .path()
            .join("test-enrich-queue.sqlite")
            .to_str()
            .expect("temp path must be valid UTF-8")
            .to_string();
        let conn = open_queue_db(&tmp_path).expect("queue db must open");
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 0);
    }

    #[test]
    fn parse_claude_output_valid_bindings() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.01,
             "structured_output":{"entities":[{"name":"rust-lang","entity_type":"tool"}],"relationships":[]}}
        ]"#;
        let result = crate::commands::claude_runner::parse_claude_output(output)
            .expect("must parse successfully");
        assert!(result.value.get("entities").is_some());
        assert!((result.cost_usd - 0.01).abs() < f64::EPSILON);
        assert!(!result.is_oauth);
    }

    #[test]
    fn parse_claude_output_detects_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"none"},
            {"type":"result","is_error":false,"total_cost_usd":0.0,
             "structured_output":{"entities":[],"relationships":[]}}
        ]"#;
        let result = crate::commands::claude_runner::parse_claude_output(output).unwrap();
        assert!(result.is_oauth);
    }

    #[test]
    fn parse_claude_output_rate_limit_returns_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;
        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
        assert!(matches!(err, AppError::RateLimited { .. }));
    }

    #[test]
    fn parse_claude_output_auth_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;
        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("authentication failed"));
    }

    #[cfg(unix)]
    #[test]
    fn call_codex_returns_raw_json_for_body_enrich_schema() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let binary = tmp.path().join("codex-mock");
        std::fs::write(
            &binary,
            r#"#!/usr/bin/env bash
set -euo pipefail
cat <<'JSONL'
{"type":"thread.started","thread_id":"mock-thread-0"}
{"type":"item.completed","item":{"type":"agent_message","text":"{\"enriched_body\":\"expanded body\"}"}}
{"type":"turn.completed","usage":{"input_tokens":1,"output_tokens":1}}
JSONL
"#,
        )
        .expect("mock codex write");
        let mut perms = std::fs::metadata(&binary).expect("metadata").permissions();
        perms.set_mode(0o755);
        std::fs::set_permissions(&binary, perms).expect("chmod");

        let (value, cost, is_oauth) =
            call_codex(&binary, "prompt", BODY_ENRICH_SCHEMA, "body", None, 5)
                .expect("call_codex must accept body-enrich payload");

        assert_eq!(value["enriched_body"], "expanded body");
        assert_eq!(cost, 0.0);
        assert!(!is_oauth);
    }

    /// Despite its name, this test only covers the scan stage that feeds a dry
    /// run; it never invokes an LLM backend.
    #[test]
    fn dry_run_emits_preview_without_calling_llm() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'dry-mem', 'tiny')",
            [],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 1000, None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "dry-mem");
    }

    #[test]
    fn persist_entity_description_updates_db() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type) VALUES ('global', 'tokio-runtime', 'tool')",
            [],
        )
        .unwrap();
        let eid: i64 = conn
            .query_row(
                "SELECT id FROM entities WHERE name='tokio-runtime'",
                [],
                |r| r.get(0),
            )
            .unwrap();

        persist_entity_description(&conn, eid, "Async runtime for Rust applications").unwrap();

        let desc: String = conn
            .query_row(
                "SELECT description FROM entities WHERE id=?1",
                rusqlite::params![eid],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(desc, "Async runtime for Rust applications");
    }

    #[test]
    fn bindings_schema_is_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(BINDINGS_SCHEMA).expect("BINDINGS_SCHEMA must be valid JSON");
    }

    #[test]
    fn entity_description_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(ENTITY_DESCRIPTION_SCHEMA)
            .expect("ENTITY_DESCRIPTION_SCHEMA must be valid JSON");
    }

    #[test]
    fn body_enrich_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(BODY_ENRICH_SCHEMA)
            .expect("BODY_ENRICH_SCHEMA must be valid JSON");
    }
}