1use crate::commands::ingest_claude::find_claude_binary;
24use crate::entity_type::EntityType;
25use crate::errors::AppError;
26use crate::paths::AppPaths;
27use crate::storage::connection::{ensure_db_ready, open_rw};
28use crate::storage::entities::{self, NewEntity, NewRelationship};
29use crate::storage::memories;
30
31use rusqlite::Connection;
32use serde::{Deserialize, Serialize};
33use std::io::Write;
34use std::path::{Path, PathBuf};
35use std::process::{Command, Stdio};
36use std::time::Instant;
37
/// Lowest Claude Code version accepted by `validate_claude_version_local`.
const MIN_CLAUDE_VERSION: &str = "2.1.0";
/// Path of the SQLite file that holds the enrichment work queue.
const DEFAULT_QUEUE_DB: &str = ".enrich-queue.sqlite";
/// Initial wait, in seconds, applied after a rate-limit error before retrying.
const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
/// Default value of `--min-output-chars`.
const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;
/// Default value of `--max-output-chars`.
const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2000;
47
/// JSON Schema for extraction output: an `entities` array (name plus one of the
/// listed entity types) and a `relationships` array (source, target, one of the
/// listed relation names, and a strength between 0 and 1).
const BINDINGS_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["entities","relationships"],
  "additionalProperties": false
}"#;
91
/// JSON Schema for an entity-description answer: an object whose only
/// property is a required string `description`.
const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" }
  },
  "required": ["description"],
  "additionalProperties": false
}"#;
100
/// JSON Schema for a body-enrichment answer: an object whose only property is
/// a required string `enriched_body`.
const BODY_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "enriched_body": { "type": "string" }
  },
  "required": ["enriched_body"],
  "additionalProperties": false
}"#;
109
/// Instruction text for entity/relationship extraction from a memory body.
///
/// Written as one literal per prompt line so that each rule can be read (and
/// diffed) on its own; `concat!` joins them at compile time.
const BINDINGS_PROMPT: &str = concat!(
    "You are a knowledge graph entity extractor. Given a memory body, extract:\n",
    "1. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n",
    "2. Typed relationships between entities with strength scores\n\n",
    "Rules:\n",
    "- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n",
    "- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n",
    "- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n",
    "- NEVER use 'mentions' as relationship type\n",
    "- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n",
    "- Prefer fewer high-quality entities over many low-quality ones",
);
124
/// Prompt prefix for entity descriptions; it ends with `Entity name: ` so the
/// entity's name can be appended directly after it.
const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. \
    Given an entity name and type, write a concise one-sentence description (10-20 words) \
    that explains what this entity IS and WHY it matters in the context of \
    software/system design.\n\n\
    Entity name: ";
126
/// Prompt prefix for body enrichment; it ends with a blank line so the memory
/// body can be appended directly after it.
const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. \
    Given a short or sparse memory body, expand it into a richer, more complete and useful \
    description. Preserve all existing facts. Add context, implications, and relationships \
    that would be valuable for knowledge retrieval.\n\n\
    Constraints:\n\
    - Output only the enriched body text (no metadata, no headers)\n\
    - Preserve the original meaning exactly\n\
    - Target length is provided in the system context\n\n\
    Memory body to enrich:\n\n";
