1use crate::commands::ingest_claude::find_claude_binary;
28use crate::constants::MAX_MEMORY_BODY_LEN;
29use crate::entity_type::EntityType;
30use crate::errors::AppError;
31use crate::paths::AppPaths;
32use crate::storage::connection::{ensure_db_ready, open_rw};
33use crate::storage::entities::{self, NewEntity, NewRelationship};
34use crate::storage::memories;
35
36use rusqlite::Connection;
37use serde::{Deserialize, Serialize};
38use std::io::Write;
39use std::path::{Path, PathBuf};
40use std::time::Instant;
41
/// Default file name for the enrichment queue SQLite database.
/// NOTE(review): presumably the path handed to `open_queue_db` — confirm at the call site.
const DEFAULT_QUEUE_DB: &str = ".enrich-queue.sqlite";
/// Default rate-limit wait.
/// NOTE(review): unit is presumably seconds; the use site is not visible here — confirm.
const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
/// Default value of the `--min-output-chars` CLI argument.
const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;
/// Default value of the `--max-output-chars` CLI argument.
const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2000;
50
/// JSON Schema for an entity/relationship extraction reply.
///
/// The reply must be an object with exactly two keys, `entities` and
/// `relationships`. Entity types and relation names are restricted to the
/// closed enums listed below, and `strength` must lie in `[0, 1]`.
/// NOTE(review): presumably paired with `BINDINGS_PROMPT` — confirm at the call site.
const BINDINGS_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "entities": {
 "type": "array",
 "items": {
 "type": "object",
 "properties": {
 "name": { "type": "string" },
 "entity_type": {
 "type": "string",
 "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
 }
 },
 "required": ["name", "entity_type"],
 "additionalProperties": false
 }
 },
 "relationships": {
 "type": "array",
 "items": {
 "type": "object",
 "properties": {
 "source": { "type": "string" },
 "target": { "type": "string" },
 "relation": {
 "type": "string",
 "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
 },
 "strength": { "type": "number", "minimum": 0, "maximum": 1 }
 },
 "required": ["source","target","relation","strength"],
 "additionalProperties": false
 }
 }
 },
 "required": ["entities","relationships"],
 "additionalProperties": false
}"#;
94
/// JSON Schema for an entity-description reply: an object whose only
/// (and required) key is the string `description`.
const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "description": { "type": "string" }
 },
 "required": ["description"],
 "additionalProperties": false
}"#;
103
/// JSON Schema for a body-enrichment reply: an object whose only
/// (and required) key is the string `enriched_body`.
const BODY_ENRICH_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "enriched_body": { "type": "string" }
 },
 "required": ["enriched_body"],
 "additionalProperties": false
}"#;
112
/// Instruction text asking the model to judge whether a relationship weight is
/// well calibrated, using the 0.9 / 0.7 / 0.5 / 0.3 scale spelled out in the prompt.
const WEIGHT_CALIBRATE_PROMPT: &str = "You are a knowledge graph quality auditor. Evaluate whether this relationship weight is correctly calibrated.\n\n\
    Scale:\n\
    - 0.9 = vital hard dependency (A cannot function without B)\n\
    - 0.7 = important design relationship (A strongly supports/enables B)\n\
    - 0.5 = useful contextual link (A and B share relevant context)\n\
    - 0.3 = weak reference (A mentions B without strong coupling)\n\n\
    Respond with the calibrated weight and brief reasoning.";

/// JSON Schema for the weight-calibration reply: `calibrated_weight` in
/// `[0.0, 1.0]` plus a free-text `reasoning`; both are required.
const WEIGHT_CALIBRATE_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "calibrated_weight": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
 "reasoning": { "type": "string" }
 },
 "required": ["calibrated_weight", "reasoning"],
 "additionalProperties": false
}"#;
131
/// Instruction text asking the model to replace a generic relation with one of
/// the canonical relation names enumerated in the prompt.
const RELATION_RECLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. The relationship between these entities uses a generic type. Determine the REAL semantic relationship.\n\n\
    Valid canonical relations (pick exactly one):\n\
    - depends-on: A cannot function without B\n\
    - uses: A utilizes B but could substitute it\n\
    - supports: A reinforces or enables B\n\
    - causes: A triggers or produces B\n\
    - fixes: A resolves a problem in B\n\
    - contradicts: A conflicts with or invalidates B\n\
    - applies-to: A is relevant to or scoped within B\n\
    - follows: A comes after B in sequence\n\
    - replaces: A substitutes B\n\
    - tracked-in: A is monitored in B\n\
    - related: A and B share context (use sparingly)\n\n\
    Respond with the correct relation, strength, and reasoning.";

/// JSON Schema for the reclassification reply.
///
/// `relation` is an unconstrained string here (no `enum`), so the canonical
/// list is enforced only by the prompt wording, not by this schema.
const RELATION_RECLASSIFY_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "relation": { "type": "string" },
 "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
 "reasoning": { "type": "string" }
 },
 "required": ["relation", "strength", "reasoning"],
 "additionalProperties": false
}"#;
158
/// Instruction text asking the model whether two currently unlinked entities
/// deserve a relationship; the model answers `"none"` when they do not.
const ENTITY_CONNECT_PROMPT: &str = "You are a knowledge graph quality auditor. Two entities exist in the same graph but have no relationship between them. Determine if a meaningful relationship exists.\n\n\
    Valid canonical relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, replaces, tracked-in, related.\n\n\
    If NO meaningful relationship exists, set relation to \"none\".\n\
    Respond with the relation (or \"none\"), strength, and reasoning.";

/// JSON Schema for the entity-connect reply; `relation` is a free string so it
/// can carry either a relation name or the `"none"` sentinel from the prompt.
const ENTITY_CONNECT_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "relation": { "type": "string" },
 "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
 "reasoning": { "type": "string" }
 },
 "required": ["relation", "strength", "reasoning"],
 "additionalProperties": false
}"#;
175
/// Instruction text asking the model to confirm or correct an entity's type
/// against the list of valid entity types given in the prompt.
const ENTITY_TYPE_VALIDATE_PROMPT: &str = "You are a knowledge graph quality auditor. Verify whether this entity's type is correct.\n\n\
    Valid entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\n\
    If the current type is correct, keep it. If wrong, suggest the correct type.\n\
    Respond with the validated type and reasoning.";

/// JSON Schema for the type-validation reply: `validated_type` (free string),
/// the boolean `was_correct`, and `reasoning`; all three are required.
const ENTITY_TYPE_VALIDATE_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "validated_type": { "type": "string" },
 "was_correct": { "type": "boolean" },
 "reasoning": { "type": "string" }
 },
 "required": ["validated_type", "was_correct", "reasoning"],
 "additionalProperties": false
}"#;
192
/// Instruction text asking the model to replace a generic memory description
/// with a concise 10-20 word semantic one; includes a BAD/GOOD example pair.
const DESCRIPTION_ENRICH_PROMPT: &str = "You are a knowledge graph quality auditor. This memory has a generic or auto-generated description. Write a concise, semantic description (10-20 words) that captures WHAT this memory is about and WHY it matters.\n\n\
    BAD: 'ingested from docs/auth.md'\n\
    GOOD: 'JWT token rotation strategy with 15-min expiry and refresh flow'\n\n\
    Respond with the improved description and reasoning.";

/// JSON Schema for the description-enrichment reply: required strings
/// `description` and `reasoning`, nothing else.
const DESCRIPTION_ENRICH_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "description": { "type": "string" },
 "reasoning": { "type": "string" }
 },
 "required": ["description", "reasoning"],
 "additionalProperties": false
}"#;
208
/// Instruction text asking the model to assign a memory to a primary domain,
/// named in kebab-case with 2-4 words.
const DOMAIN_CLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. Classify this memory into its primary domain category.\n\n\
    Respond with the domain name (kebab-case, 2-4 words) and reasoning.";

/// JSON Schema for the domain-classification reply: `domain`, a `confidence`
/// in `[0.0, 1.0]`, and `reasoning`; all required.
const DOMAIN_CLASSIFY_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "domain": { "type": "string" },
 "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
 "reasoning": { "type": "string" }
 },
 "required": ["domain", "confidence", "reasoning"],
 "additionalProperties": false
}"#;
223
/// Instruction text asking the model to audit a memory and its entity bindings
/// for the quality issues listed in the prompt.
const GRAPH_AUDIT_PROMPT: &str = "You are a knowledge graph quality auditor. Analyze this memory and its entity bindings for quality issues.\n\n\
    Check for: missing entities, wrong entity types, redundant relationships, orphaned entities, generic descriptions, low-signal relationships.\n\n\
    Respond with a list of issues found (or empty if none) and an overall quality score.";

/// JSON Schema for the audit reply: `quality_score` in `[0.0, 1.0]`, an
/// `issues` array of `{kind, detail}` objects, and `reasoning`.
///
/// Unlike the top-level object, the issue items do not set
/// `additionalProperties: false`, so extra keys on an issue are accepted.
const GRAPH_AUDIT_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "quality_score": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
 "issues": { "type": "array", "items": { "type": "object", "properties": { "kind": { "type": "string" }, "detail": { "type": "string" } }, "required": ["kind", "detail"] } },
 "reasoning": { "type": "string" }
 },
 "required": ["quality_score", "issues", "reasoning"],
 "additionalProperties": false
}"#;
239
/// Instruction text asking the model to synthesize a memory body into
/// entities, relationships, and a summary.
const DEEP_RESEARCH_SYNTH_PROMPT: &str = "You are a knowledge graph synthesizer. Given this memory body, extract key findings and synthesize them into structured entities and relationships.\n\n\
    Entity names: lowercase kebab-case, domain-specific.\n\
    Relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, related, replaces, tracked-in.\n\n\
    Respond with extracted entities, relationships, and a synthesis summary.";

/// JSON Schema for the synthesis reply: `entities`, `relationships`, `summary`.
///
/// Looser than `BINDINGS_SCHEMA`: `entity_type` and `relation` are free
/// strings, `strength` has no range, and nested items allow extra keys.
const DEEP_RESEARCH_SYNTH_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "entities": { "type": "array", "items": { "type": "object", "properties": { "name": { "type": "string" }, "entity_type": { "type": "string" } }, "required": ["name", "entity_type"] } },
 "relationships": { "type": "array", "items": { "type": "object", "properties": { "source": { "type": "string" }, "target": { "type": "string" }, "relation": { "type": "string" }, "strength": { "type": "number" } }, "required": ["source", "target", "relation", "strength"] } },
 "summary": { "type": "string" }
 },
 "required": ["entities", "relationships", "summary"],
 "additionalProperties": false
}"#;
256
/// Instruction text asking the model to restructure an unstructured memory
/// body into clean markdown while preserving all factual content.
const BODY_EXTRACT_PROMPT: &str = "You are a structured data extractor. Given this memory body (which may be unstructured text, raw notes, or a transcript), extract and restructure the content into a clean, well-organized markdown body.\n\n\
    Preserve all factual content. Remove noise, fix formatting, add section headers where appropriate.\n\
    Respond with the restructured body and a brief summary of changes.";

/// JSON Schema for the body-extraction reply: required strings
/// `restructured_body` and `changes_summary`, nothing else.
const BODY_EXTRACT_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "restructured_body": { "type": "string" },
 "changes_summary": { "type": "string" }
 },
 "required": ["restructured_body", "changes_summary"],
 "additionalProperties": false
}"#;
271
/// Instruction text for entity/relationship extraction from a memory body.
///
/// The relation names and strength scale listed here mirror the `relation`
/// enum and `strength` range declared in `BINDINGS_SCHEMA`.
const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. Given a memory body, extract:\n\
    1. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
    2. Typed relationships between entities with strength scores\n\n\
    Rules:\n\
    - Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
    - NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
    - Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
    - NEVER use 'mentions' as relationship type\n\
    - Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
    - Prefer fewer high-quality entities over many low-quality ones";