128
129#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
135#[serde(rename_all = "kebab-case")]
136pub enum EnrichOperation {
137 MemoryBindings,
139 EntityDescriptions,
141 BodyEnrich,
143 WeightCalibrate,
145 RelationReclassify,
147 EntityConnect,
149 EntityTypeValidate,
151 DescriptionEnrich,
153 CrossDomainBridges,
155 DomainClassify,
157 GraphAudit,
159 DeepResearchSynth,
161 BodyExtract,
163}
164
165#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
167pub enum EnrichMode {
168 ClaudeCode,
170 Codex,
172}
173
174impl std::fmt::Display for EnrichMode {
175 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176 match self {
177 EnrichMode::ClaudeCode => write!(f, "claude-code"),
178 EnrichMode::Codex => write!(f, "codex"),
179 }
180 }
181}
182
183#[derive(clap::Args)]
185#[command(
186 about = "Enrich graph memories and entities using an LLM provider",
187 after_long_help = "EXAMPLES:\n \
188 # Add missing entity bindings to all unbound memories\n \
189 sqlite-graphrag enrich --operation memory-bindings --mode claude-code\n\n \
190 # Fill entity descriptions (dry-run preview, no tokens spent)\n \
191 sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n \
192 # Expand short memory bodies (GAP-18)\n \
193 sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n \
194 # Resume an interrupted body-enrich run\n \
195 sqlite-graphrag enrich --operation body-enrich --resume --json\n\n \
196 # Retry only failed items from a previous run\n \
197 sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n\
198 EXIT CODES:\n \
199 0 success\n \
200 1 validation error (bad args, binary not found)\n \
201 14 I/O error"
202)]
203pub struct EnrichArgs {
204 #[arg(long, short = 'o', value_enum, value_name = "OPERATION")]
206 pub operation: EnrichOperation,
207
208 #[arg(long, value_enum, default_value = "claude-code")]
210 pub mode: EnrichMode,
211
212 #[arg(long, value_name = "N")]
214 pub limit: Option<usize>,
215
216 #[arg(long)]
218 pub dry_run: bool,
219
220 #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
222 pub namespace: Option<String>,
223
224 #[arg(long, value_name = "PATH")]
227 pub claude_binary: Option<PathBuf>,
228
229 #[arg(long, value_name = "MODEL")]
231 pub claude_model: Option<String>,
232
233 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
235 pub claude_timeout: u64,
236
237 #[arg(long, value_name = "PATH")]
240 pub codex_binary: Option<PathBuf>,
241
242 #[arg(long, value_name = "MODEL")]
244 pub codex_model: Option<String>,
245
246 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
248 pub codex_timeout: u64,
249
250 #[arg(long, value_name = "USD")]
253 pub max_cost_usd: Option<f64>,
254
255 #[arg(long)]
258 pub resume: bool,
259
260 #[arg(long)]
262 pub retry_failed: bool,
263
264 #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
267 pub min_output_chars: usize,
268
269 #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
271 pub max_output_chars: usize,
272
273 #[arg(long, default_value_t = true)]
275 pub preserve_check: bool,
276
277 #[arg(long, value_name = "PATH")]
279 pub prompt_template: Option<PathBuf>,
280
281 #[arg(long)]
284 pub json: bool,
285
286 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
288 pub db: Option<String>,
289}
290
291#[derive(Debug, Deserialize)]
296struct ClaudeElement {
297 r#type: Option<String>,
298 subtype: Option<String>,
299 #[serde(default)]
300 is_error: bool,
301 structured_output: Option<serde_json::Value>,
302 result: Option<String>,
303 total_cost_usd: Option<f64>,
304 error: Option<String>,
305 #[serde(rename = "apiKeySource")]
306 api_key_source: Option<String>,
307}
308
309#[derive(Debug, Serialize)]
314struct PhaseEvent<'a> {
315 phase: &'a str,
316 #[serde(skip_serializing_if = "Option::is_none")]
317 binary_path: Option<&'a str>,
318 #[serde(skip_serializing_if = "Option::is_none")]
319 version: Option<&'a str>,
320 #[serde(skip_serializing_if = "Option::is_none")]
321 items_total: Option<usize>,
322 #[serde(skip_serializing_if = "Option::is_none")]
323 items_pending: Option<usize>,
324}
325
326#[derive(Debug, Serialize)]
327struct ItemEvent<'a> {
328 item: &'a str,
330 status: &'a str,
331 #[serde(skip_serializing_if = "Option::is_none")]
332 memory_id: Option<i64>,
333 #[serde(skip_serializing_if = "Option::is_none")]
334 entity_id: Option<i64>,
335 #[serde(skip_serializing_if = "Option::is_none")]
336 entities: Option<usize>,
337 #[serde(skip_serializing_if = "Option::is_none")]
338 rels: Option<usize>,
339 #[serde(skip_serializing_if = "Option::is_none")]
340 chars_before: Option<usize>,
341 #[serde(skip_serializing_if = "Option::is_none")]
342 chars_after: Option<usize>,
343 #[serde(skip_serializing_if = "Option::is_none")]
344 cost_usd: Option<f64>,
345 #[serde(skip_serializing_if = "Option::is_none")]
346 elapsed_ms: Option<u64>,
347 #[serde(skip_serializing_if = "Option::is_none")]
348 error: Option<String>,
349 index: usize,
350 total: usize,
351}
352
353#[derive(Debug, Serialize)]
354struct EnrichSummary {
355 summary: bool,
356 operation: String,
357 items_total: usize,
358 completed: usize,
359 failed: usize,
360 skipped: usize,
361 cost_usd: f64,
362 elapsed_ms: u64,
363}
364
365fn emit_json<T: Serialize>(value: &T) {
370 if let Ok(json) = serde_json::to_string(value) {
371 let stdout = std::io::stdout();
372 let mut lock = stdout.lock();
373 let _ = writeln!(lock, "{json}");
374 let _ = lock.flush();
375 }
376}
377
378fn open_queue_db(path: &str) -> Result<Connection, AppError> {
393 let conn = Connection::open(path)?;
394 conn.pragma_update(None, "journal_mode", "wal")?;
395 conn.execute_batch(
396 "CREATE TABLE IF NOT EXISTS queue (
397 id INTEGER PRIMARY KEY AUTOINCREMENT,
398 item_key TEXT NOT NULL UNIQUE,
399 item_type TEXT NOT NULL DEFAULT 'memory',
400 status TEXT NOT NULL DEFAULT 'pending',
401 memory_id INTEGER,
402 entity_id INTEGER,
403 entities INTEGER DEFAULT 0,
404 rels INTEGER DEFAULT 0,
405 error TEXT,
406 cost_usd REAL DEFAULT 0.0,
407 attempt INTEGER DEFAULT 0,
408 elapsed_ms INTEGER,
409 created_at TEXT DEFAULT (datetime('now')),
410 done_at TEXT
411 );
412 CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
413 )?;
414 Ok(conn)
415}
416
417fn validate_claude_version_local(binary: &Path) -> Result<String, AppError> {
422 let output = Command::new(binary)
423 .arg("--version")
424 .stdin(Stdio::null())
425 .stdout(Stdio::piped())
426 .stderr(Stdio::piped())
427 .output()
428 .map_err(AppError::Io)?;
429
430 if !output.status.success() {
431 return Err(AppError::Validation(
432 "failed to run 'claude --version'".to_string(),
433 ));
434 }
435
436 let version_str = String::from_utf8(output.stdout)
437 .map_err(|_| AppError::Validation("claude --version output is not UTF-8".to_string()))?;
438 let version = version_str.trim().to_string();
439 let numeric = version.split([' ', '(']).next().unwrap_or("").trim();
440
441 fn parse_semver(s: &str) -> Option<(u64, u64, u64)> {
442 let parts: Vec<&str> = s.splitn(3, '.').collect();
443 if parts.len() < 2 {
444 return None;
445 }
446 let major = parts[0].parse::<u64>().ok()?;
447 let minor = parts[1].parse::<u64>().ok()?;
448 let patch = parts
449 .get(2)
450 .and_then(|p| p.parse::<u64>().ok())
451 .unwrap_or(0);
452 Some((major, minor, patch))
453 }
454
455 if let (Some(actual), Some(min)) = (parse_semver(numeric), parse_semver(MIN_CLAUDE_VERSION)) {
456 if actual < min {
457 return Err(AppError::Validation(format!(
458 "Claude Code version {numeric} is below minimum required {MIN_CLAUDE_VERSION}"
459 )));
460 }
461 }
462
463 Ok(version)
464}
465
466fn call_claude(
479 binary: &Path,
480 prompt: &str,
481 json_schema: &str,
482 input_text: &str,
483 model: Option<&str>,
484 timeout_secs: u64,
485) -> Result<(serde_json::Value, f64, bool), AppError> {
486 use wait_timeout::ChildExt;
487
488 let full_prompt = format!("{prompt}\n\n{input_text}");
489
490 let mut cmd = Command::new(binary);
491
492 cmd.env_clear();
494 for var in &[
495 "PATH",
496 "HOME",
497 "USER",
498 "SHELL",
499 "TERM",
500 "LANG",
501 "XDG_CONFIG_HOME",
502 "XDG_DATA_HOME",
503 "XDG_RUNTIME_DIR",
504 "ANTHROPIC_API_KEY",
505 "CLAUDE_CONFIG_DIR",
506 "TMPDIR",
507 "TMP",
508 "TEMP",
509 "DYLD_FALLBACK_LIBRARY_PATH",
510 ] {
511 if let Ok(val) = std::env::var(var) {
512 cmd.env(var, val);
513 }
514 }
515
516 #[cfg(windows)]
517 for var in &[
518 "LOCALAPPDATA",
519 "APPDATA",
520 "USERPROFILE",
521 "SystemRoot",
522 "COMSPEC",
523 "PATHEXT",
524 "HOMEPATH",
525 "HOMEDRIVE",
526 ] {
527 if let Ok(val) = std::env::var(var) {
528 cmd.env(var, val);
529 }
530 }
531
532 cmd.arg("-p")
533 .arg(&full_prompt)
534 .arg("--output-format")
535 .arg("json")
536 .arg("--json-schema")
537 .arg(json_schema)
538 .arg("--max-turns")
539 .arg("3")
540 .arg("--no-session-persistence");
541
542 if std::env::var("ANTHROPIC_API_KEY").is_ok() {
543 cmd.arg("--bare");
544 } else {
545 cmd.arg("--dangerously-skip-permissions")
546 .arg("--settings")
547 .arg(r#"{"hooks":{}}"#);
548 }
549
550 if let Some(m) = model {
551 cmd.arg("--model").arg(m);
552 }
553
554 cmd.stdin(Stdio::null())
555 .stdout(Stdio::piped())
556 .stderr(Stdio::piped());
557
558 let mut child = cmd.spawn().map_err(|e| {
559 AppError::Io(std::io::Error::new(
560 e.kind(),
561 format!("failed to spawn claude: {e}"),
562 ))
563 })?;
564
565 let timeout = std::time::Duration::from_secs(timeout_secs);
566 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
567
568 match status {
569 Some(exit_status) => {
570 let mut stdout_buf = Vec::new();
571 let mut stderr_buf = Vec::new();
572 if let Some(mut out) = child.stdout.take() {
573 std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
574 }
575 if let Some(mut err) = child.stderr.take() {
576 std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
577 }
578
579 if !exit_status.success() {
580 let stderr_str = String::from_utf8_lossy(&stderr_buf);
581 if stderr_str.contains("auth") || stderr_str.contains("login") {
582 tracing::warn!(
583 target: "enrich",
584 "Claude Code authentication may have failed. Re-authenticate with: claude"
585 );
586 }
587 return Err(AppError::Validation(format!(
588 "claude -p exited with code {:?}: {}",
589 exit_status.code(),
590 stderr_str.trim()
591 )));
592 }
593
594 let stdout_str = String::from_utf8(stdout_buf)
595 .map_err(|_| AppError::Validation("claude -p stdout is not valid UTF-8".into()))?;
596 parse_claude_json_output(&stdout_str)
597 }
598 None => {
599 tracing::warn!(target: "enrich", timeout_secs, "claude -p timed out, killing process");
600 let _ = child.kill();
601 let _ = child.wait();
602 Err(AppError::Validation(format!(
603 "claude -p timed out after {timeout_secs} seconds"
604 )))
605 }
606 }
607}
608
609fn parse_claude_json_output(stdout: &str) -> Result<(serde_json::Value, f64, bool), AppError> {
617 let elements: Vec<ClaudeElement> = serde_json::from_str(stdout).map_err(|e| {
618 AppError::Validation(format!("failed to parse claude output as JSON array: {e}"))
619 })?;
620
621 let is_oauth = elements
622 .iter()
623 .find(|e| e.r#type.as_deref() == Some("system") && e.subtype.as_deref() == Some("init"))
624 .and_then(|e| e.api_key_source.as_deref())
625 .map(|s| s == "none")
626 .unwrap_or(false);
627
628 let result_elem = elements
629 .iter()
630 .find(|e| e.r#type.as_deref() == Some("result"))
631 .ok_or_else(|| {
632 AppError::Validation("claude output missing 'result' element".to_string())
633 })?;
634
635 if result_elem.is_error {
636 let err_msg = result_elem
637 .error
638 .as_deref()
639 .or(result_elem.result.as_deref())
640 .unwrap_or("unknown error");
641 if err_msg.contains("rate_limit") || err_msg.contains("overloaded") {
642 return Err(AppError::Validation(format!("RATE_LIMITED: {err_msg}")));
643 }
644 return Err(AppError::Validation(format!(
645 "claude extraction failed: {err_msg}"
646 )));
647 }
648
649 let value = if let Some(v) = result_elem.structured_output.clone() {
650 v
651 } else if let Some(text) = &result_elem.result {
652 serde_json::from_str(text).map_err(|e| {
653 AppError::Validation(format!("failed to parse claude result field as JSON: {e}"))
654 })?
655 } else {
656 return Err(AppError::Validation(
657 "claude result missing structured_output and result field".into(),
658 ));
659 };
660
661 let cost = result_elem.total_cost_usd.unwrap_or(0.0);
662 Ok((value, cost, is_oauth))
663}
664
665fn scan_unbound_memories(
673 conn: &Connection,
674 namespace: &str,
675 limit: Option<usize>,
676) -> Result<Vec<(i64, String, String)>, AppError> {
677 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
678 let sql = format!(
679 "SELECT m.id, m.name, m.body
680 FROM memories m
681 WHERE m.namespace = ?1
682 AND m.deleted_at IS NULL
683 AND NOT EXISTS (
684 SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
685 )
686 ORDER BY m.id
687 {limit_clause}"
688 );
689 let mut stmt = conn.prepare(&sql)?;
690 let rows = stmt
691 .query_map(rusqlite::params![namespace], |r| {
692 Ok((
693 r.get::<_, i64>(0)?,
694 r.get::<_, String>(1)?,
695 r.get::<_, String>(2)?,
696 ))
697 })?
698 .collect::<Result<Vec<_>, _>>()?;
699 Ok(rows)
700}
701
702fn scan_entities_without_description(
706 conn: &Connection,
707 namespace: &str,
708 limit: Option<usize>,
709) -> Result<Vec<(i64, String, String)>, AppError> {
710 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
711 let sql = format!(
712 "SELECT id, name, type
713 FROM entities
714 WHERE namespace = ?1
715 AND (description IS NULL OR description = '')
716 ORDER BY id
717 {limit_clause}"
718 );
719 let mut stmt = conn.prepare(&sql)?;
720 let rows = stmt
721 .query_map(rusqlite::params![namespace], |r| {
722 Ok((
723 r.get::<_, i64>(0)?,
724 r.get::<_, String>(1)?,
725 r.get::<_, String>(2)?,
726 ))
727 })?
728 .collect::<Result<Vec<_>, _>>()?;
729 Ok(rows)
730}
731
732fn scan_short_body_memories(
736 conn: &Connection,
737 namespace: &str,
738 min_chars: usize,
739 limit: Option<usize>,
740) -> Result<Vec<(i64, String, String)>, AppError> {
741 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
742 let sql = format!(
743 "SELECT m.id, m.name, m.body
744 FROM memories m
745 WHERE m.namespace = ?1
746 AND m.deleted_at IS NULL
747 AND LENGTH(COALESCE(m.body,'')) < ?2
748 ORDER BY m.id
749 {limit_clause}"
750 );
751 let mut stmt = conn.prepare(&sql)?;
752 let rows = stmt
753 .query_map(rusqlite::params![namespace, min_chars as i64], |r| {
754 Ok((
755 r.get::<_, i64>(0)?,
756 r.get::<_, String>(1)?,
757 r.get::<_, String>(2)?,
758 ))
759 })?
760 .collect::<Result<Vec<_>, _>>()?;
761 Ok(rows)
762}
763
764fn persist_memory_bindings(
773 conn: &Connection,
774 namespace: &str,
775 memory_id: i64,
776 entities_json: &serde_json::Value,
777 rels_json: &serde_json::Value,
778) -> Result<(usize, usize), AppError> {
779 #[derive(Deserialize)]
780 struct EntityItem {
781 name: String,
782 entity_type: String,
783 }
784 #[derive(Deserialize)]
785 struct RelItem {
786 source: String,
787 target: String,
788 relation: String,
789 strength: f64,
790 }
791
792 let extracted_entities: Vec<EntityItem> = serde_json::from_value(entities_json.clone())
793 .map_err(|e| AppError::Validation(format!("failed to parse entities array: {e}")))?;
794
795 let extracted_rels: Vec<RelItem> = serde_json::from_value(rels_json.clone())
796 .map_err(|e| AppError::Validation(format!("failed to parse relationships array: {e}")))?;
797
798 let mut ent_count = 0usize;
799 let mut rel_count = 0usize;
800
801 for item in &extracted_entities {
802 let entity_type = match item.entity_type.parse::<EntityType>() {
803 Ok(et) => et,
804 Err(_) => {
805 tracing::warn!(
806 target: "enrich",
807 entity = %item.name,
808 entity_type = %item.entity_type,
809 "entity type not recognized, skipping"
810 );
811 continue;
812 }
813 };
814 match entities::upsert_entity(
815 conn,
816 namespace,
817 &NewEntity {
818 name: item.name.clone(),
819 entity_type,
820 description: None,
821 },
822 ) {
823 Ok(eid) => {
824 let _ = entities::link_memory_entity(conn, memory_id, eid);
825 ent_count += 1;
826 }
827 Err(e) => {
828 tracing::warn!(
829 target: "enrich",
830 entity = %item.name,
831 error = %e,
832 "entity upsert skipped"
833 );
834 }
835 }
836 }
837
838 for rel in &extracted_rels {
839 let normalized = crate::parsers::normalize_relation(&rel.relation);
840 crate::parsers::warn_if_non_canonical(&normalized);
841
842 let src_name = crate::parsers::normalize_entity_name(&rel.source);
845 let tgt_name = crate::parsers::normalize_entity_name(&rel.target);
846 let src_id = entities::find_entity_id(conn, namespace, &src_name);
847 let tgt_id = entities::find_entity_id(conn, namespace, &tgt_name);
848 if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
849 let new_rel = NewRelationship {
850 source: rel.source.clone(),
851 target: rel.target.clone(),
852 relation: normalized,
853 strength: rel.strength,
854 description: None,
855 };
856 if entities::upsert_relationship(conn, namespace, sid, tid, &new_rel).is_ok() {
857 rel_count += 1;
858 }
859 }
860 }
861
862 Ok((ent_count, rel_count))
863}
864
865fn persist_entity_description(
867 conn: &Connection,
868 entity_id: i64,
869 description: &str,
870) -> Result<(), AppError> {
871 conn.execute(
872 "UPDATE entities SET description = ?1, updated_at = unixepoch() WHERE id = ?2",
873 rusqlite::params![description, entity_id],
874 )?;
875 Ok(())
876}
877
878fn persist_enriched_body(
883 conn: &Connection,
884 namespace: &str,
885 memory_id: i64,
886 memory_name: &str,
887 new_body: &str,
888 paths: &crate::paths::AppPaths,
889) -> Result<(), AppError> {
890 let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
892 "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
893 rusqlite::params![memory_id],
894 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
895 )?;
896
897 let memory_type: String = conn.query_row(
898 "SELECT type FROM memories WHERE id=?1",
899 rusqlite::params![memory_id],
900 |r| r.get(0),
901 )?;
902
903 let description: String = conn.query_row(
904 "SELECT COALESCE(description,'') FROM memories WHERE id=?1",
905 rusqlite::params![memory_id],
906 |r| r.get(0),
907 )?;
908
909 let body_hash = blake3::hash(new_body.as_bytes()).to_hex().to_string();
910
911 let new_memory = memories::NewMemory {
912 namespace: namespace.to_string(),
913 name: memory_name.to_string(),
914 memory_type: memory_type.clone(),
915 description: description.clone(),
916 body: new_body.to_string(),
917 body_hash,
918 session_id: None,
919 source: "enrich".to_string(),
920 metadata: serde_json::Value::Object(serde_json::Map::new()),
921 };
922
923 memories::update(conn, memory_id, &new_memory, None)?;
924 memories::sync_fts_after_update(
925 conn,
926 memory_id,
927 &old_name,
928 &old_desc,
929 &old_body,
930 &new_memory.name,
931 &new_memory.description,
932 &new_memory.body,
933 )?;
934
935 let snippet: String = new_body.chars().take(200).collect();
937 let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
938 let chunks_info = crate::chunking::split_into_chunks_hierarchical(new_body, tokenizer);
939 let embedding_result = if chunks_info.len() <= 1 {
940 crate::daemon::embed_passage_or_local(&paths.models, new_body)
941 } else {
942 let mut chunk_embeddings: Vec<Vec<f32>> = Vec::with_capacity(chunks_info.len());
943 let mut ok = true;
944 for chunk in &chunks_info {
945 let text = crate::chunking::chunk_text(new_body, chunk);
946 match crate::daemon::embed_passage_or_local(&paths.models, text) {
947 Ok(emb) => chunk_embeddings.push(emb),
948 Err(e) => {
949 tracing::warn!(target: "enrich", error = %e, "chunk embedding failed");
950 ok = false;
951 break;
952 }
953 }
954 }
955 if ok {
956 Ok(crate::chunking::aggregate_embeddings(&chunk_embeddings))
957 } else {
958 crate::daemon::embed_passage_or_local(&paths.models, new_body)
959 }
960 };
961
962 if let Ok(embedding) = embedding_result {
963 if let Err(e) = memories::upsert_vec(
964 conn,
965 memory_id,
966 namespace,
967 &memory_type,
968 &embedding,
969 memory_name,
970 &snippet,
971 ) {
972 tracing::warn!(target: "enrich", memory = %memory_name, error = %e, "vec upsert failed after body-enrich");
973 }
974 }
975
976 Ok(())
977}
978
979pub fn run(args: &EnrichArgs) -> Result<(), AppError> {
985 let started = Instant::now();
986
987 let paths = AppPaths::resolve(args.db.as_deref())?;
988 ensure_db_ready(&paths)?;
989 let conn = open_rw(&paths.db)?;
990 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
991
992 let provider_binary = match args.mode {
994 EnrichMode::ClaudeCode => {
995 let bin = find_claude_binary(args.claude_binary.as_deref())?;
996 let version = validate_claude_version_local(&bin)?;
997 tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
998 emit_json(&PhaseEvent {
999 phase: "validate",
1000 binary_path: bin.to_str(),
1001 version: Some(&version),
1002 items_total: None,
1003 items_pending: None,
1004 });
1005 bin
1006 }
1007 EnrichMode::Codex => {
1008 let bin = find_codex_binary(args.codex_binary.as_deref())?;
1010 emit_json(&PhaseEvent {
1011 phase: "validate",
1012 binary_path: bin.to_str(),
1013 version: None,
1014 items_total: None,
1015 items_pending: None,
1016 });
1017 bin
1018 }
1019 };
1020
1021 let scan_result = scan_operation(&conn, &namespace, args)?;
1023 let total = scan_result.len();
1024
1025 emit_json(&PhaseEvent {
1026 phase: "scan",
1027 binary_path: None,
1028 version: None,
1029 items_total: Some(total),
1030 items_pending: Some(total),
1031 });
1032
1033 if args.dry_run {
1035 for (idx, key) in scan_result.iter().enumerate() {
1036 emit_json(&ItemEvent {
1037 item: key,
1038 status: "preview",
1039 memory_id: None,
1040 entity_id: None,
1041 entities: None,
1042 rels: None,
1043 chars_before: None,
1044 chars_after: None,
1045 cost_usd: None,
1046 elapsed_ms: None,
1047 error: None,
1048 index: idx,
1049 total,
1050 });
1051 }
1052 emit_json(&EnrichSummary {
1053 summary: true,
1054 operation: format!("{:?}", args.operation),
1055 items_total: total,
1056 completed: 0,
1057 failed: 0,
1058 skipped: 0,
1059 cost_usd: 0.0,
1060 elapsed_ms: started.elapsed().as_millis() as u64,
1061 });
1062 return Ok(());
1063 }
1064
1065 match args.operation {
1068 EnrichOperation::MemoryBindings
1069 | EnrichOperation::EntityDescriptions
1070 | EnrichOperation::BodyEnrich => {
1071 }
1073 _ => {
1074 for (idx, key) in scan_result.iter().enumerate() {
1075 emit_json(&serde_json::json!({
1076 "item": key,
1077 "status": "not_yet_implemented",
1078 "operation": format!("{:?}", args.operation),
1079 "index": idx,
1080 "total": total
1081 }));
1082 }
1083 emit_json(&EnrichSummary {
1084 summary: true,
1085 operation: format!("{:?}", args.operation),
1086 items_total: total,
1087 completed: 0,
1088 failed: 0,
1089 skipped: total,
1090 cost_usd: 0.0,
1091 elapsed_ms: started.elapsed().as_millis() as u64,
1092 });
1093 return Ok(());
1094 }
1095 }
1096
1097 let queue_conn = open_queue_db(DEFAULT_QUEUE_DB)?;
1099
1100 if args.resume {
1101 let reset = queue_conn
1102 .execute(
1103 "UPDATE queue SET status='pending' WHERE status='processing'",
1104 [],
1105 )
1106 .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
1107 if reset > 0 {
1108 tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
1109 }
1110 }
1111
1112 if args.retry_failed {
1113 let count = queue_conn
1114 .execute(
1115 "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
1116 [],
1117 )
1118 .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
1119 tracing::info!(target: "enrich", count, "retrying failed items");
1120 }
1121
1122 if !args.resume && !args.retry_failed {
1123 queue_conn
1124 .execute("DELETE FROM queue", [])
1125 .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
1126 }
1127
1128 for (idx, key) in scan_result.iter().enumerate() {
1130 let item_type = match args.operation {
1131 EnrichOperation::EntityDescriptions => "entity",
1132 _ => "memory",
1133 };
1134 let _ = queue_conn.execute(
1135 "INSERT OR IGNORE INTO queue (item_key, item_type, status) VALUES (?1, ?2, 'pending')",
1136 rusqlite::params![key, item_type],
1137 );
1138 let _ = idx; }
1140
1141 let mut completed = 0usize;
1142 let mut failed = 0usize;
1143 let mut skipped = 0usize;
1144 let mut cost_total = 0.0f64;
1145 let mut oauth_detected = false;
1146 let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1147
1148 let provider_timeout = match args.mode {
1149 EnrichMode::ClaudeCode => args.claude_timeout,
1150 EnrichMode::Codex => args.codex_timeout,
1151 };
1152
1153 let provider_model: Option<&str> = match args.mode {
1154 EnrichMode::ClaudeCode => args.claude_model.as_deref(),
1155 EnrichMode::Codex => args.codex_model.as_deref(),
1156 };
1157
1158 loop {
1159 if let Some(budget) = args.max_cost_usd {
1161 if !oauth_detected && cost_total >= budget {
1162 tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
1163 break;
1164 }
1165 }
1166
1167 let pending: Option<(i64, String, String)> = queue_conn
1169 .query_row(
1170 "UPDATE queue SET status='processing', attempt=attempt+1 \
1171 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
1172 RETURNING id, item_key, item_type",
1173 [],
1174 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1175 )
1176 .ok();
1177
1178 let (queue_id, item_key, item_type) = match pending {
1179 Some(p) => p,
1180 None => break,
1181 };
1182
1183 let item_started = Instant::now();
1184 let current_index = completed + failed + skipped;
1185
1186 let call_result = match args.operation {
1187 EnrichOperation::MemoryBindings => call_memory_bindings(
1188 &conn,
1189 &namespace,
1190 &item_key,
1191 &provider_binary,
1192 provider_model,
1193 provider_timeout,
1194 &args.mode,
1195 ),
1196 EnrichOperation::EntityDescriptions => call_entity_description(
1197 &conn,
1198 &namespace,
1199 &item_key,
1200 &provider_binary,
1201 provider_model,
1202 provider_timeout,
1203 &args.mode,
1204 ),
1205 EnrichOperation::BodyEnrich => call_body_enrich(
1206 &conn,
1207 &namespace,
1208 &item_key,
1209 &provider_binary,
1210 provider_model,
1211 provider_timeout,
1212 &args.mode,
1213 args.min_output_chars,
1214 args.max_output_chars,
1215 args.prompt_template.as_deref(),
1216 &paths,
1217 ),
1218 _ => unreachable!("non-implemented ops handled above"),
1219 };
1220
1221 match call_result {
1222 Ok(EnrichItemResult::Done {
1223 memory_id,
1224 entity_id,
1225 entities,
1226 rels,
1227 chars_before,
1228 chars_after,
1229 cost,
1230 is_oauth,
1231 }) => {
1232 if is_oauth && !oauth_detected {
1233 oauth_detected = true;
1234 tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
1235 }
1236 backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1237
1238 let persist_err: Option<String> = match args.operation {
1240 EnrichOperation::MemoryBindings => {
1241 None
1243 }
1244 EnrichOperation::EntityDescriptions => {
1245 None
1247 }
1248 EnrichOperation::BodyEnrich => {
1249 None
1251 }
1252 _ => unreachable!(),
1253 };
1254
1255 let _ = queue_conn.execute(
1256 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1257 rusqlite::params![
1258 memory_id,
1259 entity_id,
1260 entities as i64,
1261 rels as i64,
1262 cost,
1263 item_started.elapsed().as_millis() as i64,
1264 queue_id
1265 ],
1266 );
1267
1268 if persist_err.is_none() {
1269 completed += 1;
1270 if !is_oauth {
1271 cost_total += cost;
1272 }
1273 emit_json(&ItemEvent {
1274 item: &item_key,
1275 status: "done",
1276 memory_id,
1277 entity_id,
1278 entities: Some(entities),
1279 rels: Some(rels),
1280 chars_before,
1281 chars_after,
1282 cost_usd: if is_oauth { None } else { Some(cost) },
1283 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1284 error: None,
1285 index: current_index,
1286 total,
1287 });
1288 } else {
1289 failed += 1;
1290 emit_json(&ItemEvent {
1291 item: &item_key,
1292 status: "failed",
1293 memory_id: None,
1294 entity_id: None,
1295 entities: None,
1296 rels: None,
1297 chars_before: None,
1298 chars_after: None,
1299 cost_usd: None,
1300 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1301 error: persist_err,
1302 index: current_index,
1303 total,
1304 });
1305 }
1306 }
1307 Ok(EnrichItemResult::Skipped { reason }) => {
1308 skipped += 1;
1309 let _ = queue_conn.execute(
1310 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
1311 rusqlite::params![reason, queue_id],
1312 );
1313 emit_json(&ItemEvent {
1314 item: &item_key,
1315 status: "skipped",
1316 memory_id: None,
1317 entity_id: None,
1318 entities: None,
1319 rels: None,
1320 chars_before: None,
1321 chars_after: None,
1322 cost_usd: None,
1323 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1324 error: None,
1325 index: current_index,
1326 total,
1327 });
1328 }
1329 Err(e) => {
1330 let err_str = format!("{e}");
1331 if err_str.contains("RATE_LIMITED") {
1332 tracing::warn!(target: "enrich", wait_seconds = backoff_secs, "rate limited, waiting before retry");
1333 let _ = queue_conn.execute(
1334 "UPDATE queue SET status='pending' WHERE id=?1",
1335 rusqlite::params![queue_id],
1336 );
1337 std::thread::sleep(std::time::Duration::from_secs(backoff_secs));
1338 backoff_secs = (backoff_secs * 2).min(900);
1339 continue;
1340 }
1341
1342 failed += 1;
1343 let _ = queue_conn.execute(
1344 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
1345 rusqlite::params![err_str, queue_id],
1346 );
1347 emit_json(&ItemEvent {
1348 item: &item_key,
1349 status: "failed",
1350 memory_id: None,
1351 entity_id: None,
1352 entities: None,
1353 rels: None,
1354 chars_before: None,
1355 chars_after: None,
1356 cost_usd: None,
1357 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1358 error: Some(err_str),
1359 index: current_index,
1360 total,
1361 });
1362 }
1363 }
1364
1365 let _ = item_type; }
1367
1368 let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1369 let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1370
1371 emit_json(&EnrichSummary {
1372 summary: true,
1373 operation: format!("{:?}", args.operation),
1374 items_total: total,
1375 completed,
1376 failed,
1377 skipped,
1378 cost_usd: cost_total,
1379 elapsed_ms: started.elapsed().as_millis() as u64,
1380 });
1381
1382 if failed == 0 {
1383 let _ = std::fs::remove_file(DEFAULT_QUEUE_DB);
1384 }
1385
1386 Ok(())
1387}
1388
/// Outcome of handling a single queue item in one of the `call_*` helpers.
enum EnrichItemResult {
    /// The provider was called and its result was persisted.
    Done {
        /// Row id of the memory that was processed, for memory-based operations.
        memory_id: Option<i64>,
        /// Row id of the entity that was processed, for entity-based operations.
        entity_id: Option<i64>,
        /// Number of entities reported by the persistence step (0 when not applicable).
        entities: usize,
        /// Number of relationships reported by the persistence step (0 when not applicable).
        rels: usize,
        /// Body length in characters before enrichment (body enrichment only).
        chars_before: Option<usize>,
        /// Body length in characters after enrichment (body enrichment only).
        chars_after: Option<usize>,
        /// Cost reported by the provider call.
        cost: f64,
        /// Whether the provider reported an OAuth session; the caller then
        /// leaves `cost` out of its totals and output.
        is_oauth: bool,
    },
    /// The item was left untouched; `reason` says why.
    Skipped { reason: String },
}
1408
1409fn call_memory_bindings(
1414 conn: &Connection,
1415 namespace: &str,
1416 memory_name: &str,
1417 binary: &Path,
1418 model: Option<&str>,
1419 timeout: u64,
1420 mode: &EnrichMode,
1421) -> Result<EnrichItemResult, AppError> {
1422 let (memory_id, body): (i64, String) = conn.query_row(
1424 "SELECT id, COALESCE(body,'') FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
1425 rusqlite::params![namespace, memory_name],
1426 |r| Ok((r.get(0)?, r.get(1)?)),
1427 ).map_err(|e| match e {
1428 rusqlite::Error::QueryReturnedNoRows => AppError::NotFound(format!("memory '{memory_name}' not found")),
1429 other => AppError::Database(other),
1430 })?;
1431
1432 if body.trim().is_empty() {
1433 return Ok(EnrichItemResult::Skipped {
1434 reason: "body is empty".to_string(),
1435 });
1436 }
1437
1438 let (value, cost, is_oauth) = match mode {
1439 EnrichMode::ClaudeCode => call_claude(
1440 binary,
1441 BINDINGS_PROMPT,
1442 BINDINGS_SCHEMA,
1443 &body,
1444 model,
1445 timeout,
1446 )?,
1447 EnrichMode::Codex => call_codex(
1448 binary,
1449 BINDINGS_PROMPT,
1450 BINDINGS_SCHEMA,
1451 &body,
1452 model,
1453 timeout,
1454 )?,
1455 };
1456
1457 let empty_arr = serde_json::Value::Array(vec![]);
1458 let entities_val = value.get("entities").unwrap_or(&empty_arr);
1459 let rels_val = value.get("relationships").unwrap_or(&empty_arr);
1460
1461 let (ent_count, rel_count) =
1462 persist_memory_bindings(conn, namespace, memory_id, entities_val, rels_val)?;
1463
1464 Ok(EnrichItemResult::Done {
1465 memory_id: Some(memory_id),
1466 entity_id: None,
1467 entities: ent_count,
1468 rels: rel_count,
1469 chars_before: None,
1470 chars_after: None,
1471 cost,
1472 is_oauth,
1473 })
1474}
1475
1476fn call_entity_description(
1477 conn: &Connection,
1478 namespace: &str,
1479 entity_name: &str,
1480 binary: &Path,
1481 model: Option<&str>,
1482 timeout: u64,
1483 mode: &EnrichMode,
1484) -> Result<EnrichItemResult, AppError> {
1485 let (entity_id, entity_type): (i64, String) = conn
1486 .query_row(
1487 "SELECT id, type FROM entities WHERE namespace=?1 AND name=?2",
1488 rusqlite::params![namespace, entity_name],
1489 |r| Ok((r.get(0)?, r.get(1)?)),
1490 )
1491 .map_err(|e| match e {
1492 rusqlite::Error::QueryReturnedNoRows => {
1493 AppError::NotFound(format!("entity '{entity_name}' not found"))
1494 }
1495 other => AppError::Database(other),
1496 })?;
1497
1498 let prompt = format!(
1499 "{ENTITY_DESCRIPTION_PROMPT_PREFIX}{entity_name}\nEntity type: {entity_type}\n\nGenerate a description:"
1500 );
1501
1502 let (value, cost, is_oauth) = match mode {
1503 EnrichMode::ClaudeCode => call_claude(
1504 binary,
1505 &prompt,
1506 ENTITY_DESCRIPTION_SCHEMA,
1507 "",
1508 model,
1509 timeout,
1510 )?,
1511 EnrichMode::Codex => call_codex(
1512 binary,
1513 &prompt,
1514 ENTITY_DESCRIPTION_SCHEMA,
1515 "",
1516 model,
1517 timeout,
1518 )?,
1519 };
1520
1521 let description = value
1522 .get("description")
1523 .and_then(|v| v.as_str())
1524 .ok_or_else(|| AppError::Validation("LLM result missing 'description' field".into()))?;
1525
1526 persist_entity_description(conn, entity_id, description)?;
1527
1528 Ok(EnrichItemResult::Done {
1529 memory_id: None,
1530 entity_id: Some(entity_id),
1531 entities: 0,
1532 rels: 0,
1533 chars_before: None,
1534 chars_after: None,
1535 cost,
1536 is_oauth,
1537 })
1538}
1539
1540#[allow(clippy::too_many_arguments)]
1541fn call_body_enrich(
1542 conn: &Connection,
1543 namespace: &str,
1544 memory_name: &str,
1545 binary: &Path,
1546 model: Option<&str>,
1547 timeout: u64,
1548 mode: &EnrichMode,
1549 min_output_chars: usize,
1550 max_output_chars: usize,
1551 prompt_template: Option<&Path>,
1552 paths: &crate::paths::AppPaths,
1553) -> Result<EnrichItemResult, AppError> {
1554 let (memory_id, body): (i64, String) = conn.query_row(
1555 "SELECT id, COALESCE(body,'') FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
1556 rusqlite::params![namespace, memory_name],
1557 |r| Ok((r.get(0)?, r.get(1)?)),
1558 ).map_err(|e| match e {
1559 rusqlite::Error::QueryReturnedNoRows => AppError::NotFound(format!("memory '{memory_name}' not found")),
1560 other => AppError::Database(other),
1561 })?;
1562
1563 let chars_before = body.chars().count();
1564
1565 let prompt_prefix = if let Some(tmpl_path) = prompt_template {
1567 std::fs::read_to_string(tmpl_path).map_err(|e| {
1568 AppError::Io(std::io::Error::new(
1569 e.kind(),
1570 format!("failed to read prompt template: {e}"),
1571 ))
1572 })?
1573 } else {
1574 BODY_ENRICH_PROMPT_PREFIX.to_string()
1575 };
1576
1577 let prompt = format!(
1578 "{prompt_prefix}Target minimum length: {min_output_chars} characters. Maximum: {max_output_chars} characters."
1579 );
1580
1581 let (value, cost, is_oauth) = match mode {
1583 EnrichMode::ClaudeCode => {
1584 call_claude(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
1585 }
1586 EnrichMode::Codex => {
1587 call_codex(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
1588 }
1589 };
1590
1591 let enriched_body = value
1592 .get("enriched_body")
1593 .and_then(|v| v.as_str())
1594 .ok_or_else(|| AppError::Validation("LLM result missing 'enriched_body' field".into()))?;
1595
1596 let chars_after = enriched_body.chars().count();
1597
1598 if chars_after <= chars_before {
1600 return Ok(EnrichItemResult::Skipped {
1601 reason: format!(
1602 "enriched body ({chars_after} chars) not longer than original ({chars_before} chars)"
1603 ),
1604 });
1605 }
1606
1607 persist_enriched_body(
1608 conn,
1609 namespace,
1610 memory_id,
1611 memory_name,
1612 enriched_body,
1613 paths,
1614 )?;
1615
1616 Ok(EnrichItemResult::Done {
1617 memory_id: Some(memory_id),
1618 entity_id: None,
1619 entities: 0,
1620 rels: 0,
1621 chars_before: Some(chars_before),
1622 chars_after: Some(chars_after),
1623 cost,
1624 is_oauth,
1625 })
1626}
1627
1628fn scan_operation(
1633 conn: &Connection,
1634 namespace: &str,
1635 args: &EnrichArgs,
1636) -> Result<Vec<String>, AppError> {
1637 match args.operation {
1638 EnrichOperation::MemoryBindings => {
1639 let rows = scan_unbound_memories(conn, namespace, args.limit)?;
1640 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
1641 }
1642 EnrichOperation::EntityDescriptions => {
1643 let rows = scan_entities_without_description(conn, namespace, args.limit)?;
1644 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
1645 }
1646 EnrichOperation::BodyEnrich => {
1647 let rows =
1648 scan_short_body_memories(conn, namespace, args.min_output_chars, args.limit)?;
1649 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
1650 }
1651 EnrichOperation::WeightCalibrate
1653 | EnrichOperation::RelationReclassify
1654 | EnrichOperation::EntityConnect
1655 | EnrichOperation::EntityTypeValidate
1656 | EnrichOperation::DescriptionEnrich
1657 | EnrichOperation::CrossDomainBridges
1658 | EnrichOperation::DomainClassify
1659 | EnrichOperation::GraphAudit
1660 | EnrichOperation::DeepResearchSynth
1661 | EnrichOperation::BodyExtract => {
1662 let limit_clause = args.limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1663 let sql = format!(
1664 "SELECT name FROM memories WHERE namespace=?1 AND deleted_at IS NULL ORDER BY id {limit_clause}"
1665 );
1666 let mut stmt = conn.prepare(&sql)?;
1667 let names = stmt
1668 .query_map(rusqlite::params![namespace], |r| r.get::<_, String>(0))?
1669 .collect::<Result<Vec<_>, _>>()?;
1670 Ok(names)
1671 }
1672 }
1673}
1674
1675fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
1681 if let Some(p) = explicit {
1682 if p.exists() {
1683 return Ok(p.to_path_buf());
1684 }
1685 return Err(AppError::Validation(format!(
1686 "Codex binary not found at explicit path: {}",
1687 p.display()
1688 )));
1689 }
1690
1691 if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
1692 let p = PathBuf::from(&env_path);
1693 if p.exists() {
1694 return Ok(p);
1695 }
1696 }
1697
1698 let name = if cfg!(windows) { "codex.exe" } else { "codex" };
1699 if let Some(path_var) = std::env::var_os("PATH") {
1700 for dir in std::env::split_paths(&path_var) {
1701 let candidate = dir.join(name);
1702 if candidate.exists() {
1703 return Ok(candidate);
1704 }
1705 }
1706 }
1707
1708 Err(AppError::Validation(
1709 "Codex CLI binary not found in PATH. Install it or specify --codex-binary".to_string(),
1710 ))
1711}
1712
1713fn call_codex(
1717 binary: &Path,
1718 prompt: &str,
1719 json_schema: &str,
1720 input_text: &str,
1721 model: Option<&str>,
1722 timeout_secs: u64,
1723) -> Result<(serde_json::Value, f64, bool), AppError> {
1724 use wait_timeout::ChildExt;
1725
1726 let full_prompt = format!("{prompt}\n\n{input_text}");
1727 let schema_file = {
1728 let tmp = std::env::temp_dir().join(format!("enrich-schema-{}.json", std::process::id()));
1729 std::fs::write(&tmp, json_schema).map_err(AppError::Io)?;
1730 tmp
1731 };
1732
1733 let mut cmd = Command::new(binary);
1734 cmd.env_clear();
1735 for var in &[
1736 "PATH",
1737 "HOME",
1738 "USER",
1739 "OPENAI_API_KEY",
1740 "TMPDIR",
1741 "TMP",
1742 "TEMP",
1743 ] {
1744 if let Ok(val) = std::env::var(var) {
1745 cmd.env(var, val);
1746 }
1747 }
1748
1749 cmd.arg("exec")
1750 .arg("--json")
1751 .arg("--output-schema")
1752 .arg(&schema_file);
1753
1754 if let Some(m) = model {
1755 cmd.arg("--model").arg(m);
1756 }
1757
1758 cmd.stdin(Stdio::piped())
1759 .stdout(Stdio::piped())
1760 .stderr(Stdio::piped());
1761
1762 let mut child = cmd.spawn().map_err(|e| {
1763 AppError::Io(std::io::Error::new(
1764 e.kind(),
1765 format!("failed to spawn codex: {e}"),
1766 ))
1767 })?;
1768
1769 if let Some(mut stdin) = child.stdin.take() {
1771 let _ = stdin.write_all(full_prompt.as_bytes());
1772 }
1773
1774 let timeout = std::time::Duration::from_secs(timeout_secs);
1775 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
1776
1777 let _ = std::fs::remove_file(&schema_file);
1778
1779 match status {
1780 Some(exit_status) => {
1781 let mut stdout_buf = Vec::new();
1782 if let Some(mut out) = child.stdout.take() {
1783 std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
1784 }
1785 if !exit_status.success() {
1786 let mut stderr_buf = Vec::new();
1787 if let Some(mut err) = child.stderr.take() {
1788 std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
1789 }
1790 return Err(AppError::Validation(format!(
1791 "codex exited with code {:?}: {}",
1792 exit_status.code(),
1793 String::from_utf8_lossy(&stderr_buf).trim()
1794 )));
1795 }
1796 let stdout_str = String::from_utf8(stdout_buf)
1797 .map_err(|_| AppError::Validation("codex stdout is not valid UTF-8".into()))?;
1798 let value: serde_json::Value = serde_json::from_str(&stdout_str).map_err(|e| {
1799 AppError::Validation(format!("failed to parse codex output as JSON: {e}"))
1800 })?;
1801 Ok((value, 0.0, false))
1802 }
1803 None => {
1804 let _ = child.kill();
1805 let _ = child.wait();
1806 Err(AppError::Validation(format!(
1807 "codex timed out after {timeout_secs} seconds"
1808 )))
1809 }
1810 }
1811}
1812
/// Unit tests for the scan helpers, the queue database, Claude output parsing,
/// entity description persistence and the embedded JSON schemas.
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::Connection;

    /// Opens an in-memory database with just the tables these tests touch.
    fn open_test_db() -> Connection {
        let conn = Connection::open_in_memory().expect("in-memory db");
        conn.execute_batch(
            "CREATE TABLE memories (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                name TEXT NOT NULL,
                type TEXT NOT NULL DEFAULT 'note',
                description TEXT NOT NULL DEFAULT '',
                body TEXT NOT NULL DEFAULT '',
                body_hash TEXT NOT NULL DEFAULT '',
                session_id TEXT,
                source TEXT NOT NULL DEFAULT 'agent',
                metadata TEXT NOT NULL DEFAULT '{}',
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                deleted_at INTEGER,
                UNIQUE(namespace, name)
            );
            CREATE TABLE entities (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                name TEXT NOT NULL,
                type TEXT NOT NULL DEFAULT 'concept',
                description TEXT,
                degree INTEGER NOT NULL DEFAULT 0,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                UNIQUE(namespace, name)
            );
            CREATE TABLE memory_entities (
                memory_id INTEGER NOT NULL,
                entity_id INTEGER NOT NULL,
                PRIMARY KEY (memory_id, entity_id)
            );
            CREATE TABLE relationships (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                source_id INTEGER NOT NULL,
                target_id INTEGER NOT NULL,
                relation TEXT NOT NULL,
                weight REAL NOT NULL DEFAULT 0.5,
                description TEXT,
                UNIQUE(source_id, target_id, relation)
            );",
        )
        .expect("schema creation must succeed");
        conn
    }

    #[test]
    fn scan_unbound_memories_finds_memories_without_bindings() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'test-mem', 'some body content')",
            [],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "test-mem");
    }

    #[test]
    fn scan_unbound_memories_excludes_bound_memories() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'bound-mem', 'body')",
            [],
        )
        .unwrap();
        let mem_id: i64 = conn
            .query_row("SELECT id FROM memories WHERE name='bound-mem'", [], |r| {
                r.get(0)
            })
            .unwrap();
        conn.execute(
            "INSERT INTO entities (namespace, name) VALUES ('global', 'some-entity')",
            [],
        )
        .unwrap();
        let ent_id: i64 = conn
            .query_row(
                "SELECT id FROM entities WHERE name='some-entity'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        // Link the memory to the entity so it counts as bound.
        conn.execute(
            "INSERT INTO memory_entities (memory_id, entity_id) VALUES (?1, ?2)",
            rusqlite::params![mem_id, ent_id],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None).unwrap();
        assert!(results.is_empty(), "bound memory must not appear in scan");
    }

    #[test]
    fn scan_entities_without_description_finds_null_description() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'my-tool', 'tool', NULL)",
            [],
        )
        .unwrap();

        let results = scan_entities_without_description(&conn, "global", None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "my-tool");
    }

    #[test]
    fn scan_entities_without_description_excludes_entities_with_description() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'described-tool', 'tool', 'Has a description already')",
            [],
        )
        .unwrap();

        let results = scan_entities_without_description(&conn, "global", None).unwrap();
        assert!(
            results.is_empty(),
            "entity with description must not appear"
        );
    }

    #[test]
    fn scan_short_body_memories_finds_short_bodies() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'short-mem', 'hi')",
            [],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "short-mem");
    }

    #[test]
    fn scan_short_body_memories_excludes_long_bodies() {
        let conn = open_test_db();
        let long_body = "a".repeat(1000);
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'long-mem', ?1)",
            rusqlite::params![long_body],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
        assert!(results.is_empty(), "long memory must not appear in scan");
    }

    #[test]
    fn scan_respects_limit() {
        let conn = open_test_db();
        for i in 0..5 {
            conn.execute(
                &format!("INSERT INTO memories (namespace, name, body) VALUES ('global', 'mem-{i}', 'short')"),
                [],
            )
            .unwrap();
        }

        let results = scan_short_body_memories(&conn, "global", 1000, Some(3)).unwrap();
        assert_eq!(results.len(), 3, "limit must be respected");
    }

    #[test]
    fn queue_db_schema_creates_correctly() {
        // Use the platform temp directory instead of a hard-coded `/tmp`.
        let tmp_path = std::env::temp_dir()
            .join(format!("test-enrich-queue-{}.sqlite", std::process::id()))
            .to_string_lossy()
            .into_owned();
        let conn = open_queue_db(&tmp_path).expect("queue db must open");
        let count = conn.query_row("SELECT COUNT(*) FROM queue", [], |r| r.get::<_, i64>(0));

        // Close the connection before deleting, and delete before asserting so
        // a failing assertion leaves nothing behind. The `-wal` / `-shm`
        // removals only matter if the queue database runs in WAL mode.
        drop(conn);
        for suffix in ["", "-wal", "-shm"] {
            let _ = std::fs::remove_file(format!("{tmp_path}{suffix}"));
        }

        assert_eq!(count.unwrap(), 0);
    }

    #[test]
    fn parse_claude_json_output_valid_bindings() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.01,
             "structured_output":{"entities":[{"name":"rust-lang","entity_type":"tool"}],"relationships":[]}}
        ]"#;
        let (value, cost, is_oauth) =
            parse_claude_json_output(output).expect("must parse successfully");
        assert!(value.get("entities").is_some());
        assert!((cost - 0.01).abs() < f64::EPSILON);
        assert!(!is_oauth);
    }

    #[test]
    fn parse_claude_json_output_detects_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"none"},
            {"type":"result","is_error":false,"total_cost_usd":0.0,
             "structured_output":{"entities":[],"relationships":[]}}
        ]"#;
        let (_value, _cost, is_oauth) = parse_claude_json_output(output).unwrap();
        assert!(is_oauth);
    }

    #[test]
    fn parse_claude_json_output_rate_limit_returns_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;
        let err = parse_claude_json_output(output).unwrap_err();
        assert!(format!("{err}").contains("RATE_LIMITED"));
    }

    #[test]
    fn parse_claude_json_output_auth_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;
        let err = parse_claude_json_output(output).unwrap_err();
        assert!(format!("{err}").contains("authentication failed"));
    }

    // Despite the name, this only exercises the scan step; no provider is
    // invoked and no dry-run code path is called here.
    #[test]
    fn dry_run_emits_preview_without_calling_llm() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'dry-mem', 'tiny')",
            [],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 1000, None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "dry-mem");
    }

    #[test]
    fn persist_entity_description_updates_db() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type) VALUES ('global', 'tokio-runtime', 'tool')",
            [],
        )
        .unwrap();
        let eid: i64 = conn
            .query_row(
                "SELECT id FROM entities WHERE name='tokio-runtime'",
                [],
                |r| r.get(0),
            )
            .unwrap();

        persist_entity_description(&conn, eid, "Async runtime for Rust applications").unwrap();

        let desc: String = conn
            .query_row(
                "SELECT description FROM entities WHERE id=?1",
                rusqlite::params![eid],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(desc, "Async runtime for Rust applications");
    }

    #[test]
    fn bindings_schema_is_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(BINDINGS_SCHEMA).expect("BINDINGS_SCHEMA must be valid JSON");
    }

    #[test]
    fn entity_description_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(ENTITY_DESCRIPTION_SCHEMA)
            .expect("ENTITY_DESCRIPTION_SCHEMA must be valid JSON");
    }

    #[test]
    fn body_enrich_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(BODY_ENRICH_SCHEMA)
            .expect("BODY_ENRICH_SCHEMA must be valid JSON");
    }
}