286
/// Leading part of the entity-description prompt. It ends with
/// `"Entity name: "`, so the caller is expected to append the entity details.
/// NOTE(review): the exact suffix format is built elsewhere — confirm at the call site.
const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. Given an entity name and type, write a concise one-sentence description (10-20 words) that explains what this entity IS and WHY it matters in the context of software/system design.\n\nEntity name: ";
288
/// Leading part of the body-enrichment prompt. It ends with
/// `"Memory body to enrich:\n\n"`, so the caller is expected to append the body text.
/// NOTE(review): how the target length is supplied is not visible here — confirm at the call site.
const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. Given a short or sparse memory body, expand it into a richer, more complete and useful description. Preserve all existing facts. Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\nConstraints:\n- Output only the enriched body text (no metadata, no headers)\n- Preserve the original meaning exactly\n- Target length is provided in the system context\n\nMemory body to enrich:\n\n";
290
// Which enrichment pass the `enrich` command runs (`--operation`).
//
// Parsed by clap as a value enum and (de)serialized by serde in kebab-case.
// Plain `//` comments are used on purpose: clap's `ValueEnum` derive turns
// `///` doc comments into per-value help text.
//
// The per-variant notes below restate the EXAMPLES section of the command's
// help text. Variants without a note are not described anywhere in the
// visible source; several share a name prefix with a `*_PROMPT` / `*_SCHEMA`
// constant (e.g. `WeightCalibrate` / `WEIGHT_CALIBRATE_PROMPT`).
// NOTE(review): presumably each such variant is driven by the matching
// constants — confirm in the dispatcher.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum EnrichOperation {
    // Add missing entity bindings to unbound memories.
    MemoryBindings,
    // Fill in entity descriptions.
    EntityDescriptions,
    // Expand short memory bodies.
    BodyEnrich,
    // Rebuild missing memory embeddings without rewriting bodies.
    ReEmbed,
    WeightCalibrate,
    RelationReclassify,
    EntityConnect,
    EntityTypeValidate,
    DescriptionEnrich,
    CrossDomainBridges,
    DomainClassify,
    GraphAudit,
    DeepResearchSynth,
    BodyExtract,
}
328
// LLM backend used for enrichment (`--mode`, `--fallback-mode`).
//
// Plain `//` comments are used on purpose: clap's `ValueEnum` derive turns
// `///` doc comments into per-value help text.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
pub enum EnrichMode {
    // Probed via `find_claude_binary` in `run_preflight_probe`.
    ClaudeCode,
    // Probed via `find_codex_binary` in `run_preflight_probe`.
    Codex,
    // CLI value is pinned to "opencode" explicitly rather than left to
    // clap's automatic variant renaming.
    #[value(name = "opencode")]
    Opencode,
}
340
341impl std::fmt::Display for EnrichMode {
342 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
343 match self {
344 EnrichMode::ClaudeCode => write!(f, "claude-code"),
345 EnrichMode::Codex => write!(f, "codex"),
346 EnrichMode::Opencode => write!(f, "opencode"),
347 }
348 }
349}
350
351#[derive(clap::Args)]
353#[command(
354 about = "Enrich graph memories and entities using an LLM provider",
355 after_long_help = "EXAMPLES:\n \
356 # Add missing entity bindings to all unbound memories\n \
357 sqlite-graphrag enrich --operation memory-bindings --mode claude-code\n\n \
358 # Fill entity descriptions (dry-run preview, no tokens spent)\n \
359 sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n \
360 # Expand short memory bodies (GAP-18)\n \
361 sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n \
362 # Rebuild only missing memory embeddings without rewriting bodies\n \
363 sqlite-graphrag enrich --operation re-embed --limit 100\n\n \
364 # Resume an interrupted body-enrich run\n \
365 sqlite-graphrag enrich --operation body-enrich --resume --json\n\n \
366 # Retry only failed items from a previous run\n \
367 sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n\
368 EXIT CODES:\n \
369 0 success\n \
370 1 validation error (bad args, binary not found)\n \
371 14 I/O error"
372)]
373pub struct EnrichArgs {
374 #[arg(long, short = 'o', value_enum, value_name = "OPERATION")]
376 pub operation: EnrichOperation,
377
378 #[arg(long, value_enum, default_value = "claude-code")]
380 pub mode: EnrichMode,
381
382 #[arg(long, value_name = "N")]
384 pub limit: Option<usize>,
385
386 #[arg(long)]
388 pub dry_run: bool,
389
390 #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
392 pub namespace: Option<String>,
393
394 #[arg(long, value_name = "PATH")]
397 pub claude_binary: Option<PathBuf>,
398
399 #[arg(long, value_name = "MODEL")]
401 pub claude_model: Option<String>,
402
403 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
405 pub claude_timeout: u64,
406
407 #[arg(long, value_name = "PATH")]
410 pub codex_binary: Option<PathBuf>,
411
412 #[arg(long, value_name = "MODEL")]
414 pub codex_model: Option<String>,
415
416 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
418 pub codex_timeout: u64,
419
420 #[arg(long, value_name = "PATH", env = "SQLITE_GRAPHRAG_OPENCODE_BINARY")]
423 pub opencode_binary: Option<PathBuf>,
424
425 #[arg(long, value_name = "MODEL", env = "SQLITE_GRAPHRAG_OPENCODE_MODEL")]
427 pub opencode_model: Option<String>,
428
429 #[arg(
431 long,
432 value_name = "SECONDS",
433 env = "SQLITE_GRAPHRAG_OPENCODE_TIMEOUT",
434 default_value_t = 300
435 )]
436 pub opencode_timeout: u64,
437
438 #[arg(long, value_name = "USD")]
441 pub max_cost_usd: Option<f64>,
442
443 #[arg(long)]
446 pub resume: bool,
447
448 #[arg(long)]
450 pub retry_failed: bool,
451
452 #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
455 pub min_output_chars: usize,
456
457 #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
459 pub max_output_chars: usize,
460
461 #[arg(long, default_value_t = true)]
463 pub preserve_check: bool,
464
465 #[arg(long, value_name = "PATH")]
467 pub prompt_template: Option<PathBuf>,
468
469 #[arg(long, default_value_t = 1, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=32))]
473 pub llm_parallelism: u32,
474
475 #[arg(long)]
478 pub json: bool,
479
480 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
482 pub db: Option<String>,
483
484 #[arg(long, value_name = "SECONDS")]
487 pub wait_job_singleton: Option<u64>,
488
489 #[arg(long, default_value_t = false)]
493 pub force_job_singleton: bool,
494
495 #[arg(long, value_name = "NAMES", value_delimiter = ',')]
499 pub names: Vec<String>,
500
501 #[arg(long, value_name = "PATH")]
505 pub names_file: Option<PathBuf>,
506
507 #[arg(long, default_value_t = false)]
511 pub preflight_check: bool,
512
513 #[arg(long, value_enum)]
517 pub fallback_mode: Option<EnrichMode>,
518
519 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
522 pub rate_limit_buffer: u64,
523
524 #[arg(long, default_value_t = true)]
528 pub max_load_check: bool,
529
530 #[arg(long, value_name = "N", default_value_t = 5)]
533 pub circuit_breaker_threshold: u32,
534
535 #[arg(long, value_name = "FLOAT", default_value_t = 0.7)]
542 pub preserve_threshold: f64,
543
544 #[arg(long, default_value_t = true)]
549 pub codex_model_validate: bool,
550
551 #[arg(long, value_name = "MODEL")]
556 pub codex_model_fallback: Option<String>,
557}
558
/// Serializable phase-level progress record.
///
/// `phase` is always serialized; every `Option` field is left out of the
/// output when it is `None`.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Name of the phase being reported.
    phase: &'a str,
    /// Path of the backend binary, when relevant to the phase.
    #[serde(skip_serializing_if = "Option::is_none")]
    binary_path: Option<&'a str>,
    /// Backend version string, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Total number of items, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_total: Option<usize>,
    /// Number of items still pending, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_pending: Option<usize>,
    /// Configured LLM parallelism, when reported.
    #[serde(skip_serializing_if = "Option::is_none")]
    llm_parallelism: Option<u32>,
}
582
/// Serializable per-item progress record.
///
/// `item`, `status`, `index` and `total` are always serialized; every
/// `Option` field is left out of the output when it is `None`.
#[derive(Debug, Serialize)]
struct ItemEvent<'a> {
    /// Identifier of the item being processed.
    item: &'a str,
    /// Status label for this item.
    status: &'a str,
    /// Database id of the memory, for memory items.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Database id of the entity, for entity items.
    #[serde(skip_serializing_if = "Option::is_none")]
    entity_id: Option<i64>,
    /// Entity count associated with this item.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Relationship count associated with this item.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Body length before processing.
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_before: Option<usize>,
    /// Body length after processing.
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_after: Option<usize>,
    /// Cost attributed to this item, in USD.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    /// Time spent on this item, in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Error message, when the item failed.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Position of this item within the run.
    /// NOTE(review): zero- vs one-based is decided by the caller — confirm.
    index: usize,
    /// Total number of items in the run.
    total: usize,
}
609
/// Serializable end-of-run summary record.
#[derive(Debug, Serialize)]
struct EnrichSummary {
    /// Boolean marker serialized under the `summary` key.
    /// NOTE(review): presumably always `true` so consumers can tell this record
    /// apart from per-item records — confirm.
    summary: bool,
    /// Name of the operation that was run.
    operation: String,
    /// Number of items considered.
    items_total: usize,
    /// Number of items completed.
    completed: usize,
    /// Number of items that failed.
    failed: usize,
    /// Number of items skipped.
    skipped: usize,
    /// Accumulated cost in USD.
    cost_usd: f64,
    /// Wall-clock duration in milliseconds.
    elapsed_ms: u64,
    /// Backend that was invoked; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    backend_invoked: Option<&'static str>,
}
627
628use crate::output::emit_json_line as emit_json;
629
630fn open_queue_db(path: &str) -> Result<Connection, AppError> {
645 let conn = Connection::open(path)?;
646 conn.pragma_update(None, "journal_mode", "wal")?;
647 conn.execute_batch(
648 "CREATE TABLE IF NOT EXISTS queue (
649 id INTEGER PRIMARY KEY AUTOINCREMENT,
650 item_key TEXT NOT NULL UNIQUE,
651 item_type TEXT NOT NULL DEFAULT 'memory',
652 status TEXT NOT NULL DEFAULT 'pending',
653 memory_id INTEGER,
654 entity_id INTEGER,
655 entities INTEGER DEFAULT 0,
656 rels INTEGER DEFAULT 0,
657 error TEXT,
658 cost_usd REAL DEFAULT 0.0,
659 attempt INTEGER DEFAULT 0,
660 elapsed_ms INTEGER,
661 created_at TEXT DEFAULT (datetime('now')),
662 done_at TEXT
663 );
664 CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
665 )?;
666 Ok(conn)
667}
668
669fn call_claude(
677 binary: &Path,
678 prompt: &str,
679 json_schema: &str,
680 input_text: &str,
681 model: Option<&str>,
682 timeout_secs: u64,
683) -> Result<(serde_json::Value, f64, bool), AppError> {
684 let result = crate::commands::claude_runner::run_claude(
685 binary,
686 prompt,
687 json_schema,
688 input_text,
689 model,
690 timeout_secs,
691 7,
692 )?;
693 Ok((result.value, result.cost_usd, result.is_oauth))
694}
695
/// Result of the backend preflight probe (produced by `run_preflight_probe`).
enum PreflightOutcome {
    /// The probe process exited successfully.
    Healthy,
    /// The probe failed and its stderr contained a rate-limit marker.
    RateLimited {
        /// Trimmed stderr of the probe process.
        reason: String,
        /// Static, human-readable hint on how to proceed.
        suggestion: &'static str,
    },
    /// Any other failure: binary lookup, spawn, timeout, or a non-rate-limit
    /// non-zero exit.
    Error(AppError),
}
714
715fn run_preflight_probe(args: &EnrichArgs) -> PreflightOutcome {
723 let timeout = std::time::Duration::from_secs(args.rate_limit_buffer.max(60));
724
725 match args.mode {
726 EnrichMode::ClaudeCode => {
727 let bin = match find_claude_binary(args.claude_binary.as_deref()) {
728 Ok(b) => b,
729 Err(e) => return PreflightOutcome::Error(e),
730 };
731 let mcp_config_path = match crate::spawn::preflight::write_empty_mcp_config_tempfile() {
736 Ok(p) => p,
737 Err(e) => {
738 return PreflightOutcome::Error(AppError::Io(e));
739 }
740 };
741 let mut cmd = std::process::Command::new(&bin);
742 cmd.env_clear();
743 for var in &["PATH", "HOME", "USER"] {
744 if let Ok(val) = std::env::var(var) {
745 cmd.env(var, val);
746 }
747 }
748 cmd.arg("-p")
749 .arg("ping")
750 .arg("--max-turns")
751 .arg("1")
752 .arg("--strict-mcp-config")
753 .arg("--mcp-config")
754 .arg(mcp_config_path.as_os_str())
755 .arg("--dangerously-skip-permissions")
756 .arg("--settings")
757 .arg("{\"hooks\":{}}")
758 .arg("--output-format")
759 .arg("json")
760 .stdin(std::process::Stdio::null())
761 .stdout(std::process::Stdio::piped())
762 .stderr(std::process::Stdio::piped());
763
764 let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
765 Ok(c) => c,
766 Err(e) => {
767 return PreflightOutcome::Error(AppError::Io(e));
768 }
769 };
770 let output = match wait_with_timeout(child, timeout) {
771 Ok(out) => out,
772 Err(e) => return PreflightOutcome::Error(e),
773 };
774 if !output.status.success() {
775 let stderr = String::from_utf8_lossy(&output.stderr);
776 if stderr.contains("hit your session limit")
777 || stderr.contains("rate_limit")
778 || stderr.contains("429")
779 {
780 return PreflightOutcome::RateLimited {
781 reason: stderr.trim().to_string(),
782 suggestion:
783 "wait for the OAuth window to reset or use --fallback-mode codex",
784 };
785 }
786 return PreflightOutcome::Error(AppError::Validation(format!(
787 "preflight probe failed: {stderr}",
788 stderr = stderr.trim()
789 )));
790 }
791 PreflightOutcome::Healthy
792 }
793 EnrichMode::Codex => {
794 let bin = match find_codex_binary(args.codex_binary.as_deref()) {
795 Ok(b) => b,
796 Err(e) => return PreflightOutcome::Error(e),
797 };
798 super::codex_spawn::validate_codex_model(args.codex_model.as_deref())
799 .map_err(PreflightOutcome::Error)
800 .ok();
801 let schema = "{}";
802 let schema_path = match super::codex_spawn::trusted_schema_path() {
803 Ok(p) => p,
804 Err(e) => return PreflightOutcome::Error(e),
805 };
806 let spawn_args = super::codex_spawn::CodexSpawnArgs {
807 binary: &bin,
808 prompt: "ping",
809 json_schema: schema,
810 input_text: "",
811 model: args.codex_model.as_deref(),
812 timeout_secs: args.rate_limit_buffer.max(60),
813 schema_path: schema_path.clone(),
814 };
815 let mut cmd = match super::codex_spawn::build_codex_command(&spawn_args) {
816 Ok(c) => c,
817 Err(e) => return PreflightOutcome::Error(e),
818 };
819 let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
820 Ok(c) => c,
821 Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
822 };
823 let output = match wait_with_timeout(child, timeout) {
824 Ok(out) => out,
825 Err(e) => return PreflightOutcome::Error(e),
826 };
827 let _ = std::fs::remove_file(&schema_path);
828 if !output.status.success() {
829 let stderr = String::from_utf8_lossy(&output.stderr);
830 if stderr.contains("rate_limit")
831 || stderr.contains("429")
832 || stderr.contains("Too Many Requests")
833 {
834 return PreflightOutcome::RateLimited {
835 reason: stderr.trim().to_string(),
836 suggestion: "wait for the rate-limit window to reset",
837 };
838 }
839 return PreflightOutcome::Error(AppError::Validation(format!(
840 "preflight probe failed: {stderr}",
841 stderr = stderr.trim()
842 )));
843 }
844 PreflightOutcome::Healthy
845 }
846 EnrichMode::Opencode => {
847 let bin = match super::opencode_runner::find_opencode_binary_with_override(
848 args.opencode_binary.as_deref(),
849 ) {
850 Ok(b) => b,
851 Err(e) => return PreflightOutcome::Error(e),
852 };
853 let model =
854 super::opencode_runner::resolve_opencode_model(args.opencode_model.as_deref());
855 let mut cmd =
856 super::opencode_runner::build_opencode_command_sync(&bin, &model, "ping", "");
857 let child = match super::opencode_runner::spawn_opencode(&mut cmd) {
858 Ok(c) => c,
859 Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
860 };
861 let output = match wait_with_timeout(child, timeout) {
862 Ok(out) => out,
863 Err(e) => return PreflightOutcome::Error(e),
864 };
865 if !output.status.success() {
866 let stderr = String::from_utf8_lossy(&output.stderr);
867 if stderr.contains("rate_limit")
868 || stderr.contains("429")
869 || stderr.contains("Too Many Requests")
870 {
871 return PreflightOutcome::RateLimited {
872 reason: stderr.trim().to_string(),
873 suggestion: "wait for the rate-limit window to reset",
874 };
875 }
876 return PreflightOutcome::Error(AppError::Validation(format!(
877 "preflight probe failed: {stderr}",
878 stderr = stderr.trim()
879 )));
880 }
881 PreflightOutcome::Healthy
882 }
883 }
884}
885
886fn wait_with_timeout(
888 mut child: std::process::Child,
889 timeout: std::time::Duration,
890) -> Result<std::process::Output, AppError> {
891 use wait_timeout::ChildExt;
892 let start = std::time::Instant::now();
893 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
894 if status.is_none() {
895 let _ = child.kill();
896 let _ = child.wait();
897 return Err(AppError::Validation(format!(
898 "preflight probe timed out after {}s",
899 start.elapsed().as_secs()
900 )));
901 }
902 let mut stdout = Vec::new();
903 if let Some(mut out) = child.stdout.take() {
904 std::io::Read::read_to_end(&mut out, &mut stdout).map_err(AppError::Io)?;
905 }
906 let mut stderr = Vec::new();
907 if let Some(mut err) = child.stderr.take() {
908 std::io::Read::read_to_end(&mut err, &mut stderr).map_err(AppError::Io)?;
909 }
910 let exit = status.unwrap();
911 Ok(std::process::Output {
912 status: exit,
913 stdout,
914 stderr,
915 })
916}
917
918fn scan_unbound_memories(
929 conn: &Connection,
930 namespace: &str,
931 limit: Option<usize>,
932 name_filter: &[String],
933) -> Result<Vec<(i64, String, String)>, AppError> {
934 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
935
936 if name_filter.is_empty() {
937 let sql = format!(
938 "SELECT m.id, m.name, m.body
939 FROM memories m
940 WHERE m.namespace = ?1
941 AND m.deleted_at IS NULL
942 AND NOT EXISTS (
943 SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
944 )
945 ORDER BY m.id
946 {limit_clause}"
947 );
948 let mut stmt = conn.prepare(&sql)?;
949 let rows = stmt
950 .query_map(rusqlite::params![namespace], |r| {
951 Ok((
952 r.get::<_, i64>(0)?,
953 r.get::<_, String>(1)?,
954 r.get::<_, String>(2)?,
955 ))
956 })?
957 .collect::<Result<Vec<_>, _>>()?;
958 Ok(rows)
959 } else {
960 let placeholders: Vec<String> = (2..=name_filter.len() + 1)
962 .map(|i| format!("?{i}"))
963 .collect();
964 let in_clause = placeholders.join(", ");
965 let sql = format!(
966 "SELECT m.id, m.name, m.body
967 FROM memories m
968 WHERE m.namespace = ?1
969 AND m.deleted_at IS NULL
970 AND m.name IN ({in_clause})
971 AND NOT EXISTS (
972 SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
973 )
974 ORDER BY m.id
975 {limit_clause}"
976 );
977 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
978 params_vec.push(&namespace);
979 for n in name_filter {
980 params_vec.push(n);
981 }
982 let mut stmt = conn.prepare(&sql)?;
983 let rows = stmt
984 .query_map(
985 rusqlite::params_from_iter(params_vec.iter().copied()),
986 |r| {
987 Ok((
988 r.get::<_, i64>(0)?,
989 r.get::<_, String>(1)?,
990 r.get::<_, String>(2)?,
991 ))
992 },
993 )?
994 .collect::<Result<Vec<_>, _>>()?;
995 Ok(rows)
996 }
997}
998
999fn read_names_file(path: &Path) -> Result<Vec<String>, AppError> {
1004 let content = std::fs::read_to_string(path).map_err(|e| {
1005 AppError::Validation(format!("failed to read names file {}: {e}", path.display()))
1006 })?;
1007 let mut seen = std::collections::HashSet::new();
1008 let mut out = Vec::new();
1009 for line in content.lines() {
1010 let trimmed = line.trim();
1011 if trimmed.is_empty() || trimmed.starts_with('#') {
1012 continue;
1013 }
1014 if seen.insert(trimmed.to_string()) {
1015 out.push(trimmed.to_string());
1016 }
1017 }
1018 Ok(out)
1019}
1020
1021fn resolve_name_filter(args: &EnrichArgs) -> Result<Vec<String>, AppError> {
1023 let mut combined: Vec<String> = args.names.clone();
1024 if let Some(p) = &args.names_file {
1025 let from_file = read_names_file(p)?;
1026 for n in from_file {
1027 if !combined.contains(&n) {
1028 combined.push(n);
1029 }
1030 }
1031 }
1032 Ok(combined)
1033}
1034
1035fn scan_entities_without_description(
1039 conn: &Connection,
1040 namespace: &str,
1041 limit: Option<usize>,
1042 name_filter: &[String],
1043) -> Result<Vec<(i64, String, String)>, AppError> {
1044 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1045
1046 if name_filter.is_empty() {
1047 let sql = format!(
1048 "SELECT id, name, type
1049 FROM entities
1050 WHERE namespace = ?1
1051 AND (description IS NULL OR description = '')
1052 ORDER BY id
1053 {limit_clause}"
1054 );
1055 let mut stmt = conn.prepare(&sql)?;
1056 let rows = stmt
1057 .query_map(rusqlite::params![namespace], |r| {
1058 Ok((
1059 r.get::<_, i64>(0)?,
1060 r.get::<_, String>(1)?,
1061 r.get::<_, String>(2)?,
1062 ))
1063 })?
1064 .collect::<Result<Vec<_>, _>>()?;
1065 Ok(rows)
1066 } else {
1067 let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1068 .map(|i| format!("?{i}"))
1069 .collect();
1070 let in_clause = placeholders.join(", ");
1071 let sql = format!(
1072 "SELECT id, name, type
1073 FROM entities
1074 WHERE namespace = ?1
1075 AND name IN ({in_clause})
1076 AND (description IS NULL OR description = '')
1077 ORDER BY id
1078 {limit_clause}"
1079 );
1080 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1081 params_vec.push(&namespace);
1082 for n in name_filter {
1083 params_vec.push(n);
1084 }
1085 let mut stmt = conn.prepare(&sql)?;
1086 let rows = stmt
1087 .query_map(
1088 rusqlite::params_from_iter(params_vec.iter().copied()),
1089 |r| {
1090 Ok((
1091 r.get::<_, i64>(0)?,
1092 r.get::<_, String>(1)?,
1093 r.get::<_, String>(2)?,
1094 ))
1095 },
1096 )?
1097 .collect::<Result<Vec<_>, _>>()?;
1098 Ok(rows)
1099 }
1100}
1101
1102fn scan_short_body_memories(
1106 conn: &Connection,
1107 namespace: &str,
1108 min_chars: usize,
1109 limit: Option<usize>,
1110 name_filter: &[String],
1111) -> Result<Vec<(i64, String, String)>, AppError> {
1112 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1113
1114 if name_filter.is_empty() {
1115 let sql = format!(
1116 "SELECT m.id, m.name, m.body
1117 FROM memories m
1118 WHERE m.namespace = ?1
1119 AND m.deleted_at IS NULL
1120 AND LENGTH(COALESCE(m.body,'')) < ?2
1121 ORDER BY m.id
1122 {limit_clause}"
1123 );
1124 let mut stmt = conn.prepare(&sql)?;
1125 let rows = stmt
1126 .query_map(rusqlite::params![namespace, min_chars as i64], |r| {
1127 Ok((
1128 r.get::<_, i64>(0)?,
1129 r.get::<_, String>(1)?,
1130 r.get::<_, String>(2)?,
1131 ))
1132 })?
1133 .collect::<Result<Vec<_>, _>>()?;
1134 Ok(rows)
1135 } else {
1136 let placeholders: Vec<String> = (3..=name_filter.len() + 2)
1137 .map(|i| format!("?{i}"))
1138 .collect();
1139 let in_clause = placeholders.join(", ");
1140 let sql = format!(
1141 "SELECT m.id, m.name, m.body
1142 FROM memories m
1143 WHERE m.namespace = ?1
1144 AND m.deleted_at IS NULL
1145 AND m.name IN ({in_clause})
1146 AND LENGTH(COALESCE(m.body,'')) < ?2
1147 ORDER BY m.id
1148 {limit_clause}"
1149 );
1150 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(2 + name_filter.len());
1151 let min_chars_i64 = min_chars as i64;
1152 params_vec.push(&namespace);
1153 params_vec.push(&min_chars_i64);
1154 for n in name_filter {
1155 params_vec.push(n);
1156 }
1157 let mut stmt = conn.prepare(&sql)?;
1158 let rows = stmt
1159 .query_map(
1160 rusqlite::params_from_iter(params_vec.iter().copied()),
1161 |r| {
1162 Ok((
1163 r.get::<_, i64>(0)?,
1164 r.get::<_, String>(1)?,
1165 r.get::<_, String>(2)?,
1166 ))
1167 },
1168 )?
1169 .collect::<Result<Vec<_>, _>>()?;
1170 Ok(rows)
1171 }
1172}
1173
1174fn scan_memories_without_embeddings(
1178 conn: &Connection,
1179 namespace: &str,
1180 limit: Option<usize>,
1181 name_filter: &[String],
1182) -> Result<Vec<(i64, String, String)>, AppError> {
1183 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1184
1185 if name_filter.is_empty() {
1186 let sql = format!(
1187 "SELECT m.id, m.name, COALESCE(m.body,'')
1188 FROM memories m
1189 LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1190 WHERE m.namespace = ?1
1191 AND m.deleted_at IS NULL
1192 AND me.memory_id IS NULL
1193 ORDER BY m.id
1194 {limit_clause}"
1195 );
1196 let mut stmt = conn.prepare(&sql)?;
1197 let rows = stmt
1198 .query_map(rusqlite::params![namespace], |r| {
1199 Ok((
1200 r.get::<_, i64>(0)?,
1201 r.get::<_, String>(1)?,
1202 r.get::<_, String>(2)?,
1203 ))
1204 })?
1205 .collect::<Result<Vec<_>, _>>()?;
1206 Ok(rows)
1207 } else {
1208 let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1209 .map(|i| format!("?{i}"))
1210 .collect();
1211 let in_clause = placeholders.join(", ");
1212 let sql = format!(
1213 "SELECT m.id, m.name, COALESCE(m.body,'')
1214 FROM memories m
1215 LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1216 WHERE m.namespace = ?1
1217 AND m.deleted_at IS NULL
1218 AND m.name IN ({in_clause})
1219 AND me.memory_id IS NULL
1220 ORDER BY m.id
1221 {limit_clause}"
1222 );
1223 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1224 params_vec.push(&namespace);
1225 for n in name_filter {
1226 params_vec.push(n);
1227 }
1228 let mut stmt = conn.prepare(&sql)?;
1229 let rows = stmt
1230 .query_map(
1231 rusqlite::params_from_iter(params_vec.iter().copied()),
1232 |r| {
1233 Ok((
1234 r.get::<_, i64>(0)?,
1235 r.get::<_, String>(1)?,
1236 r.get::<_, String>(2)?,
1237 ))
1238 },
1239 )?
1240 .collect::<Result<Vec<_>, _>>()?;
1241 Ok(rows)
1242 }
1243}
1244
1245#[allow(clippy::type_complexity)]
1247fn scan_weight_candidates(
1248 conn: &Connection,
1249 namespace: &str,
1250 limit: Option<usize>,
1251) -> Result<Vec<(i64, String, String, String, f64)>, AppError> {
1252 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1253 let sql = format!(
1254 "SELECT r.id, e1.name, e2.name, r.relation, r.weight \
1255 FROM relationships r \
1256 JOIN entities e1 ON e1.id = r.source_id \
1257 JOIN entities e2 ON e2.id = r.target_id \
1258 WHERE r.weight >= 0.7 AND e1.namespace = ?1 \
1259 ORDER BY r.weight DESC {limit_clause}"
1260 );
1261 let mut stmt = conn.prepare(&sql)?;
1262 let rows = stmt
1263 .query_map(rusqlite::params![namespace], |r| {
1264 Ok((
1265 r.get::<_, i64>(0)?,
1266 r.get::<_, String>(1)?,
1267 r.get::<_, String>(2)?,
1268 r.get::<_, String>(3)?,
1269 r.get::<_, f64>(4)?,
1270 ))
1271 })?
1272 .collect::<Result<Vec<_>, _>>()?;
1273 Ok(rows)
1274}
1275
1276fn scan_generic_relations(
1278 conn: &Connection,
1279 namespace: &str,
1280 limit: Option<usize>,
1281) -> Result<Vec<(i64, String, String, String)>, AppError> {
1282 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1283 let sql = format!(
1284 "SELECT r.id, e1.name, e2.name, r.relation \
1285 FROM relationships r \
1286 JOIN entities e1 ON e1.id = r.source_id \
1287 JOIN entities e2 ON e2.id = r.target_id \
1288 WHERE r.relation = 'applies_to' AND e1.namespace = ?1 \
1289 ORDER BY r.id {limit_clause}"
1290 );
1291 let mut stmt = conn.prepare(&sql)?;
1292 let rows = stmt
1293 .query_map(rusqlite::params![namespace], |r| {
1294 Ok((
1295 r.get::<_, i64>(0)?,
1296 r.get::<_, String>(1)?,
1297 r.get::<_, String>(2)?,
1298 r.get::<_, String>(3)?,
1299 ))
1300 })?
1301 .collect::<Result<Vec<_>, _>>()?;
1302 Ok(rows)
1303}
1304
1305fn persist_memory_bindings(
1314 conn: &Connection,
1315 namespace: &str,
1316 memory_id: i64,
1317 entities_json: &serde_json::Value,
1318 rels_json: &serde_json::Value,
1319) -> Result<(usize, usize), AppError> {
1320 #[derive(Deserialize)]
1321 struct EntityItem {
1322 name: String,
1323 entity_type: String,
1324 }
1325 #[derive(Deserialize)]
1326 struct RelItem {
1327 source: String,
1328 target: String,
1329 relation: String,
1330 strength: f64,
1331 }
1332
1333 let extracted_entities: Vec<EntityItem> = serde_json::from_value(entities_json.clone())
1334 .map_err(|e| AppError::Validation(format!("failed to parse entities array: {e}")))?;
1335
1336 let extracted_rels: Vec<RelItem> = serde_json::from_value(rels_json.clone())
1337 .map_err(|e| AppError::Validation(format!("failed to parse relationships array: {e}")))?;
1338
1339 let mut ent_count = 0usize;
1340 let mut rel_count = 0usize;
1341
1342 for item in &extracted_entities {
1343 let entity_type = match item.entity_type.parse::<EntityType>() {
1344 Ok(et) => et,
1345 Err(_) => {
1346 tracing::warn!(
1347 target: "enrich",
1348 entity = %item.name,
1349 entity_type = %item.entity_type,
1350 "entity type not recognized, skipping"
1351 );
1352 continue;
1353 }
1354 };
1355 match entities::upsert_entity(
1356 conn,
1357 namespace,
1358 &NewEntity {
1359 name: item.name.clone(),
1360 entity_type,
1361 description: None,
1362 },
1363 ) {
1364 Ok(eid) => {
1365 let _ = entities::link_memory_entity(conn, memory_id, eid);
1366 ent_count += 1;
1367 }
1368 Err(e) => {
1369 tracing::warn!(
1370 target: "enrich",
1371 entity = %item.name,
1372 error = %e,
1373 "entity upsert skipped"
1374 );
1375 }
1376 }
1377 }
1378
1379 for rel in &extracted_rels {
1380 let normalized = crate::parsers::normalize_relation(&rel.relation);
1381 crate::parsers::warn_if_non_canonical(&normalized);
1382
1383 let src_name = crate::parsers::normalize_entity_name(&rel.source);
1386 let tgt_name = crate::parsers::normalize_entity_name(&rel.target);
1387 let src_id = entities::find_entity_id(conn, namespace, &src_name);
1388 let tgt_id = entities::find_entity_id(conn, namespace, &tgt_name);
1389 if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1390 let new_rel = NewRelationship {
1391 source: rel.source.clone(),
1392 target: rel.target.clone(),
1393 relation: normalized,
1394 strength: rel.strength,
1395 description: None,
1396 };
1397 if entities::upsert_relationship(conn, namespace, sid, tid, &new_rel).is_ok() {
1398 rel_count += 1;
1399 }
1400 }
1401 }
1402
1403 Ok((ent_count, rel_count))
1404}
1405
1406fn persist_entity_description(
1408 conn: &Connection,
1409 entity_id: i64,
1410 description: &str,
1411) -> Result<(), AppError> {
1412 conn.execute(
1413 "UPDATE entities SET description = ?1, updated_at = unixepoch() WHERE id = ?2",
1414 rusqlite::params![description, entity_id],
1415 )?;
1416 Ok(())
1417}
1418
1419#[allow(clippy::too_many_arguments)]
1425fn reembed_memory_vector(
1426 conn: &Connection,
1427 namespace: &str,
1428 memory_id: i64,
1429 memory_name: &str,
1430 memory_type: &str,
1431 body: &str,
1432 paths: &crate::paths::AppPaths,
1433 llm_backend: crate::cli::LlmBackendChoice,
1434) -> Result<(), AppError> {
1435 let snippet: String = body.chars().take(200).collect();
1436 let (embedding, backend_kind) =
1441 crate::embedder::embed_passage_with_choice(&paths.models, body, Some(llm_backend))?;
1442 record_enrich_backend(backend_kind.as_str());
1443 memories::upsert_vec(
1444 conn,
1445 memory_id,
1446 namespace,
1447 memory_type,
1448 &embedding,
1449 memory_name,
1450 &snippet,
1451 )?;
1452 Ok(())
1453}
1454
/// Remembers `backend` as the most recently used backend, replacing any
/// previously recorded value.
///
/// If the slot's mutex is poisoned the call does nothing.
fn record_enrich_backend(backend: &'static str) {
    match ENRICH_LAST_BACKEND.lock() {
        Ok(mut slot) => {
            slot.replace(backend);
        }
        Err(_) => {}
    }
}

/// Returns the most recently recorded backend and resets the slot to `None`.
///
/// Yields `None` when nothing has been recorded since the last take, or when
/// the slot's mutex is poisoned.
fn take_enrich_backend() -> Option<&'static str> {
    match ENRICH_LAST_BACKEND.lock() {
        Ok(mut slot) => slot.take(),
        Err(_) => None,
    }
}

/// Process-wide slot holding the backend name last passed to
/// `record_enrich_backend`, until `take_enrich_backend` consumes it.
static ENRICH_LAST_BACKEND: std::sync::Mutex<Option<&'static str>> = std::sync::Mutex::new(None);
1471
1472fn persist_enriched_body(
1477 conn: &Connection,
1478 namespace: &str,
1479 memory_id: i64,
1480 memory_name: &str,
1481 new_body: &str,
1482 paths: &crate::paths::AppPaths,
1483 llm_backend: crate::cli::LlmBackendChoice,
1484) -> Result<(), AppError> {
1485 let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
1487 "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
1488 rusqlite::params![memory_id],
1489 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1490 )?;
1491
1492 let memory_type: String = conn.query_row(
1493 "SELECT type FROM memories WHERE id=?1",
1494 rusqlite::params![memory_id],
1495 |r| r.get(0),
1496 )?;
1497
1498 let description: String = conn.query_row(
1499 "SELECT COALESCE(description,'') FROM memories WHERE id=?1",
1500 rusqlite::params![memory_id],
1501 |r| r.get(0),
1502 )?;
1503
1504 let body_hash = blake3::hash(new_body.as_bytes()).to_hex().to_string();
1505
1506 let new_memory = memories::NewMemory {
1507 namespace: namespace.to_string(),
1508 name: memory_name.to_string(),
1509 memory_type: memory_type.clone(),
1510 description: description.clone(),
1511 body: new_body.to_string(),
1512 body_hash,
1513 session_id: None,
1514 source: "agent".to_string(),
1515 metadata: serde_json::json!({
1516 "operation": "body-enrich",
1517 "orig_chars": old_body.chars().count(),
1518 "new_chars": new_body.chars().count(),
1519 }),
1520 };
1521
1522 let next_version = crate::storage::versions::next_version(conn, memory_id)?;
1526 let version_metadata = serde_json::json!({
1527 "operation": "body-enrich",
1528 "orig_chars": old_body.chars().count(),
1529 "new_chars": new_body.chars().count(),
1530 })
1531 .to_string();
1532 crate::storage::versions::insert_version(
1533 conn,
1534 memory_id,
1535 next_version,
1536 memory_name,
1537 &memory_type,
1538 &description,
1539 new_body,
1540 &version_metadata,
1541 Some("enrich"),
1542 "edit",
1543 )?;
1544
1545 memories::update(conn, memory_id, &new_memory, None)?;
1546 memories::sync_fts_after_update(
1547 conn,
1548 memory_id,
1549 &old_name,
1550 &old_desc,
1551 &old_body,
1552 &new_memory.name,
1553 &new_memory.description,
1554 &new_memory.body,
1555 )?;
1556
1557 if let Err(e) = reembed_memory_vector(
1559 conn,
1560 namespace,
1561 memory_id,
1562 memory_name,
1563 &memory_type,
1564 new_body,
1565 paths,
1566 llm_backend,
1567 ) {
1568 tracing::warn!(target: "enrich", memory = %memory_name, error = %e, "vec upsert failed after body-enrich");
1569 }
1570
1571 Ok(())
1572}
1573
/// Returns `true` when `value` compares equal to `default`.
fn is_at_default<T: PartialEq>(value: T, default: T) -> bool {
    PartialEq::eq(&value, &default)
}
1588
1589fn validate_mode_conditional_flags_enrich(args: &EnrichArgs) -> Result<(), AppError> {
1604 const DEFAULT_TIMEOUT: u64 = 300;
1605
1606 let mut conflicts: Vec<String> = Vec::new();
1607
1608 match args.mode {
1609 EnrichMode::ClaudeCode => {
1610 if args.codex_binary.is_some() {
1611 conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1612 }
1613 if args.codex_model.is_some() {
1614 conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1615 }
1616 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1617 conflicts.push(format!(
1618 "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1619 args.codex_timeout
1620 ));
1621 }
1622 }
1623 EnrichMode::Codex => {
1624 if args.claude_binary.is_some() {
1625 conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1626 }
1627 if args.claude_model.is_some() {
1628 conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1629 }
1630 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1631 conflicts.push(format!(
1632 "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1633 args.claude_timeout
1634 ));
1635 }
1636 if args.max_cost_usd.is_some() {
1637 conflicts.push(
1638 "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription, not the call)"
1639 .to_string(),
1640 );
1641 }
1642 }
1643 EnrichMode::Opencode => {
1644 if args.claude_binary.is_some() {
1645 conflicts.push("--claude-binary is ignored when --mode=opencode".to_string());
1646 }
1647 if args.claude_model.is_some() {
1648 conflicts.push("--claude-model is ignored when --mode=opencode".to_string());
1649 }
1650 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1651 conflicts.push(format!(
1652 "--claude-timeout={} is ignored when --mode=opencode (remove the flag to use the default 300s)",
1653 args.claude_timeout
1654 ));
1655 }
1656 if args.max_cost_usd.is_some() {
1657 conflicts.push(
1658 "--max-cost-usd is ignored when --mode=opencode (OAuth-first; cost is metered by your subscription, not the call)"
1659 .to_string(),
1660 );
1661 }
1662 }
1663 }
1664
1665 if !conflicts.is_empty() {
1666 return Err(AppError::Validation(format!(
1667 "G20: mode-conditional flag conflicts detected for --mode={}:\n - {}",
1668 args.mode,
1669 conflicts.join("\n - ")
1670 )));
1671 }
1672
1673 Ok(())
1674}
1675
1676pub fn run(args: &EnrichArgs, llm_backend: crate::cli::LlmBackendChoice) -> Result<(), AppError> {
1680 validate_mode_conditional_flags_enrich(args)?;
1683 let started = Instant::now();
1684
1685 let paths = AppPaths::resolve(args.db.as_deref())?;
1686 ensure_db_ready(&paths)?;
1687 let conn = open_rw(&paths.db)?;
1688 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1689
1690 let wait_secs = args.wait_job_singleton;
1696 let force_flag = args.force_job_singleton;
1697 let _singleton = crate::lock::acquire_job_singleton(
1698 crate::lock::JobType::Enrich,
1699 &namespace,
1700 &paths.db,
1701 wait_secs,
1702 force_flag,
1703 )?;
1704
1705 let provider_binary = if matches!(args.operation, EnrichOperation::ReEmbed) {
1707 None
1708 } else {
1709 Some(match args.mode {
1710 EnrichMode::ClaudeCode => {
1711 let bin = find_claude_binary(args.claude_binary.as_deref())?;
1712 let version = super::claude_runner::validate_claude_version(&bin)?;
1713 tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
1714 emit_json(&PhaseEvent {
1715 phase: "validate",
1716 binary_path: bin.to_str(),
1717 version: Some(&version),
1718 items_total: None,
1719 items_pending: None,
1720 llm_parallelism: None,
1721 });
1722 bin
1723 }
1724 EnrichMode::Codex => {
1725 let bin = find_codex_binary(args.codex_binary.as_deref())?;
1726 emit_json(&PhaseEvent {
1727 phase: "validate",
1728 binary_path: bin.to_str(),
1729 version: None,
1730 items_total: None,
1731 items_pending: None,
1732 llm_parallelism: None,
1733 });
1734 bin
1735 }
1736 EnrichMode::Opencode => {
1737 let bin = super::opencode_runner::find_opencode_binary_with_override(
1738 args.opencode_binary.as_deref(),
1739 )?;
1740 emit_json(&PhaseEvent {
1741 phase: "validate",
1742 binary_path: bin.to_str(),
1743 version: None,
1744 items_total: None,
1745 items_pending: None,
1746 llm_parallelism: None,
1747 });
1748 bin
1749 }
1750 })
1751 };
1752
1753 if args.max_load_check && !args.dry_run && crate::system_load::is_system_saturated() {
1757 let load = crate::system_load::load_average_one();
1758 let n = crate::system_load::ncpus();
1759 return Err(AppError::Validation(format!(
1760 "system load average {load:.2} exceeds 2x ncpus ({n}); \
1761 pass --no-max-load-check to override (not recommended)"
1762 )));
1763 }
1764
1765 if args.preflight_check && !args.dry_run && !matches!(args.operation, EnrichOperation::ReEmbed)
1772 {
1773 let preflight_result = run_preflight_probe(args);
1774 match preflight_result {
1775 PreflightOutcome::Healthy => {
1776 tracing::info!(target: "enrich", mode = ?args.mode, "preflight probe healthy");
1777 }
1778 PreflightOutcome::RateLimited { reason, suggestion } => {
1779 if let Some(fallback) = args.fallback_mode.clone() {
1780 if fallback != args.mode {
1781 return Err(AppError::Validation(format!(
1791 "preflight detected rate limit on {mode:?}: {reason}; \
1792 re-invoke with `--mode {fallback:?}` to use the fallback provider",
1793 mode = args.mode
1794 )));
1795 }
1796 return Err(AppError::Validation(format!(
1797 "preflight detected rate limit on {mode:?}: {reason}; \
1798 --fallback-mode matches --mode, no recovery possible",
1799 mode = args.mode
1800 )));
1801 }
1802 return Err(AppError::Validation(format!(
1803 "preflight detected rate limit on {mode:?}: {reason}; \
1804 {suggestion}; pass --fallback-mode codex to recover",
1805 mode = args.mode
1806 )));
1807 }
1808 PreflightOutcome::Error(e) => {
1809 return Err(e);
1810 }
1811 }
1812 }
1813
1814 let scan_result = scan_operation(&conn, &namespace, args)?;
1816 let total = scan_result.len();
1817
1818 emit_json(&PhaseEvent {
1819 phase: "scan",
1820 binary_path: None,
1821 version: None,
1822 items_total: Some(total),
1823 items_pending: Some(total),
1824 llm_parallelism: Some(args.llm_parallelism),
1825 });
1826
1827 if args.dry_run {
1829 for (idx, key) in scan_result.iter().enumerate() {
1830 emit_json(&ItemEvent {
1831 item: key,
1832 status: "preview",
1833 memory_id: None,
1834 entity_id: None,
1835 entities: None,
1836 rels: None,
1837 chars_before: None,
1838 chars_after: None,
1839 cost_usd: None,
1840 elapsed_ms: None,
1841 error: None,
1842 index: idx,
1843 total,
1844 });
1845 }
1846 emit_json(&EnrichSummary {
1847 summary: true,
1848 operation: format!("{:?}", args.operation),
1849 items_total: total,
1850 completed: 0,
1851 failed: 0,
1852 skipped: 0,
1853 cost_usd: 0.0,
1854 elapsed_ms: started.elapsed().as_millis() as u64,
1855 backend_invoked: take_enrich_backend(),
1856 });
1857 return Ok(());
1858 }
1859
1860 let queue_conn = open_queue_db(DEFAULT_QUEUE_DB)?;
1864
1865 if args.resume {
1866 let reset = queue_conn
1867 .execute(
1868 "UPDATE queue SET status='pending' WHERE status='processing'",
1869 [],
1870 )
1871 .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
1872 if reset > 0 {
1873 tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
1874 }
1875 }
1876
1877 if args.retry_failed {
1878 let count = queue_conn
1879 .execute(
1880 "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
1881 [],
1882 )
1883 .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
1884 tracing::info!(target: "enrich", count, "retrying failed items");
1885 }
1886
1887 if !args.resume && !args.retry_failed {
1888 queue_conn
1889 .execute("DELETE FROM queue", [])
1890 .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
1891 }
1892
1893 for (idx, key) in scan_result.iter().enumerate() {
1895 let item_type = match args.operation {
1896 EnrichOperation::EntityDescriptions => "entity",
1897 _ => "memory",
1898 };
1899 if let Err(e) = queue_conn.execute(
1900 "INSERT OR IGNORE INTO queue (item_key, item_type, status) VALUES (?1, ?2, 'pending')",
1901 rusqlite::params![key, item_type],
1902 ) {
1903 tracing::warn!(target: "enrich", error = %e, "queue insert failed");
1904 }
1905 let _ = idx; }
1907
1908 let parallelism = args.llm_parallelism.clamp(1, 32) as usize;
1911 if parallelism > 1 {
1912 tracing::info!(
1913 target: "enrich",
1914 llm_parallelism = parallelism,
1915 "parallel LLM processing with bounded thread pool"
1916 );
1917 }
1918 if parallelism > 4 {
1922 match args.mode {
1923 EnrichMode::ClaudeCode => {
1924 tracing::warn!(
1925 target: "enrich",
1926 llm_parallelism = parallelism,
1927 recommended_max = 4,
1928 mode = "claude-code",
1929 "llm_parallelism above 4 multiplies Claude Code subprocess fan-out; \
1930 consider combining with SQLITE_GRAPHRAG_CLAUDE_EMPTY_CONFIG_DIR \
1931 to cut MCP children (G28-A)"
1932 );
1933 }
1934 EnrichMode::Codex if parallelism > 16 => {
1935 tracing::warn!(
1936 target: "enrich",
1937 llm_parallelism = parallelism,
1938 recommended_max = 16,
1939 mode = "codex",
1940 "llm_parallelism above 16 risks OAuth rate-limit on Codex; \
1941 consider --llm-parallelism 8 for safer concurrency"
1942 );
1943 }
1944 EnrichMode::Codex => {
1945 }
1949 EnrichMode::Opencode if parallelism > 16 => {
1950 tracing::warn!(
1951 target: "enrich",
1952 llm_parallelism = parallelism,
1953 recommended_max = 16,
1954 mode = "opencode",
1955 "llm_parallelism above 16 risks OAuth rate-limit on OpenCode; \
1956 consider --llm-parallelism 8 for safer concurrency"
1957 );
1958 }
1959 EnrichMode::Opencode => {
1960 }
1962 }
1963 }
1964
1965 let mut completed = 0usize;
1966 let mut failed = 0usize;
1967 let mut skipped = 0usize;
1968 let mut cost_total = 0.0f64;
1969 let mut oauth_detected = false;
1970 let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1971 let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1972 let enrich_started = std::time::Instant::now();
1973
1974 let provider_timeout = match args.mode {
1975 EnrichMode::ClaudeCode => args.claude_timeout,
1976 EnrichMode::Codex => args.codex_timeout,
1977 EnrichMode::Opencode => args.opencode_timeout,
1978 };
1979
1980 let provider_model: Option<&str> = match args.mode {
1981 EnrichMode::ClaudeCode => args.claude_model.as_deref(),
1982 EnrichMode::Codex => args.codex_model.as_deref(),
1983 EnrichMode::Opencode => args.opencode_model.as_deref(),
1984 };
1985
1986 if parallelism > 1 {
1990 let stdout_mu = parking_lot::Mutex::new(());
1991 let budget = args.max_cost_usd;
1992 let operation = args.operation.clone();
1993 let mode = args.mode.clone();
1994 let min_oc = args.min_output_chars;
1995 let max_oc = args.max_output_chars;
1996 let prompt_tpl = args.prompt_template.as_deref().map(|p| p.to_path_buf());
1997
1998 struct WorkerResult {
1999 completed: usize,
2000 failed: usize,
2001 skipped: usize,
2002 cost: f64,
2003 oauth: bool,
2004 }
2005
2006 let results: Vec<WorkerResult> = std::thread::scope(|s| {
2007 let handles: Vec<_> = (0..parallelism)
2008 .map(|worker_id| {
2009 let stdout_mu = &stdout_mu;
2010 let paths = &paths;
2011 let namespace = &namespace;
2012 let provider_binary = provider_binary.as_deref();
2013 let operation = &operation;
2014 let mode = &mode;
2015 let prompt_tpl = prompt_tpl.as_deref();
2016 s.spawn(move || {
2017 let w_conn = match open_rw(&paths.db) {
2018 Ok(c) => c,
2019 Err(e) => {
2020 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open DB");
2021 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
2022 }
2023 };
2024 let w_queue = match open_queue_db(DEFAULT_QUEUE_DB) {
2025 Ok(c) => c,
2026 Err(e) => {
2027 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open queue DB");
2028 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
2029 }
2030 };
2031 let mut w_completed = 0usize;
2032 let mut w_failed = 0usize;
2033 let mut w_skipped = 0usize;
2034 let mut w_cost = 0.0f64;
2035 let mut w_oauth = false;
2036 let mut w_backoff = DEFAULT_RATE_LIMIT_WAIT;
2037 let w_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
2038 let mut w_breaker = crate::retry::CircuitBreaker::new(
2044 args.circuit_breaker_threshold.max(1),
2045 std::time::Duration::from_secs(60),
2046 );
2047
2048 loop {
2049 if crate::shutdown_requested() {
2050 tracing::info!(target: "enrich", "shutdown requested, worker stopping");
2051 break;
2052 }
2053 if let Some(b) = budget {
2054 if !w_oauth && w_cost >= b {
2055 break;
2056 }
2057 }
2058 let pending: Option<(i64, String, String)> = w_queue
2059 .query_row(
2060 "UPDATE queue SET status='processing', attempt=attempt+1 \
2061 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
2062 RETURNING id, item_key, item_type",
2063 [],
2064 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2065 )
2066 .ok();
2067 let (queue_id, item_key, _item_type) = match pending {
2068 Some(p) => p,
2069 None => break,
2070 };
2071 let item_started = Instant::now();
2072 let current_index = w_completed + w_failed + w_skipped;
2073
2074 let call_result = match operation {
2075 EnrichOperation::MemoryBindings => call_memory_bindings(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2076 EnrichOperation::EntityDescriptions => call_entity_description(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2077 EnrichOperation::BodyEnrich => call_body_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode, min_oc, max_oc, prompt_tpl, args.preserve_threshold, paths, llm_backend),
2078 EnrichOperation::ReEmbed => call_reembed(&w_conn, namespace, &item_key, paths, llm_backend),
2079 EnrichOperation::WeightCalibrate => call_weight_calibrate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2080 EnrichOperation::RelationReclassify => call_relation_reclassify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2081 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => call_entity_connect(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2082 EnrichOperation::EntityTypeValidate => call_entity_type_validate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2083 EnrichOperation::DescriptionEnrich => call_description_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2084 EnrichOperation::DomainClassify => call_domain_classify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2085 EnrichOperation::GraphAudit => call_graph_audit(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2086 EnrichOperation::DeepResearchSynth => call_deep_research_synth(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2087 EnrichOperation::BodyExtract => call_body_extract(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2088 };
2089
2090 match call_result {
2091 Ok(EnrichItemResult::Done { cost, is_oauth, memory_id, entity_id, entities, rels, chars_before, chars_after }) => {
2092 if is_oauth { w_oauth = true; }
2093 w_backoff = DEFAULT_RATE_LIMIT_WAIT;
2094 let _ = w_queue.execute(
2095 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2096 rusqlite::params![memory_id, entity_id, entities as i64, rels as i64, cost, item_started.elapsed().as_millis() as i64, queue_id],
2097 );
2098 w_completed += 1;
2099 if !is_oauth { w_cost += cost; }
2100 let _ = w_breaker
2102 .record(crate::retry::AttemptOutcome::Success);
2103 let _guard = stdout_mu.lock();
2104 emit_json(&ItemEvent { item: &item_key, status: "done", memory_id, entity_id, entities: Some(entities), rels: Some(rels), chars_before, chars_after, cost_usd: if is_oauth { None } else { Some(cost) }, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
2105 }
2106 Ok(EnrichItemResult::Skipped { reason }) => {
2107 w_skipped += 1;
2108 let _ = w_queue.execute("UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![reason, queue_id]);
2109 let _guard = stdout_mu.lock();
2110 emit_json(&ItemEvent { item: &item_key, status: "skipped", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
2111 }
2112 Ok(EnrichItemResult::PreservationFailed { score, threshold, chars_before, chars_after }) => {
2113 w_skipped += 1;
2119 let reason = format!(
2120 "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2121 );
2122 let _ = w_queue.execute(
2123 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2124 rusqlite::params![reason, queue_id],
2125 );
2126 let _guard = stdout_mu.lock();
2127 emit_json(&ItemEvent {
2128 item: &item_key,
2129 status: "preservation_failed",
2130 memory_id: None,
2131 entity_id: None,
2132 entities: None,
2133 rels: None,
2134 chars_before: Some(chars_before),
2135 chars_after: Some(chars_after),
2136 cost_usd: None,
2137 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2138 error: Some(reason),
2139 index: current_index,
2140 total,
2141 });
2142 }
2143 Err(e) => {
2144 let err_str = format!("{e}");
2145 if matches!(e, AppError::RateLimited { .. }) {
2146 if crate::retry::is_kill_switch_active() {
2147 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2148 } else if std::time::Instant::now() >= w_deadline {
2149 tracing::error!(target: "enrich", "rate-limit retry deadline (1h) exhausted in worker");
2150 } else {
2151 let half = w_backoff / 2;
2152 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2153 let actual_wait = half + jitter;
2154 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited in worker, backing off");
2155 let _ = w_queue.execute("UPDATE queue SET status='pending' WHERE id=?1", rusqlite::params![queue_id]);
2156 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2157 w_backoff = (w_backoff * 2).min(900);
2158 continue;
2159 }
2160 }
2161 w_failed += 1;
2162 let _ = w_queue.execute("UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![err_str, queue_id]);
2163 let _guard = stdout_mu.lock();
2164 emit_json(&ItemEvent { item: &item_key, status: "failed", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: Some(err_str), index: current_index, total });
2165 let breaker_opened = w_breaker
2167 .record(crate::retry::AttemptOutcome::HardFailure);
2168 if breaker_opened {
2169 tracing::error!(target: "enrich",
2170 consecutive_failures = w_breaker.consecutive_failures(),
2171 "circuit breaker opened — aborting worker"
2172 );
2173 break;
2174 }
2175 }
2176 }
2177 }
2178 WorkerResult { completed: w_completed, failed: w_failed, skipped: w_skipped, cost: w_cost, oauth: w_oauth }
2179 })
2180 })
2181 .collect();
2182 handles
2183 .into_iter()
2184 .map(|h| {
2185 h.join().unwrap_or(WorkerResult {
2186 completed: 0,
2187 failed: 0,
2188 skipped: 0,
2189 cost: 0.0,
2190 oauth: false,
2191 })
2192 })
2193 .collect()
2194 });
2195
2196 for r in &results {
2197 completed += r.completed;
2198 failed += r.failed;
2199 skipped += r.skipped;
2200 cost_total += r.cost;
2201 if r.oauth && !oauth_detected {
2202 oauth_detected = true;
2203 }
2204 }
2205 } else {
2206 loop {
2208 if crate::shutdown_requested() {
2209 tracing::info!(target: "enrich", "shutdown requested, stopping enrichment");
2210 break;
2211 }
2212
2213 if let Some(budget) = args.max_cost_usd {
2215 if !oauth_detected && cost_total >= budget {
2216 tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
2217 break;
2218 }
2219 }
2220
2221 let pending: Option<(i64, String, String)> = queue_conn
2223 .query_row(
2224 "UPDATE queue SET status='processing', attempt=attempt+1 \
2225 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
2226 RETURNING id, item_key, item_type",
2227 [],
2228 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2229 )
2230 .ok();
2231
2232 let (queue_id, item_key, item_type) = match pending {
2233 Some(p) => p,
2234 None => break,
2235 };
2236
2237 let item_started = Instant::now();
2238 let current_index = completed + failed + skipped;
2239
2240 let call_result = match args.operation {
2241 EnrichOperation::MemoryBindings => call_memory_bindings(
2242 &conn,
2243 &namespace,
2244 &item_key,
2245 provider_binary
2246 .as_deref()
2247 .expect("provider binary required"),
2248 provider_model,
2249 provider_timeout,
2250 &args.mode,
2251 ),
2252 EnrichOperation::EntityDescriptions => call_entity_description(
2253 &conn,
2254 &namespace,
2255 &item_key,
2256 provider_binary
2257 .as_deref()
2258 .expect("provider binary required"),
2259 provider_model,
2260 provider_timeout,
2261 &args.mode,
2262 ),
2263 EnrichOperation::BodyEnrich => call_body_enrich(
2264 &conn,
2265 &namespace,
2266 &item_key,
2267 provider_binary
2268 .as_deref()
2269 .expect("provider binary required"),
2270 provider_model,
2271 provider_timeout,
2272 &args.mode,
2273 args.min_output_chars,
2274 args.max_output_chars,
2275 args.prompt_template.as_deref(),
2276 args.preserve_threshold,
2277 &paths,
2278 llm_backend,
2279 ),
2280 EnrichOperation::ReEmbed => {
2281 call_reembed(&conn, &namespace, &item_key, &paths, llm_backend)
2282 }
2283 EnrichOperation::WeightCalibrate => call_weight_calibrate(
2284 &conn,
2285 &namespace,
2286 &item_key,
2287 provider_binary
2288 .as_deref()
2289 .expect("provider binary required"),
2290 provider_model,
2291 provider_timeout,
2292 &args.mode,
2293 ),
2294 EnrichOperation::RelationReclassify => call_relation_reclassify(
2295 &conn,
2296 &namespace,
2297 &item_key,
2298 provider_binary
2299 .as_deref()
2300 .expect("provider binary required"),
2301 provider_model,
2302 provider_timeout,
2303 &args.mode,
2304 ),
2305 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2306 call_entity_connect(
2307 &conn,
2308 &namespace,
2309 &item_key,
2310 provider_binary
2311 .as_deref()
2312 .expect("provider binary required"),
2313 provider_model,
2314 provider_timeout,
2315 &args.mode,
2316 )
2317 }
2318 EnrichOperation::EntityTypeValidate => call_entity_type_validate(
2319 &conn,
2320 &namespace,
2321 &item_key,
2322 provider_binary
2323 .as_deref()
2324 .expect("provider binary required"),
2325 provider_model,
2326 provider_timeout,
2327 &args.mode,
2328 ),
2329 EnrichOperation::DescriptionEnrich => call_description_enrich(
2330 &conn,
2331 &namespace,
2332 &item_key,
2333 provider_binary
2334 .as_deref()
2335 .expect("provider binary required"),
2336 provider_model,
2337 provider_timeout,
2338 &args.mode,
2339 ),
2340 EnrichOperation::DomainClassify => call_domain_classify(
2341 &conn,
2342 &namespace,
2343 &item_key,
2344 provider_binary
2345 .as_deref()
2346 .expect("provider binary required"),
2347 provider_model,
2348 provider_timeout,
2349 &args.mode,
2350 ),
2351 EnrichOperation::GraphAudit => call_graph_audit(
2352 &conn,
2353 &namespace,
2354 &item_key,
2355 provider_binary
2356 .as_deref()
2357 .expect("provider binary required"),
2358 provider_model,
2359 provider_timeout,
2360 &args.mode,
2361 ),
2362 EnrichOperation::DeepResearchSynth => call_deep_research_synth(
2363 &conn,
2364 &namespace,
2365 &item_key,
2366 provider_binary
2367 .as_deref()
2368 .expect("provider binary required"),
2369 provider_model,
2370 provider_timeout,
2371 &args.mode,
2372 ),
2373 EnrichOperation::BodyExtract => call_body_extract(
2374 &conn,
2375 &namespace,
2376 &item_key,
2377 provider_binary
2378 .as_deref()
2379 .expect("provider binary required"),
2380 provider_model,
2381 provider_timeout,
2382 &args.mode,
2383 ),
2384 };
2385
2386 match call_result {
2387 Ok(EnrichItemResult::Done {
2388 memory_id,
2389 entity_id,
2390 entities,
2391 rels,
2392 chars_before,
2393 chars_after,
2394 cost,
2395 is_oauth,
2396 }) => {
2397 if is_oauth && !oauth_detected {
2398 oauth_detected = true;
2399 tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
2400 }
2401 backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
2402
2403 let persist_err: Option<String> = match args.operation {
2405 EnrichOperation::MemoryBindings => {
2406 None
2408 }
2409 EnrichOperation::EntityDescriptions => {
2410 None
2412 }
2413 EnrichOperation::BodyEnrich => {
2414 None
2416 }
2417 _ => {
2418 None
2420 }
2421 };
2422
2423 if let Err(e) = queue_conn.execute(
2424 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2425 rusqlite::params![
2426 memory_id,
2427 entity_id,
2428 entities as i64,
2429 rels as i64,
2430 cost,
2431 item_started.elapsed().as_millis() as i64,
2432 queue_id
2433 ],
2434 ) {
2435 tracing::warn!(target: "enrich", error = %e, "queue done update failed");
2436 }
2437
2438 if persist_err.is_none() {
2439 completed += 1;
2440 if !is_oauth {
2441 cost_total += cost;
2442 }
2443 emit_json(&ItemEvent {
2444 item: &item_key,
2445 status: "done",
2446 memory_id,
2447 entity_id,
2448 entities: Some(entities),
2449 rels: Some(rels),
2450 chars_before,
2451 chars_after,
2452 cost_usd: if is_oauth { None } else { Some(cost) },
2453 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2454 error: None,
2455 index: current_index,
2456 total,
2457 });
2458 } else {
2459 failed += 1;
2460 emit_json(&ItemEvent {
2461 item: &item_key,
2462 status: "failed",
2463 memory_id: None,
2464 entity_id: None,
2465 entities: None,
2466 rels: None,
2467 chars_before: None,
2468 chars_after: None,
2469 cost_usd: None,
2470 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2471 error: persist_err,
2472 index: current_index,
2473 total,
2474 });
2475 }
2476 }
2477 Ok(EnrichItemResult::Skipped { reason }) => {
2478 skipped += 1;
2479 if let Err(e) = queue_conn.execute(
2480 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2481 rusqlite::params![reason, queue_id],
2482 ) {
2483 tracing::warn!(target: "enrich", error = %e, "queue skipped update failed");
2484 }
2485 emit_json(&ItemEvent {
2486 item: &item_key,
2487 status: "skipped",
2488 memory_id: None,
2489 entity_id: None,
2490 entities: None,
2491 rels: None,
2492 chars_before: None,
2493 chars_after: None,
2494 cost_usd: None,
2495 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2496 error: None,
2497 index: current_index,
2498 total,
2499 });
2500 }
2501 Ok(EnrichItemResult::PreservationFailed {
2502 score,
2503 threshold,
2504 chars_before,
2505 chars_after,
2506 }) => {
2507 skipped += 1;
2514 let reason = format!(
2515 "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2516 );
2517 if let Err(qe) = queue_conn.execute(
2518 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2519 rusqlite::params![reason, queue_id],
2520 ) {
2521 tracing::warn!(target: "enrich", error = %qe, "queue preservation_failed update failed");
2522 }
2523 emit_json(&ItemEvent {
2524 item: &item_key,
2525 status: "preservation_failed",
2526 memory_id: None,
2527 entity_id: None,
2528 entities: None,
2529 rels: None,
2530 chars_before: Some(chars_before),
2531 chars_after: Some(chars_after),
2532 cost_usd: None,
2533 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2534 error: Some(reason),
2535 index: current_index,
2536 total,
2537 });
2538 }
2539 Err(e) => {
2540 let err_str = format!("{e}");
2541 if matches!(e, AppError::RateLimited { .. }) {
2542 if crate::retry::is_kill_switch_active() {
2543 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2544 } else if std::time::Instant::now() >= rate_limit_deadline {
2545 tracing::error!(target: "enrich", total_elapsed_secs = enrich_started.elapsed().as_secs(), "rate-limit retry deadline (1h) exhausted");
2546 } else {
2547 let half = backoff_secs / 2;
2548 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2549 let actual_wait = half + jitter;
2550 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
2551 if let Err(qe) = queue_conn.execute(
2552 "UPDATE queue SET status='pending' WHERE id=?1",
2553 rusqlite::params![queue_id],
2554 ) {
2555 tracing::warn!(target: "enrich", error = %qe, "queue pending update failed");
2556 }
2557 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2558 backoff_secs = (backoff_secs * 2).min(900);
2559 continue;
2560 }
2561 }
2562
2563 failed += 1;
2564 if let Err(qe) = queue_conn.execute(
2565 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
2566 rusqlite::params![err_str, queue_id],
2567 ) {
2568 tracing::warn!(target: "enrich", error = %qe, "queue failed update failed");
2569 }
2570 emit_json(&ItemEvent {
2571 item: &item_key,
2572 status: "failed",
2573 memory_id: None,
2574 entity_id: None,
2575 entities: None,
2576 rels: None,
2577 chars_before: None,
2578 chars_after: None,
2579 cost_usd: None,
2580 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2581 error: Some(err_str),
2582 index: current_index,
2583 total,
2584 });
2585 }
2586 }
2587
2588 let _ = item_type; }
2590 } let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2593 let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2594
2595 emit_json(&EnrichSummary {
2596 summary: true,
2597 operation: format!("{:?}", args.operation),
2598 items_total: total,
2599 completed,
2600 failed,
2601 skipped,
2602 cost_usd: cost_total,
2603 elapsed_ms: started.elapsed().as_millis() as u64,
2604 backend_invoked: take_enrich_backend(),
2605 });
2606
2607 if failed == 0 {
2608 let _ = std::fs::remove_file(DEFAULT_QUEUE_DB);
2609 }
2610
2611 Ok(())
2612}
2613
/// Outcome of processing a single queue item in an enrichment run.
///
/// Every `call_*` worker returns one of these on success; hard failures are
/// reported through `Err(AppError)` instead.
enum EnrichItemResult {
    /// The item was processed and its result persisted.
    Done {
        /// Row id of the memory that was touched, when the item is a memory.
        memory_id: Option<i64>,
        /// Row id of the entity that was touched, when the item is an entity.
        entity_id: Option<i64>,
        /// Entity count reported by the operation.
        entities: usize,
        /// Relationship count reported by the operation.
        rels: usize,
        /// Body length in characters before the operation, when relevant.
        chars_before: Option<usize>,
        /// Body length in characters after the operation, when relevant.
        chars_after: Option<usize>,
        /// Cost reported by the LLM backend for this item.
        cost: f64,
        /// Whether the backend flagged the call as OAuth-based; the driver
        /// omits `cost_usd` from its output for such items.
        is_oauth: bool,
    },
    /// The item was deliberately left untouched; `reason` explains why.
    Skipped { reason: String },
    /// A rewritten body was rejected by the preservation check.
    PreservationFailed {
        /// Similarity score computed by the preservation check.
        score: f64,
        /// Threshold the check was evaluated against.
        threshold: f64,
        /// Character count of the original body.
        chars_before: usize,
        /// Character count of the rejected rewrite.
        chars_after: usize,
    },
}
2643
2644fn call_memory_bindings(
2649 conn: &Connection,
2650 namespace: &str,
2651 memory_name: &str,
2652 binary: &Path,
2653 model: Option<&str>,
2654 timeout: u64,
2655 mode: &EnrichMode,
2656) -> Result<EnrichItemResult, AppError> {
2657 let (memory_id, body): (i64, String) = conn.query_row(
2659 "SELECT id, COALESCE(body,'') FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2660 rusqlite::params![namespace, memory_name],
2661 |r| Ok((r.get(0)?, r.get(1)?)),
2662 ).map_err(|e| match e {
2663 rusqlite::Error::QueryReturnedNoRows => AppError::NotFound(format!("memory '{memory_name}' not found")),
2664 other => AppError::Database(other),
2665 })?;
2666
2667 if body.trim().is_empty() {
2668 return Ok(EnrichItemResult::Skipped {
2669 reason: "body is empty".to_string(),
2670 });
2671 }
2672
2673 let (value, cost, is_oauth) = match mode {
2674 EnrichMode::ClaudeCode => call_claude(
2675 binary,
2676 BINDINGS_PROMPT,
2677 BINDINGS_SCHEMA,
2678 &body,
2679 model,
2680 timeout,
2681 )?,
2682 EnrichMode::Codex => call_codex(
2683 binary,
2684 BINDINGS_PROMPT,
2685 BINDINGS_SCHEMA,
2686 &body,
2687 model,
2688 timeout,
2689 )?,
2690 EnrichMode::Opencode => call_opencode(
2691 binary,
2692 BINDINGS_PROMPT,
2693 BINDINGS_SCHEMA,
2694 &body,
2695 model,
2696 timeout,
2697 )?,
2698 };
2699
2700 let empty_arr = serde_json::Value::Array(vec![]);
2701 let entities_val = value.get("entities").unwrap_or(&empty_arr);
2702 let rels_val = value.get("relationships").unwrap_or(&empty_arr);
2703
2704 let (ent_count, rel_count) =
2705 persist_memory_bindings(conn, namespace, memory_id, entities_val, rels_val)?;
2706
2707 Ok(EnrichItemResult::Done {
2708 memory_id: Some(memory_id),
2709 entity_id: None,
2710 entities: ent_count,
2711 rels: rel_count,
2712 chars_before: None,
2713 chars_after: None,
2714 cost,
2715 is_oauth,
2716 })
2717}
2718
2719fn call_entity_description(
2720 conn: &Connection,
2721 namespace: &str,
2722 entity_name: &str,
2723 binary: &Path,
2724 model: Option<&str>,
2725 timeout: u64,
2726 mode: &EnrichMode,
2727) -> Result<EnrichItemResult, AppError> {
2728 let (entity_id, entity_type): (i64, String) = conn
2729 .query_row(
2730 "SELECT id, type FROM entities WHERE namespace=?1 AND name=?2",
2731 rusqlite::params![namespace, entity_name],
2732 |r| Ok((r.get(0)?, r.get(1)?)),
2733 )
2734 .map_err(|e| match e {
2735 rusqlite::Error::QueryReturnedNoRows => {
2736 AppError::NotFound(format!("entity '{entity_name}' not found"))
2737 }
2738 other => AppError::Database(other),
2739 })?;
2740
2741 let prompt = format!(
2742 "{ENTITY_DESCRIPTION_PROMPT_PREFIX}{entity_name}\nEntity type: {entity_type}\n\nGenerate a description:"
2743 );
2744
2745 let (value, cost, is_oauth) = match mode {
2746 EnrichMode::ClaudeCode => call_claude(
2747 binary,
2748 &prompt,
2749 ENTITY_DESCRIPTION_SCHEMA,
2750 "",
2751 model,
2752 timeout,
2753 )?,
2754 EnrichMode::Codex => call_codex(
2755 binary,
2756 &prompt,
2757 ENTITY_DESCRIPTION_SCHEMA,
2758 "",
2759 model,
2760 timeout,
2761 )?,
2762 EnrichMode::Opencode => call_opencode(
2763 binary,
2764 &prompt,
2765 ENTITY_DESCRIPTION_SCHEMA,
2766 "",
2767 model,
2768 timeout,
2769 )?,
2770 };
2771
2772 let description = value
2773 .get("description")
2774 .and_then(|v| v.as_str())
2775 .ok_or_else(|| AppError::Validation("LLM result missing 'description' field".into()))?;
2776
2777 persist_entity_description(conn, entity_id, description)?;
2778
2779 Ok(EnrichItemResult::Done {
2780 memory_id: None,
2781 entity_id: Some(entity_id),
2782 entities: 0,
2783 rels: 0,
2784 chars_before: None,
2785 chars_after: None,
2786 cost,
2787 is_oauth,
2788 })
2789}
2790
2791#[allow(clippy::too_many_arguments)]
2792fn call_body_enrich(
2793 conn: &Connection,
2794 namespace: &str,
2795 memory_name: &str,
2796 binary: &Path,
2797 model: Option<&str>,
2798 timeout: u64,
2799 mode: &EnrichMode,
2800 min_output_chars: usize,
2801 max_output_chars: usize,
2802 prompt_template: Option<&Path>,
2803 preserve_threshold: f64,
2804 paths: &crate::paths::AppPaths,
2805 llm_backend: crate::cli::LlmBackendChoice,
2806) -> Result<EnrichItemResult, AppError> {
2807 let (memory_id, body, description, memory_type): (i64, String, String, String) = conn
2808 .query_row(
2809 "SELECT id, COALESCE(body,''), COALESCE(description,''), COALESCE(type,'note') \
2810 FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2811 rusqlite::params![namespace, memory_name],
2812 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2813 )
2814 .map_err(|e| match e {
2815 rusqlite::Error::QueryReturnedNoRows => {
2816 AppError::NotFound(format!("memory '{memory_name}' not found"))
2817 }
2818 other => AppError::Database(other),
2819 })?;
2820
2821 let chars_before = body.chars().count();
2822
2823 let linked_entities: Vec<String> = {
2825 let mut stmt = conn.prepare_cached(
2826 "SELECT e.name FROM memory_entities me \
2827 JOIN entities e ON e.id = me.entity_id \
2828 WHERE me.memory_id = ?1 LIMIT 10",
2829 )?;
2830 let result: Vec<String> = stmt
2831 .query_map(rusqlite::params![memory_id], |r| r.get::<_, String>(0))?
2832 .filter_map(|r| r.ok())
2833 .collect();
2834 drop(stmt);
2835 result
2836 };
2837
2838 let prompt_prefix = if let Some(tmpl_path) = prompt_template {
2840 let file_size = std::fs::metadata(tmpl_path)
2841 .map_err(|e| {
2842 AppError::Io(std::io::Error::new(
2843 e.kind(),
2844 format!("failed to stat prompt template: {e}"),
2845 ))
2846 })?
2847 .len();
2848 if file_size > MAX_MEMORY_BODY_LEN as u64 {
2849 return Err(AppError::LimitExceeded(
2850 crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
2851 ));
2852 }
2853 std::fs::read_to_string(tmpl_path).map_err(|e| {
2854 AppError::Io(std::io::Error::new(
2855 e.kind(),
2856 format!("failed to read prompt template: {e}"),
2857 ))
2858 })?
2859 } else {
2860 BODY_ENRICH_PROMPT_PREFIX.to_string()
2861 };
2862
2863 let context_section = if !linked_entities.is_empty() || !description.is_empty() {
2865 let mut ctx = String::new();
2866 ctx.push_str(&format!(
2867 "\nContext:\n- Memory name: {memory_name}\n- Type: {memory_type}\n"
2868 ));
2869 if !description.is_empty() {
2870 ctx.push_str(&format!("- Description: {description}\n"));
2871 }
2872 ctx.push_str(&format!("- Domain: {namespace}\n"));
2873 if !linked_entities.is_empty() {
2874 ctx.push_str(&format!(
2875 "- Linked entities: {}\n",
2876 linked_entities.join(", ")
2877 ));
2878 }
2879 ctx
2880 } else {
2881 String::new()
2882 };
2883
2884 let prompt = format!(
2885 "{prompt_prefix}{context_section}\nTarget minimum length: {min_output_chars} characters. Maximum: {max_output_chars} characters."
2886 );
2887
2888 let (value, cost, is_oauth) = match mode {
2890 EnrichMode::ClaudeCode => {
2891 call_claude(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2892 }
2893 EnrichMode::Codex => {
2894 call_codex(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2895 }
2896 EnrichMode::Opencode => {
2897 call_opencode(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2898 }
2899 };
2900
2901 let enriched_body = value
2902 .get("enriched_body")
2903 .and_then(|v| v.as_str())
2904 .ok_or_else(|| AppError::Validation("LLM result missing 'enriched_body' field".into()))?;
2905
2906 let chars_after = enriched_body.chars().count();
2907
2908 let threshold = preserve_threshold;
2915 let verdict =
2916 crate::preservation::PreservationVerdict::evaluate(&body, enriched_body, threshold);
2917 if !verdict.is_accepted() {
2918 return Ok(EnrichItemResult::PreservationFailed {
2919 score: match verdict {
2920 crate::preservation::PreservationVerdict::Preserved { score, .. } => score,
2921 crate::preservation::PreservationVerdict::Rejected { score, .. } => score,
2922 crate::preservation::PreservationVerdict::Unchanged { .. } => 1.0,
2923 },
2924 threshold,
2925 chars_before,
2926 chars_after,
2927 });
2928 }
2929
2930 let old_hash = blake3::hash(body.as_bytes()).to_hex().to_string();
2936 let new_hash = blake3::hash(enriched_body.as_bytes()).to_hex().to_string();
2937 if old_hash == new_hash {
2938 return Ok(EnrichItemResult::Skipped {
2939 reason: format!(
2940 "enriched body hash matches original (blake3:{old_hash}); idempotency skip"
2941 ),
2942 });
2943 }
2944
2945 if chars_after <= chars_before {
2947 return Ok(EnrichItemResult::Skipped {
2948 reason: format!(
2949 "enriched body ({chars_after} chars) not longer than original ({chars_before} chars)"
2950 ),
2951 });
2952 }
2953
2954 persist_enriched_body(
2955 conn,
2956 namespace,
2957 memory_id,
2958 memory_name,
2959 enriched_body,
2960 paths,
2961 llm_backend,
2962 )?;
2963
2964 Ok(EnrichItemResult::Done {
2965 memory_id: Some(memory_id),
2966 entity_id: None,
2967 entities: 0,
2968 rels: 0,
2969 chars_before: Some(chars_before),
2970 chars_after: Some(chars_after),
2971 cost,
2972 is_oauth,
2973 })
2974}
2975
2976fn call_reembed(
2977 conn: &Connection,
2978 namespace: &str,
2979 memory_name: &str,
2980 paths: &crate::paths::AppPaths,
2981 llm_backend: crate::cli::LlmBackendChoice,
2982) -> Result<EnrichItemResult, AppError> {
2983 let (memory_id, body, memory_type): (i64, String, String) = conn
2984 .query_row(
2985 "SELECT id, COALESCE(body,''), COALESCE(type,'note')
2986 FROM memories
2987 WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2988 rusqlite::params![namespace, memory_name],
2989 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2990 )
2991 .map_err(|e| match e {
2992 rusqlite::Error::QueryReturnedNoRows => {
2993 AppError::NotFound(format!("memory '{memory_name}' not found"))
2994 }
2995 other => AppError::Database(other),
2996 })?;
2997
2998 if body.trim().is_empty() {
2999 return Ok(EnrichItemResult::Skipped {
3000 reason: "body is empty".to_string(),
3001 });
3002 }
3003
3004 reembed_memory_vector(
3005 conn,
3006 namespace,
3007 memory_id,
3008 memory_name,
3009 &memory_type,
3010 &body,
3011 paths,
3012 llm_backend,
3013 )?;
3014
3015 Ok(EnrichItemResult::Done {
3016 memory_id: Some(memory_id),
3017 entity_id: None,
3018 entities: 0,
3019 rels: 0,
3020 chars_before: Some(body.chars().count()),
3021 chars_after: Some(body.chars().count()),
3022 cost: 0.0,
3023 is_oauth: true,
3024 })
3025}
3026
3027fn scan_operation(
3032 conn: &Connection,
3033 namespace: &str,
3034 args: &EnrichArgs,
3035) -> Result<Vec<String>, AppError> {
3036 let name_filter = resolve_name_filter(args)?;
3038 match args.operation {
3039 EnrichOperation::MemoryBindings => {
3040 let rows = scan_unbound_memories(conn, namespace, args.limit, &name_filter)?;
3041 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3042 }
3043 EnrichOperation::EntityDescriptions => {
3044 let rows =
3045 scan_entities_without_description(conn, namespace, args.limit, &name_filter)?;
3046 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3047 }
3048 EnrichOperation::BodyEnrich => {
3049 let rows = scan_short_body_memories(
3050 conn,
3051 namespace,
3052 args.min_output_chars,
3053 args.limit,
3054 &name_filter,
3055 )?;
3056 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3057 }
3058 EnrichOperation::ReEmbed => {
3059 let rows = scan_memories_without_embeddings(conn, namespace, args.limit, &name_filter)?;
3060 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3061 }
3062 EnrichOperation::WeightCalibrate => {
3063 let rows = scan_weight_candidates(conn, namespace, args.limit)?;
3064 Ok(rows
3065 .into_iter()
3066 .map(|(id, _, _, _, _)| id.to_string())
3067 .collect())
3068 }
3069 EnrichOperation::RelationReclassify => {
3070 let rows = scan_generic_relations(conn, namespace, args.limit)?;
3071 Ok(rows
3072 .into_iter()
3073 .map(|(id, _, _, _)| id.to_string())
3074 .collect())
3075 }
3076 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
3077 let pairs = scan_isolated_entity_pairs(conn, namespace, args.limit)?;
3078 Ok(pairs.into_iter().map(|(_, name, _, _)| name).collect())
3079 }
3080 EnrichOperation::EntityTypeValidate => {
3081 let rows = scan_entities_for_type_validation(conn, namespace, args.limit)?;
3082 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3083 }
3084 EnrichOperation::DescriptionEnrich => {
3085 let rows = scan_generic_descriptions(conn, namespace, args.limit)?;
3086 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3087 }
3088 EnrichOperation::DomainClassify
3089 | EnrichOperation::GraphAudit
3090 | EnrichOperation::DeepResearchSynth
3091 | EnrichOperation::BodyExtract => {
3092 let limit_clause = args.limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3093 let sql = format!(
3094 "SELECT name FROM memories WHERE namespace=?1 AND deleted_at IS NULL ORDER BY id {limit_clause}"
3095 );
3096 let mut stmt = conn.prepare(&sql)?;
3097 let names = stmt
3098 .query_map(rusqlite::params![namespace], |r| r.get::<_, String>(0))?
3099 .collect::<Result<Vec<_>, _>>()?;
3100 Ok(names)
3101 }
3102 }
3103}
3104
3105fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
3111 if let Some(p) = explicit {
3112 if p.exists() {
3113 return Ok(p.to_path_buf());
3114 }
3115 return Err(AppError::Validation(format!(
3116 "Codex binary not found at explicit path: {}",
3117 p.display()
3118 )));
3119 }
3120
3121 if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
3122 let p = PathBuf::from(&env_path);
3123 if p.exists() {
3124 return Ok(p);
3125 }
3126 }
3127
3128 let name = if cfg!(windows) { "codex.exe" } else { "codex" };
3129 if let Some(path_var) = std::env::var_os("PATH") {
3130 for dir in std::env::split_paths(&path_var) {
3131 let candidate = dir.join(name);
3132 if candidate.exists() {
3133 return Ok(crate::extract::llm_embedding::resolve_real_binary(
3134 &candidate,
3135 ));
3136 }
3137 }
3138 }
3139
3140 Err(AppError::Validation(
3141 "Codex CLI binary not found in PATH. Install it or specify --codex-binary".to_string(),
3142 ))
3143}
3144
3145fn call_weight_calibrate(
3147 conn: &Connection,
3148 _namespace: &str,
3149 item_key: &str,
3150 binary: &Path,
3151 model: Option<&str>,
3152 timeout: u64,
3153 mode: &EnrichMode,
3154) -> Result<EnrichItemResult, AppError> {
3155 let rel_id: i64 = item_key
3156 .parse()
3157 .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
3158 let (source_name, target_name, relation, current_weight): (String, String, String, f64) = conn
3159 .query_row(
3160 "SELECT e1.name, e2.name, r.relation, r.weight \
3161 FROM relationships r \
3162 JOIN entities e1 ON e1.id = r.source_id \
3163 JOIN entities e2 ON e2.id = r.target_id \
3164 WHERE r.id = ?1",
3165 rusqlite::params![rel_id],
3166 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
3167 )
3168 .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
3169
3170 let input_text = format!(
3171 "Source: {source_name}\nTarget: {target_name}\nRelation: {relation}\nCurrent weight: {current_weight}"
3172 );
3173 let (value, cost, is_oauth) = match mode {
3174 EnrichMode::ClaudeCode => call_claude(
3175 binary,
3176 WEIGHT_CALIBRATE_PROMPT,
3177 WEIGHT_CALIBRATE_SCHEMA,
3178 &input_text,
3179 model,
3180 timeout,
3181 )?,
3182 EnrichMode::Codex => call_codex(
3183 binary,
3184 WEIGHT_CALIBRATE_PROMPT,
3185 WEIGHT_CALIBRATE_SCHEMA,
3186 &input_text,
3187 model,
3188 timeout,
3189 )?,
3190 EnrichMode::Opencode => call_opencode(
3191 binary,
3192 WEIGHT_CALIBRATE_PROMPT,
3193 WEIGHT_CALIBRATE_SCHEMA,
3194 &input_text,
3195 model,
3196 timeout,
3197 )?,
3198 };
3199
3200 let calibrated = value
3201 .get("calibrated_weight")
3202 .and_then(|v| v.as_f64())
3203 .ok_or_else(|| AppError::Validation("LLM result missing 'calibrated_weight'".into()))?;
3204
3205 conn.execute(
3206 "UPDATE relationships SET weight = ?1 WHERE id = ?2",
3207 rusqlite::params![calibrated, rel_id],
3208 )?;
3209
3210 Ok(EnrichItemResult::Done {
3211 memory_id: None,
3212 entity_id: None,
3213 entities: 0,
3214 rels: 1,
3215 chars_before: None,
3216 chars_after: None,
3217 cost,
3218 is_oauth,
3219 })
3220}
3221
3222fn call_relation_reclassify(
3224 conn: &Connection,
3225 _namespace: &str,
3226 item_key: &str,
3227 binary: &Path,
3228 model: Option<&str>,
3229 timeout: u64,
3230 mode: &EnrichMode,
3231) -> Result<EnrichItemResult, AppError> {
3232 let rel_id: i64 = item_key
3233 .parse()
3234 .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
3235 let (source_name, target_name, current_relation): (String, String, String) = conn
3236 .query_row(
3237 "SELECT e1.name, e2.name, r.relation \
3238 FROM relationships r \
3239 JOIN entities e1 ON e1.id = r.source_id \
3240 JOIN entities e2 ON e2.id = r.target_id \
3241 WHERE r.id = ?1",
3242 rusqlite::params![rel_id],
3243 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3244 )
3245 .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
3246
3247 let input_text = format!(
3248 "Source entity: {source_name}\nTarget entity: {target_name}\nCurrent relation: {current_relation}"
3249 );
3250 let (value, cost, is_oauth) = match mode {
3251 EnrichMode::ClaudeCode => call_claude(
3252 binary,
3253 RELATION_RECLASSIFY_PROMPT,
3254 RELATION_RECLASSIFY_SCHEMA,
3255 &input_text,
3256 model,
3257 timeout,
3258 )?,
3259 EnrichMode::Codex => call_codex(
3260 binary,
3261 RELATION_RECLASSIFY_PROMPT,
3262 RELATION_RECLASSIFY_SCHEMA,
3263 &input_text,
3264 model,
3265 timeout,
3266 )?,
3267 EnrichMode::Opencode => call_opencode(
3268 binary,
3269 RELATION_RECLASSIFY_PROMPT,
3270 RELATION_RECLASSIFY_SCHEMA,
3271 &input_text,
3272 model,
3273 timeout,
3274 )?,
3275 };
3276
3277 let new_relation = value
3278 .get("relation")
3279 .and_then(|v| v.as_str())
3280 .ok_or_else(|| AppError::Validation("LLM result missing 'relation'".into()))?;
3281 let new_strength = value
3282 .get("strength")
3283 .and_then(|v| v.as_f64())
3284 .unwrap_or(0.5);
3285
3286 conn.execute(
3287 "UPDATE relationships SET relation = ?1, weight = ?2 WHERE id = ?3",
3288 rusqlite::params![new_relation, new_strength, rel_id],
3289 )?;
3290
3291 Ok(EnrichItemResult::Done {
3292 memory_id: None,
3293 entity_id: None,
3294 entities: 0,
3295 rels: 1,
3296 chars_before: None,
3297 chars_after: None,
3298 cost,
3299 is_oauth,
3300 })
3301}
3302
3303fn call_entity_connect(
3305 conn: &Connection,
3306 namespace: &str,
3307 item_key: &str,
3308 binary: &Path,
3309 model: Option<&str>,
3310 timeout: u64,
3311 mode: &EnrichMode,
3312) -> Result<EnrichItemResult, AppError> {
3313 let pairs = scan_isolated_entity_pairs(conn, namespace, Some(1))?;
3314 let (e1_id, e1_name, e2_id, e2_name) =
3315 match pairs.into_iter().find(|(_, n, _, _)| n == item_key) {
3316 Some(p) => p,
3317 None => {
3318 return Ok(EnrichItemResult::Skipped {
3319 reason: "pair no longer isolated".into(),
3320 })
3321 }
3322 };
3323 let input_text = format!("Entity A: {e1_name}\nEntity B: {e2_name}");
3324 let (value, cost, is_oauth) = match mode {
3325 EnrichMode::ClaudeCode => call_claude(
3326 binary,
3327 ENTITY_CONNECT_PROMPT,
3328 ENTITY_CONNECT_SCHEMA,
3329 &input_text,
3330 model,
3331 timeout,
3332 )?,
3333 EnrichMode::Codex => call_codex(
3334 binary,
3335 ENTITY_CONNECT_PROMPT,
3336 ENTITY_CONNECT_SCHEMA,
3337 &input_text,
3338 model,
3339 timeout,
3340 )?,
3341 EnrichMode::Opencode => call_opencode(
3342 binary,
3343 ENTITY_CONNECT_PROMPT,
3344 ENTITY_CONNECT_SCHEMA,
3345 &input_text,
3346 model,
3347 timeout,
3348 )?,
3349 };
3350 let relation = value
3351 .get("relation")
3352 .and_then(|v| v.as_str())
3353 .unwrap_or("none");
3354 if relation == "none" {
3355 return Ok(EnrichItemResult::Skipped {
3356 reason: "LLM determined no relationship".into(),
3357 });
3358 }
3359 let strength = value
3360 .get("strength")
3361 .and_then(|v| v.as_f64())
3362 .unwrap_or(0.5);
3363 conn.execute(
3364 "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
3365 rusqlite::params![namespace, e1_id, e2_id, relation, strength],
3366 )?;
3367 Ok(EnrichItemResult::Done {
3368 memory_id: None,
3369 entity_id: None,
3370 entities: 0,
3371 rels: 1,
3372 chars_before: None,
3373 chars_after: None,
3374 cost,
3375 is_oauth,
3376 })
3377}
3378
3379fn call_entity_type_validate(
3381 conn: &Connection,
3382 _namespace: &str,
3383 item_key: &str,
3384 binary: &Path,
3385 model: Option<&str>,
3386 timeout: u64,
3387 mode: &EnrichMode,
3388) -> Result<EnrichItemResult, AppError> {
3389 let (ent_id, ent_name, ent_type): (i64, String, String) = conn
3390 .query_row(
3391 "SELECT id, name, type FROM entities WHERE name = ?1",
3392 rusqlite::params![item_key],
3393 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3394 )
3395 .map_err(|_| AppError::NotFound(format!("entity '{item_key}' not found")))?;
3396 let input_text = format!("Entity: {ent_name}\nCurrent type: {ent_type}");
3397 let (value, cost, is_oauth) = match mode {
3398 EnrichMode::ClaudeCode => call_claude(
3399 binary,
3400 ENTITY_TYPE_VALIDATE_PROMPT,
3401 ENTITY_TYPE_VALIDATE_SCHEMA,
3402 &input_text,
3403 model,
3404 timeout,
3405 )?,
3406 EnrichMode::Codex => call_codex(
3407 binary,
3408 ENTITY_TYPE_VALIDATE_PROMPT,
3409 ENTITY_TYPE_VALIDATE_SCHEMA,
3410 &input_text,
3411 model,
3412 timeout,
3413 )?,
3414 EnrichMode::Opencode => call_opencode(
3415 binary,
3416 ENTITY_TYPE_VALIDATE_PROMPT,
3417 ENTITY_TYPE_VALIDATE_SCHEMA,
3418 &input_text,
3419 model,
3420 timeout,
3421 )?,
3422 };
3423 let validated_type = value
3424 .get("validated_type")
3425 .and_then(|v| v.as_str())
3426 .unwrap_or(&ent_type);
3427 let was_correct = value
3428 .get("was_correct")
3429 .and_then(|v| v.as_bool())
3430 .unwrap_or(true);
3431 if !was_correct {
3432 conn.execute(
3433 "UPDATE entities SET type = ?1 WHERE id = ?2",
3434 rusqlite::params![validated_type, ent_id],
3435 )?;
3436 }
3437 Ok(EnrichItemResult::Done {
3438 memory_id: None,
3439 entity_id: Some(ent_id),
3440 entities: 1,
3441 rels: 0,
3442 chars_before: None,
3443 chars_after: None,
3444 cost,
3445 is_oauth,
3446 })
3447}
3448
3449fn call_description_enrich(
3451 conn: &Connection,
3452 _namespace: &str,
3453 item_key: &str,
3454 binary: &Path,
3455 model: Option<&str>,
3456 timeout: u64,
3457 mode: &EnrichMode,
3458) -> Result<EnrichItemResult, AppError> {
3459 let (mem_id, body, old_desc): (i64, String, String) = conn
3460 .query_row(
3461 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3462 rusqlite::params![item_key],
3463 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3464 )
3465 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3466 let snippet: String = body.chars().take(500).collect();
3467 let input_text = format!(
3468 "Memory name: {item_key}\nCurrent description: {old_desc}\nBody preview: {snippet}"
3469 );
3470 let (value, cost, is_oauth) = match mode {
3471 EnrichMode::ClaudeCode => call_claude(
3472 binary,
3473 DESCRIPTION_ENRICH_PROMPT,
3474 DESCRIPTION_ENRICH_SCHEMA,
3475 &input_text,
3476 model,
3477 timeout,
3478 )?,
3479 EnrichMode::Codex => call_codex(
3480 binary,
3481 DESCRIPTION_ENRICH_PROMPT,
3482 DESCRIPTION_ENRICH_SCHEMA,
3483 &input_text,
3484 model,
3485 timeout,
3486 )?,
3487 EnrichMode::Opencode => call_opencode(
3488 binary,
3489 DESCRIPTION_ENRICH_PROMPT,
3490 DESCRIPTION_ENRICH_SCHEMA,
3491 &input_text,
3492 model,
3493 timeout,
3494 )?,
3495 };
3496 let new_desc = value
3497 .get("description")
3498 .and_then(|v| v.as_str())
3499 .unwrap_or(&old_desc);
3500 let old_name: String = conn.query_row(
3501 "SELECT name FROM memories WHERE id = ?1",
3502 rusqlite::params![mem_id],
3503 |r| r.get(0),
3504 )?;
3505 conn.execute(
3506 "UPDATE memories SET description = ?1 WHERE id = ?2",
3507 rusqlite::params![new_desc, mem_id],
3508 )?;
3509 memories::sync_fts_after_update(
3510 conn, mem_id, &old_name, &old_desc, &body, &old_name, new_desc, &body,
3511 )?;
3512 Ok(EnrichItemResult::Done {
3513 memory_id: Some(mem_id),
3514 entity_id: None,
3515 entities: 0,
3516 rels: 0,
3517 chars_before: Some(old_desc.len()),
3518 chars_after: Some(new_desc.len()),
3519 cost,
3520 is_oauth,
3521 })
3522}
3523
3524fn call_domain_classify(
3526 conn: &Connection,
3527 _namespace: &str,
3528 item_key: &str,
3529 binary: &Path,
3530 model: Option<&str>,
3531 timeout: u64,
3532 mode: &EnrichMode,
3533) -> Result<EnrichItemResult, AppError> {
3534 let (mem_id, body, desc): (i64, String, String) = conn
3535 .query_row(
3536 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3537 rusqlite::params![item_key],
3538 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3539 )
3540 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3541 let snippet: String = body.chars().take(500).collect();
3542 let input_text = format!("Memory: {item_key}\nDescription: {desc}\nBody preview: {snippet}");
3543 let (value, cost, is_oauth) = match mode {
3544 EnrichMode::ClaudeCode => call_claude(
3545 binary,
3546 DOMAIN_CLASSIFY_PROMPT,
3547 DOMAIN_CLASSIFY_SCHEMA,
3548 &input_text,
3549 model,
3550 timeout,
3551 )?,
3552 EnrichMode::Codex => call_codex(
3553 binary,
3554 DOMAIN_CLASSIFY_PROMPT,
3555 DOMAIN_CLASSIFY_SCHEMA,
3556 &input_text,
3557 model,
3558 timeout,
3559 )?,
3560 EnrichMode::Opencode => call_opencode(
3561 binary,
3562 DOMAIN_CLASSIFY_PROMPT,
3563 DOMAIN_CLASSIFY_SCHEMA,
3564 &input_text,
3565 model,
3566 timeout,
3567 )?,
3568 };
3569 let domain = value
3570 .get("domain")
3571 .and_then(|v| v.as_str())
3572 .unwrap_or("uncategorized");
3573 let metadata = format!(r#"{{"domain":"{}"}}"#, domain.replace('"', "\\\""));
3574 conn.execute(
3575 "UPDATE memories SET metadata = ?1 WHERE id = ?2",
3576 rusqlite::params![metadata, mem_id],
3577 )?;
3578 Ok(EnrichItemResult::Done {
3579 memory_id: Some(mem_id),
3580 entity_id: None,
3581 entities: 0,
3582 rels: 0,
3583 chars_before: None,
3584 chars_after: None,
3585 cost,
3586 is_oauth,
3587 })
3588}
3589
3590fn call_graph_audit(
3592 conn: &Connection,
3593 _namespace: &str,
3594 item_key: &str,
3595 binary: &Path,
3596 model: Option<&str>,
3597 timeout: u64,
3598 mode: &EnrichMode,
3599) -> Result<EnrichItemResult, AppError> {
3600 let (mem_id, body, desc): (i64, String, String) = conn
3601 .query_row(
3602 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3603 rusqlite::params![item_key],
3604 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3605 )
3606 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3607 let snippet: String = body.chars().take(500).collect();
3608 let ent_count: i64 = conn
3609 .query_row(
3610 "SELECT COUNT(*) FROM memory_entities WHERE memory_id = ?1",
3611 rusqlite::params![mem_id],
3612 |r| r.get(0),
3613 )
3614 .unwrap_or(0);
3615 let input_text = format!("Memory: {item_key}\nDescription: {desc}\nEntity bindings: {ent_count}\nBody preview: {snippet}");
3616 let (value, cost, is_oauth) = match mode {
3617 EnrichMode::ClaudeCode => call_claude(
3618 binary,
3619 GRAPH_AUDIT_PROMPT,
3620 GRAPH_AUDIT_SCHEMA,
3621 &input_text,
3622 model,
3623 timeout,
3624 )?,
3625 EnrichMode::Codex => call_codex(
3626 binary,
3627 GRAPH_AUDIT_PROMPT,
3628 GRAPH_AUDIT_SCHEMA,
3629 &input_text,
3630 model,
3631 timeout,
3632 )?,
3633 EnrichMode::Opencode => call_opencode(
3634 binary,
3635 GRAPH_AUDIT_PROMPT,
3636 GRAPH_AUDIT_SCHEMA,
3637 &input_text,
3638 model,
3639 timeout,
3640 )?,
3641 };
3642 let issues = value
3643 .get("issues")
3644 .and_then(|v| v.as_array())
3645 .map(|a| a.len())
3646 .unwrap_or(0);
3647 Ok(EnrichItemResult::Done {
3648 memory_id: Some(mem_id),
3649 entity_id: None,
3650 entities: 0,
3651 rels: issues,
3652 chars_before: None,
3653 chars_after: None,
3654 cost,
3655 is_oauth,
3656 })
3657}
3658
3659fn call_deep_research_synth(
3661 conn: &Connection,
3662 namespace: &str,
3663 item_key: &str,
3664 binary: &Path,
3665 model: Option<&str>,
3666 timeout: u64,
3667 mode: &EnrichMode,
3668) -> Result<EnrichItemResult, AppError> {
3669 let (mem_id, body): (i64, String) = conn
3670 .query_row(
3671 "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3672 rusqlite::params![item_key],
3673 |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
3674 )
3675 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3676 let snippet: String = body.chars().take(2000).collect();
3677 let input_text = format!("Memory: {item_key}\nBody:\n{snippet}");
3678 let (value, cost, is_oauth) = match mode {
3679 EnrichMode::ClaudeCode => call_claude(
3680 binary,
3681 DEEP_RESEARCH_SYNTH_PROMPT,
3682 DEEP_RESEARCH_SYNTH_SCHEMA,
3683 &input_text,
3684 model,
3685 timeout,
3686 )?,
3687 EnrichMode::Codex => call_codex(
3688 binary,
3689 DEEP_RESEARCH_SYNTH_PROMPT,
3690 DEEP_RESEARCH_SYNTH_SCHEMA,
3691 &input_text,
3692 model,
3693 timeout,
3694 )?,
3695 EnrichMode::Opencode => call_opencode(
3696 binary,
3697 DEEP_RESEARCH_SYNTH_PROMPT,
3698 DEEP_RESEARCH_SYNTH_SCHEMA,
3699 &input_text,
3700 model,
3701 timeout,
3702 )?,
3703 };
3704 let mut ent_count = 0usize;
3705 let mut rel_count = 0usize;
3706 if let Some(ents) = value.get("entities").and_then(|v| v.as_array()) {
3707 for e in ents {
3708 let name = e.get("name").and_then(|v| v.as_str()).unwrap_or_default();
3709 let etype_str = e
3710 .get("entity_type")
3711 .and_then(|v| v.as_str())
3712 .unwrap_or("concept");
3713 let etype: EntityType = etype_str.parse().unwrap_or(EntityType::Concept);
3714 if name.len() >= 2 {
3715 let ne = NewEntity {
3716 name: name.to_string(),
3717 entity_type: etype,
3718 description: None,
3719 };
3720 let _ = entities::upsert_entity(conn, namespace, &ne);
3721 ent_count += 1;
3722 }
3723 }
3724 }
3725 if let Some(rels) = value.get("relationships").and_then(|v| v.as_array()) {
3726 for r in rels {
3727 let src = r.get("source").and_then(|v| v.as_str()).unwrap_or_default();
3728 let tgt = r.get("target").and_then(|v| v.as_str()).unwrap_or_default();
3729 if src.is_empty() || tgt.is_empty() {
3730 continue;
3731 }
3732 let rel = r
3733 .get("relation")
3734 .and_then(|v| v.as_str())
3735 .unwrap_or("related");
3736 let str_ = r.get("strength").and_then(|v| v.as_f64()).unwrap_or(0.5);
3737 if let (Some(sid), Some(tid)) = (
3738 entities::find_entity_id(conn, namespace, src)?,
3739 entities::find_entity_id(conn, namespace, tgt)?,
3740 ) {
3741 let _ = entities::create_or_fetch_relationship(
3742 conn, namespace, sid, tid, rel, str_, None,
3743 );
3744 rel_count += 1;
3745 }
3746 }
3747 }
3748 Ok(EnrichItemResult::Done {
3749 memory_id: Some(mem_id),
3750 entity_id: None,
3751 entities: ent_count,
3752 rels: rel_count,
3753 chars_before: None,
3754 chars_after: None,
3755 cost,
3756 is_oauth,
3757 })
3758}
3759
3760fn call_body_extract(
3762 conn: &Connection,
3763 _namespace: &str,
3764 item_key: &str,
3765 binary: &Path,
3766 model: Option<&str>,
3767 timeout: u64,
3768 mode: &EnrichMode,
3769) -> Result<EnrichItemResult, AppError> {
3770 let (mem_id, body, old_desc): (i64, String, String) = conn
3771 .query_row(
3772 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3773 rusqlite::params![item_key],
3774 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3775 )
3776 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3777 let old_name: String = conn.query_row(
3778 "SELECT name FROM memories WHERE id = ?1",
3779 rusqlite::params![mem_id],
3780 |r| r.get(0),
3781 )?;
3782 let input_text = format!("Memory: {item_key}\nBody:\n{body}");
3783 let (value, cost, is_oauth) = match mode {
3784 EnrichMode::ClaudeCode => call_claude(
3785 binary,
3786 BODY_EXTRACT_PROMPT,
3787 BODY_EXTRACT_SCHEMA,
3788 &input_text,
3789 model,
3790 timeout,
3791 )?,
3792 EnrichMode::Codex => call_codex(
3793 binary,
3794 BODY_EXTRACT_PROMPT,
3795 BODY_EXTRACT_SCHEMA,
3796 &input_text,
3797 model,
3798 timeout,
3799 )?,
3800 EnrichMode::Opencode => call_opencode(
3801 binary,
3802 BODY_EXTRACT_PROMPT,
3803 BODY_EXTRACT_SCHEMA,
3804 &input_text,
3805 model,
3806 timeout,
3807 )?,
3808 };
3809 let restructured = value
3810 .get("restructured_body")
3811 .and_then(|v| v.as_str())
3812 .unwrap_or(&body);
3813 let chars_before = body.len();
3814 let chars_after = restructured.len();
3815 let new_hash = blake3::hash(restructured.as_bytes()).to_hex().to_string();
3816 conn.execute(
3817 "UPDATE memories SET body = ?1, body_hash = ?2, updated_at = unixepoch() WHERE id = ?3",
3818 rusqlite::params![restructured, new_hash, mem_id],
3819 )?;
3820 memories::sync_fts_after_update(
3821 conn,
3822 mem_id,
3823 &old_name,
3824 &old_desc,
3825 &body,
3826 &old_name,
3827 &old_desc,
3828 restructured,
3829 )?;
3830 Ok(EnrichItemResult::Done {
3831 memory_id: Some(mem_id),
3832 entity_id: None,
3833 entities: 0,
3834 rels: 0,
3835 chars_before: Some(chars_before),
3836 chars_after: Some(chars_after),
3837 cost,
3838 is_oauth,
3839 })
3840}
3841
3842#[allow(clippy::type_complexity)]
3844fn scan_isolated_entity_pairs(
3845 conn: &Connection,
3846 namespace: &str,
3847 limit: Option<usize>,
3848) -> Result<Vec<(i64, String, i64, String)>, AppError> {
3849 let limit_val = limit.unwrap_or(50) as i64;
3850 let mut stmt = conn.prepare_cached(
3851 "SELECT e1.id, e1.name, e2.id, e2.name FROM entities e1, entities e2 \
3852 WHERE e1.namespace = ?1 AND e2.namespace = ?1 AND e1.id < e2.id \
3853 AND NOT EXISTS (SELECT 1 FROM relationships r WHERE \
3854 (r.source_id = e1.id AND r.target_id = e2.id) OR \
3855 (r.source_id = e2.id AND r.target_id = e1.id)) \
3856 LIMIT ?2",
3857 )?;
3858 let rows = stmt
3859 .query_map(rusqlite::params![namespace, limit_val], |r| {
3860 Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?))
3861 })?
3862 .collect::<Result<Vec<_>, _>>()?;
3863 Ok(rows)
3864}
3865
3866fn scan_entities_for_type_validation(
3868 conn: &Connection,
3869 namespace: &str,
3870 limit: Option<usize>,
3871) -> Result<Vec<(i64, String, String)>, AppError> {
3872 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3873 let sql = format!(
3874 "SELECT id, name, type FROM entities WHERE namespace = ?1 ORDER BY id {limit_clause}"
3875 );
3876 let mut stmt = conn.prepare(&sql)?;
3877 let rows = stmt
3878 .query_map(rusqlite::params![namespace], |r| {
3879 Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3880 })?
3881 .collect::<Result<Vec<_>, _>>()?;
3882 Ok(rows)
3883}
3884
3885fn scan_generic_descriptions(
3887 conn: &Connection,
3888 namespace: &str,
3889 limit: Option<usize>,
3890) -> Result<Vec<(i64, String, String)>, AppError> {
3891 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3892 let sql = format!(
3893 "SELECT id, name, description FROM memories WHERE namespace = ?1 AND deleted_at IS NULL \
3894 AND (description LIKE '%ingested%' OR description LIKE '%imported%' OR description LIKE '%added%' OR length(description) < 30) \
3895 ORDER BY id {limit_clause}"
3896 );
3897 let mut stmt = conn.prepare(&sql)?;
3898 let rows = stmt
3899 .query_map(rusqlite::params![namespace], |r| {
3900 Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3901 })?
3902 .collect::<Result<Vec<_>, _>>()?;
3903 Ok(rows)
3904}
3905
3906fn call_codex(
3910 binary: &Path,
3911 prompt: &str,
3912 json_schema: &str,
3913 input_text: &str,
3914 model: Option<&str>,
3915 timeout_secs: u64,
3916) -> Result<(serde_json::Value, f64, bool), AppError> {
3917 use wait_timeout::ChildExt;
3918
3919 super::codex_spawn::validate_codex_model(model)?;
3924 let schema_file = super::codex_spawn::trusted_schema_path()?;
3925
3926 let args = super::codex_spawn::CodexSpawnArgs {
3927 binary,
3928 prompt,
3929 json_schema,
3930 input_text,
3931 model,
3932 timeout_secs,
3933 schema_path: schema_file.clone(),
3934 };
3935 let mut cmd = super::codex_spawn::build_codex_command(&args)?;
3936
3937 let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
3938 AppError::Io(std::io::Error::new(
3939 e.kind(),
3940 format!("failed to spawn codex: {e}"),
3941 ))
3942 })?;
3943
3944 let full_prompt = format!("{prompt}\n\n{input_text}");
3945 let stdin_bytes = full_prompt.into_bytes();
3946 let mut child_stdin = child
3947 .stdin
3948 .take()
3949 .ok_or_else(|| AppError::Validation("failed to open codex stdin".into()))?;
3950 let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
3951 child_stdin.write_all(&stdin_bytes)?;
3952 drop(child_stdin);
3953 Ok(())
3954 });
3955
3956 let start = std::time::Instant::now();
3957 let timeout = std::time::Duration::from_secs(timeout_secs);
3958 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
3959 let _ = std::fs::remove_file(&schema_file);
3960
3961 match status {
3962 Some(exit_status) => {
3963 stdin_thread
3964 .join()
3965 .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
3966 .map_err(AppError::Io)?;
3967
3968 tracing::debug!(
3969 target: "process",
3970 exit_code = ?exit_status.code(),
3971 elapsed_ms = start.elapsed().as_millis() as u64,
3972 "external process completed"
3973 );
3974
3975 let mut stdout_buf = Vec::new();
3976 if let Some(mut out) = child.stdout.take() {
3977 std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
3978 }
3979 if !exit_status.success() {
3980 let mut stderr_buf = Vec::new();
3981 if let Some(mut err) = child.stderr.take() {
3982 std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
3983 }
3984 let stderr_str = String::from_utf8_lossy(&stderr_buf);
3985 tracing::warn!(
3986 target: "enrich",
3987 exit_code = ?exit_status.code(),
3988 stderr = %stderr_str.trim(),
3989 "codex process failed"
3990 );
3991 return Err(AppError::Validation(format!(
3992 "codex exited with code {:?}: {}",
3993 exit_status.code(),
3994 stderr_str.trim()
3995 )));
3996 }
3997 let stdout_str = String::from_utf8(stdout_buf)
3998 .map_err(|_| AppError::Validation("codex stdout is not valid UTF-8".into()))?;
3999 let result = super::codex_spawn::parse_codex_jsonl(&stdout_str)?;
4002 let value: serde_json::Value =
4008 serde_json::from_str(&result.last_agent_text).map_err(|e| {
4009 AppError::Validation(format!(
4010 "codex agent_message is not valid JSON: {e}; raw={}",
4011 result.last_agent_text
4012 ))
4013 })?;
4014 Ok((value, 0.0, false))
4015 }
4016 None => {
4017 let _ = child.kill();
4018 let _ = child.wait();
4019 let _ = stdin_thread.join();
4020 Err(AppError::Validation(format!(
4021 "codex timed out after {timeout_secs} seconds"
4022 )))
4023 }
4024 }
4025}
4026
4027fn call_opencode(
4028 binary: &Path,
4029 prompt: &str,
4030 json_schema: &str,
4031 input_text: &str,
4032 model: Option<&str>,
4033 timeout_secs: u64,
4034) -> Result<(serde_json::Value, f64, bool), AppError> {
4035 use wait_timeout::ChildExt;
4036
4037 let resolved_model = super::opencode_runner::resolve_opencode_model(model);
4038
4039 let augmented_prompt = if json_schema.is_empty() {
4040 prompt.to_string()
4041 } else {
4042 format!(
4043 "{prompt}\n\nIMPORTANT: You MUST respond with ONLY valid JSON (no markdown, no explanation, no code fences). \
4044 The JSON MUST match this schema:\n{json_schema}"
4045 )
4046 };
4047
4048 let mut cmd = super::opencode_runner::build_opencode_command_sync(
4049 binary,
4050 &resolved_model,
4051 &augmented_prompt,
4052 input_text,
4053 );
4054
4055 let mut child = super::opencode_runner::spawn_opencode(&mut cmd).map_err(|e| {
4056 AppError::Io(std::io::Error::new(
4057 e.kind(),
4058 format!("failed to spawn opencode: {e}"),
4059 ))
4060 })?;
4061
4062 let start = std::time::Instant::now();
4063 let timeout = std::time::Duration::from_secs(timeout_secs);
4064 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
4065
4066 match status {
4067 Some(exit_status) => {
4068 tracing::debug!(
4069 target: "process",
4070 exit_code = ?exit_status.code(),
4071 elapsed_ms = start.elapsed().as_millis() as u64,
4072 "opencode process completed"
4073 );
4074
4075 let mut stdout_buf = Vec::new();
4076 if let Some(mut out) = child.stdout.take() {
4077 std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
4078 }
4079 if !exit_status.success() {
4080 let mut stderr_buf = Vec::new();
4081 if let Some(mut err) = child.stderr.take() {
4082 std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
4083 }
4084 let stderr_str = String::from_utf8_lossy(&stderr_buf);
4085 tracing::warn!(
4086 target: "enrich",
4087 exit_code = ?exit_status.code(),
4088 stderr = %stderr_str.trim(),
4089 "opencode process failed"
4090 );
4091 return Err(AppError::Validation(format!(
4092 "opencode exited with code {:?}: {}",
4093 exit_status.code(),
4094 stderr_str.trim()
4095 )));
4096 }
4097 let stdout_str = String::from_utf8(stdout_buf)
4098 .map_err(|_| AppError::Validation("opencode stdout is not valid UTF-8".into()))?;
4099 let (text, cost, _tokens) = super::opencode_runner::parse_opencode_output(&stdout_str)?;
4100 let value: serde_json::Value =
4101 super::opencode_runner::parse_json_from_opencode_text(&text).map_err(|e| {
4102 AppError::Validation(format!("opencode response is not valid JSON: {e}"))
4103 })?;
4104 Ok((value, cost, false))
4105 }
4106 None => {
4107 let _ = child.kill();
4108 let _ = child.wait();
4109 Err(AppError::Validation(format!(
4110 "opencode timed out after {timeout_secs} seconds"
4111 )))
4112 }
4113 }
4114}
4115
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::Connection;
    #[cfg(unix)]
    use std::os::unix::fs::PermissionsExt;

    /// Opens an in-memory database with just the tables the scan and persist
    /// helpers under test read from or write to.
    fn open_test_db() -> Connection {
        let conn = Connection::open_in_memory().expect("in-memory db");
        conn.execute_batch(
            "CREATE TABLE memories (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                name TEXT NOT NULL,
                type TEXT NOT NULL DEFAULT 'note',
                description TEXT NOT NULL DEFAULT '',
                body TEXT NOT NULL DEFAULT '',
                body_hash TEXT NOT NULL DEFAULT '',
                session_id TEXT,
                source TEXT NOT NULL DEFAULT 'agent',
                metadata TEXT NOT NULL DEFAULT '{}',
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                deleted_at INTEGER,
                UNIQUE(namespace, name)
            );
            CREATE TABLE entities (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                name TEXT NOT NULL,
                type TEXT NOT NULL DEFAULT 'concept',
                description TEXT,
                degree INTEGER NOT NULL DEFAULT 0,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                UNIQUE(namespace, name)
            );
            CREATE TABLE memory_entities (
                memory_id INTEGER NOT NULL,
                entity_id INTEGER NOT NULL,
                PRIMARY KEY (memory_id, entity_id)
            );
            CREATE TABLE relationships (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                source_id INTEGER NOT NULL,
                target_id INTEGER NOT NULL,
                relation TEXT NOT NULL,
                weight REAL NOT NULL DEFAULT 0.5,
                description TEXT,
                UNIQUE(source_id, target_id, relation)
            );
            CREATE TABLE memory_embeddings (
                memory_id INTEGER PRIMARY KEY,
                namespace TEXT NOT NULL,
                embedding BLOB NOT NULL,
                source TEXT NOT NULL,
                model TEXT NOT NULL DEFAULT '',
                dim INTEGER NOT NULL DEFAULT 384,
                created_at INTEGER NOT NULL DEFAULT (unixepoch())
            );",
        )
        .expect("schema creation must succeed");
        conn
    }

    /// A memory with no entity bindings is reported by the unbound scan.
    #[test]
    fn scan_unbound_memories_finds_memories_without_bindings() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'test-mem', 'some body content')",
            [],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "test-mem");
    }

    /// A memory that already has an entity binding is not reported.
    #[test]
    fn scan_unbound_memories_excludes_bound_memories() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'bound-mem', 'body')",
            [],
        )
        .unwrap();
        let mem_id: i64 = conn
            .query_row("SELECT id FROM memories WHERE name='bound-mem'", [], |r| {
                r.get(0)
            })
            .unwrap();
        conn.execute(
            "INSERT INTO entities (namespace, name) VALUES ('global', 'some-entity')",
            [],
        )
        .unwrap();
        let ent_id: i64 = conn
            .query_row(
                "SELECT id FROM entities WHERE name='some-entity'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        conn.execute(
            "INSERT INTO memory_entities (memory_id, entity_id) VALUES (?1, ?2)",
            rusqlite::params![mem_id, ent_id],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
        assert!(results.is_empty(), "bound memory must not appear in scan");
    }

    /// An entity whose description is NULL is reported.
    #[test]
    fn scan_entities_without_description_finds_null_description() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'my-tool', 'tool', NULL)",
            [],
        )
        .unwrap();

        let results = scan_entities_without_description(&conn, "global", None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "my-tool");
    }

    /// An entity that already has a description is not reported.
    #[test]
    fn scan_entities_without_description_excludes_entities_with_description() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'described-tool', 'tool', 'Has a description already')",
            [],
        )
        .unwrap();

        let results = scan_entities_without_description(&conn, "global", None, &[]).unwrap();
        assert!(
            results.is_empty(),
            "entity with description must not appear"
        );
    }

    /// A body shorter than the threshold is reported.
    #[test]
    fn scan_short_body_memories_finds_short_bodies() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'short-mem', 'hi')",
            [],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 100, None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "short-mem");
    }

    /// A body longer than the threshold is not reported.
    #[test]
    fn scan_short_body_memories_excludes_long_bodies() {
        let conn = open_test_db();
        let long_body = "a".repeat(1000);
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'long-mem', ?1)",
            rusqlite::params![long_body],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 100, None, &[]).unwrap();
        assert!(results.is_empty(), "long memory must not appear in scan");
    }

    /// The optional limit caps the number of rows returned.
    #[test]
    fn scan_respects_limit() {
        let conn = open_test_db();
        for i in 0..5 {
            conn.execute(
                &format!("INSERT INTO memories (namespace, name, body) VALUES ('global', 'mem-{i}', 'short')"),
                [],
            )
            .unwrap();
        }

        let results = scan_short_body_memories(&conn, "global", 1000, Some(3), &[]).unwrap();
        assert_eq!(results.len(), 3, "limit must be respected");
    }

    /// Only memories lacking an embedding row are reported.
    #[test]
    fn scan_memories_without_embeddings_finds_only_missing_rows() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'missing-vec', 'body one')",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'has-vec', 'body two')",
            [],
        )
        .unwrap();
        let memory_id: i64 = conn
            .query_row(
                "SELECT id FROM memories WHERE namespace='global' AND name='has-vec'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        let embedding = vec![0.0_f32; crate::constants::embedding_dim()];
        memories::upsert_vec(
            &conn, memory_id, "global", "note", &embedding, "has-vec", "body two",
        )
        .unwrap();

        let results = scan_memories_without_embeddings(&conn, "global", None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "missing-vec");
    }

    /// The name filter restricts the scan to the listed memories.
    #[test]
    fn scan_memories_without_embeddings_respects_name_filter() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'match-me', 'body one')",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'skip-me', 'body two')",
            [],
        )
        .unwrap();

        let results =
            scan_memories_without_embeddings(&conn, "global", None, &["match-me".to_string()])
                .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "match-me");
    }

    /// A freshly opened queue database has an empty `queue` table.
    #[test]
    fn queue_db_schema_creates_correctly() {
        // A per-test temporary directory is portable (no hard-coded /tmp) and
        // is cleaned up even when an assertion below fails.
        let tmp = tempfile::tempdir().expect("tempdir");
        let db_file = tmp.path().join("enrich-queue.sqlite");
        let db_path = db_file.to_str().expect("temp path must be valid UTF-8");
        let conn = open_queue_db(db_path).expect("queue db must open");
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 0);
    }

    /// A successful result event yields its structured output and cost.
    #[test]
    fn parse_claude_output_valid_bindings() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.01,
             "structured_output":{"entities":[{"name":"rust-lang","entity_type":"tool"}],"relationships":[]}}
        ]"#;
        let result = crate::commands::claude_runner::parse_claude_output(output)
            .expect("must parse successfully");
        assert!(result.value.get("entities").is_some());
        assert!((result.cost_usd - 0.01).abs() < f64::EPSILON);
        assert!(!result.is_oauth);
    }

    /// `apiKeySource: "none"` in the init event marks the run as OAuth.
    #[test]
    fn parse_claude_output_detects_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"none"},
            {"type":"result","is_error":false,"total_cost_usd":0.0,
             "structured_output":{"entities":[],"relationships":[]}}
        ]"#;
        let result = crate::commands::claude_runner::parse_claude_output(output).unwrap();
        assert!(result.is_oauth);
    }

    /// A rate-limit error result maps to `AppError::RateLimited`.
    #[test]
    fn parse_claude_output_rate_limit_returns_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;
        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
        assert!(matches!(err, AppError::RateLimited { .. }));
    }

    /// Other error results surface the backend's error text.
    #[test]
    fn parse_claude_output_auth_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;
        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("authentication failed"));
    }

    /// `call_codex` returns the agent message's JSON as-is, using a mock
    /// executable that prints a canned JSONL transcript.
    #[cfg(unix)]
    #[test]
    fn call_codex_returns_raw_json_for_body_enrich_schema() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let binary = tmp.path().join("codex-mock");
        std::fs::write(
            &binary,
            r#"#!/usr/bin/env bash
set -euo pipefail
cat <<'JSONL'
{"type":"thread.started","thread_id":"mock-thread-0"}
{"type":"item.completed","item":{"type":"agent_message","text":"{\"enriched_body\":\"expanded body\"}"}}
{"type":"turn.completed","usage":{"input_tokens":1,"output_tokens":1}}
JSONL
"#,
        )
        .expect("mock codex write");
        let mut perms = std::fs::metadata(&binary).expect("metadata").permissions();
        perms.set_mode(0o755);
        std::fs::set_permissions(&binary, perms).expect("chmod");

        let (value, cost, is_oauth) =
            call_codex(&binary, "prompt", BODY_ENRICH_SCHEMA, "body", None, 5)
                .expect("call_codex must accept body-enrich payload");

        assert_eq!(value["enriched_body"], "expanded body");
        assert_eq!(cost, 0.0);
        assert!(!is_oauth);
    }

    /// NOTE(review): despite its name this test only exercises the scan that
    /// would feed a dry run; no dry-run code path is invoked here.
    #[test]
    fn dry_run_emits_preview_without_calling_llm() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'dry-mem', 'tiny')",
            [],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 1000, None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "dry-mem");
    }

    /// Persisting a description writes it to the entity row.
    #[test]
    fn persist_entity_description_updates_db() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type) VALUES ('global', 'tokio-runtime', 'tool')",
            [],
        )
        .unwrap();
        let eid: i64 = conn
            .query_row(
                "SELECT id FROM entities WHERE name='tokio-runtime'",
                [],
                |r| r.get(0),
            )
            .unwrap();

        persist_entity_description(&conn, eid, "Async runtime for Rust applications").unwrap();

        let desc: String = conn
            .query_row(
                "SELECT description FROM entities WHERE id=?1",
                rusqlite::params![eid],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(desc, "Async runtime for Rust applications");
    }

    /// The bindings schema constant must parse as JSON.
    #[test]
    fn bindings_schema_is_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(BINDINGS_SCHEMA).expect("BINDINGS_SCHEMA must be valid JSON");
    }

    /// The entity-description schema constant must parse as JSON.
    #[test]
    fn entity_description_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(ENTITY_DESCRIPTION_SCHEMA)
            .expect("ENTITY_DESCRIPTION_SCHEMA must be valid JSON");
    }

    /// The body-enrich schema constant must parse as JSON.
    #[test]
    fn body_enrich_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(BODY_ENRICH_SCHEMA)
            .expect("BODY_ENRICH_SCHEMA must be valid JSON");
    }
}