1use crate::commands::ingest_claude::find_claude_binary;
25use crate::constants::MAX_MEMORY_BODY_LEN;
26use crate::entity_type::EntityType;
27use crate::errors::AppError;
28use crate::paths::AppPaths;
29use crate::storage::connection::{ensure_db_ready, open_rw};
30use crate::storage::entities::{self, NewEntity, NewRelationship};
31use crate::storage::memories;
32
33use rusqlite::Connection;
34use serde::{Deserialize, Serialize};
35use std::io::Write;
36use std::path::{Path, PathBuf};
37use std::process::{Command, Stdio};
38use std::time::Instant;
39
/// File name of the SQLite work queue that tracks per-item enrichment progress.
/// It is opened through `Connection::open` with this relative path, so it resolves
/// against the current working directory.
const DEFAULT_QUEUE_DB: &str = ".enrich-queue.sqlite";
/// Initial value, in seconds, of the rate-limit back-off (`backoff_secs` in `run`).
const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
/// Default for `--min-output-chars`.
const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;
/// Default for `--max-output-chars`.
const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2000;
48
/// JSON Schema for the memory-bindings LLM response.
///
/// The response has two parts:
/// - `entities`: name plus one of the ten supported entity types.
/// - `relationships`: source/target names, a relation drawn from the same canonical
///   list that `BINDINGS_PROMPT` spells out, and a strength in `[0, 1]`.
///
/// Every object forbids additional properties.
const BINDINGS_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["entities","relationships"],
  "additionalProperties": false
}"#;

/// JSON Schema for the entity-descriptions response: a single `description` string.
const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" }
  },
  "required": ["description"],
  "additionalProperties": false
}"#;

/// JSON Schema for the body-enrich response: a single `enriched_body` string.
const BODY_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "enriched_body": { "type": "string" }
  },
  "required": ["enriched_body"],
  "additionalProperties": false
}"#;
110
/// System prompt for auditing a relationship weight against the 0.3/0.5/0.7/0.9 scale.
const WEIGHT_CALIBRATE_PROMPT: &str = "You are a knowledge graph quality auditor. Evaluate whether this relationship weight is correctly calibrated.\n\n\
Scale:\n\
- 0.9 = vital hard dependency (A cannot function without B)\n\
- 0.7 = important design relationship (A strongly supports/enables B)\n\
- 0.5 = useful contextual link (A and B share relevant context)\n\
- 0.3 = weak reference (A mentions B without strong coupling)\n\n\
Respond with the calibrated weight and brief reasoning.";

/// JSON Schema for the weight-calibration response: a weight in `[0, 1]` plus reasoning.
const WEIGHT_CALIBRATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "calibrated_weight": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["calibrated_weight", "reasoning"],
  "additionalProperties": false
}"#;

/// System prompt asking the model to replace a generic relation with one of the
/// eleven canonical relations listed in the prompt itself.
const RELATION_RECLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. The relationship between these entities uses a generic type. Determine the REAL semantic relationship.\n\n\
Valid canonical relations (pick exactly one):\n\
- depends-on: A cannot function without B\n\
- uses: A utilizes B but could substitute it\n\
- supports: A reinforces or enables B\n\
- causes: A triggers or produces B\n\
- fixes: A resolves a problem in B\n\
- contradicts: A conflicts with or invalidates B\n\
- applies-to: A is relevant to or scoped within B\n\
- follows: A comes after B in sequence\n\
- replaces: A substitutes B\n\
- tracked-in: A is monitored in B\n\
- related: A and B share context (use sparingly)\n\n\
Respond with the correct relation, strength, and reasoning.";

/// JSON Schema for the relation-reclassify response.
///
/// `relation` is constrained to exactly the canonical list in
/// `RELATION_RECLASSIFY_PROMPT` (the same values as the relation enum in
/// `BINDINGS_SCHEMA`). Without the enum, the "pick exactly one" instruction would
/// only be a suggestion.
const RELATION_RECLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": {
      "type": "string",
      "enum": ["depends-on","uses","supports","causes","fixes","contradicts","applies-to","follows","replaces","tracked-in","related"]
    },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
156
/// System prompt asking whether two unconnected entities deserve a relationship.
/// It allows the sentinel relation "none".
const ENTITY_CONNECT_PROMPT: &str = "You are a knowledge graph quality auditor. Two entities exist in the same graph but have no relationship between them. Determine if a meaningful relationship exists.\n\n\
Valid canonical relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, replaces, tracked-in, related.\n\n\
If NO meaningful relationship exists, set relation to \"none\".\n\
Respond with the relation (or \"none\"), strength, and reasoning.";

/// JSON Schema for the entity-connect response.
///
/// `relation` is limited to the canonical relations named in
/// `ENTITY_CONNECT_PROMPT` plus the explicit `"none"` sentinel the prompt asks for.
const ENTITY_CONNECT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": {
      "type": "string",
      "enum": ["depends-on","uses","supports","causes","fixes","contradicts","applies-to","follows","replaces","tracked-in","related","none"]
    },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;

/// System prompt asking the model to confirm or correct an entity's type.
const ENTITY_TYPE_VALIDATE_PROMPT: &str = "You are a knowledge graph quality auditor. Verify whether this entity's type is correct.\n\n\
Valid entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\n\
If the current type is correct, keep it. If wrong, suggest the correct type.\n\
Respond with the validated type and reasoning.";

/// JSON Schema for the entity-type-validate response.
///
/// `validated_type` is restricted to the ten types listed in
/// `ENTITY_TYPE_VALIDATE_PROMPT`, the same set as `BINDINGS_SCHEMA`'s
/// `entity_type` enum.
const ENTITY_TYPE_VALIDATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "validated_type": {
      "type": "string",
      "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
    },
    "was_correct": { "type": "boolean" },
    "reasoning": { "type": "string" }
  },
  "required": ["validated_type", "was_correct", "reasoning"],
  "additionalProperties": false
}"#;
190
/// System prompt for rewriting a generic or auto-generated memory description into a
/// 10-20 word semantic one. It includes a BAD/GOOD example pair.
const DESCRIPTION_ENRICH_PROMPT: &str = "You are a knowledge graph quality auditor. This memory has a generic or auto-generated description. Write a concise, semantic description (10-20 words) that captures WHAT this memory is about and WHY it matters.\n\n\
BAD: 'ingested from docs/auth.md'\n\
GOOD: 'JWT token rotation strategy with 15-min expiry and refresh flow'\n\n\
Respond with the improved description and reasoning.";

/// JSON Schema for the description-enrich response: new description plus reasoning.
const DESCRIPTION_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" },
    "reasoning": { "type": "string" }
  },
  "required": ["description", "reasoning"],
  "additionalProperties": false
}"#;

/// System prompt for classifying a memory into a kebab-case domain name.
const DOMAIN_CLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. Classify this memory into its primary domain category.\n\n\
Respond with the domain name (kebab-case, 2-4 words) and reasoning.";

/// JSON Schema for the domain-classify response.
///
/// NOTE(review): the schema requires `confidence` in `[0, 1]`, but
/// `DOMAIN_CLASSIFY_PROMPT` never asks for it, so the model only learns about it from
/// the schema.
const DOMAIN_CLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "domain": { "type": "string" },
    "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["domain", "confidence", "reasoning"],
  "additionalProperties": false
}"#;
221
/// System prompt for auditing one memory's entity bindings and scoring their quality.
const GRAPH_AUDIT_PROMPT: &str = "You are a knowledge graph quality auditor. Analyze this memory and its entity bindings for quality issues.\n\n\
Check for: missing entities, wrong entity types, redundant relationships, orphaned entities, generic descriptions, low-signal relationships.\n\n\
Respond with a list of issues found (or empty if none) and an overall quality score.";

/// JSON Schema for the graph-audit response.
///
/// Like every other schema in this module, each object, including the nested
/// `issues` items, sets `"additionalProperties": false`. Structured-output modes that
/// require closed objects would otherwise reject the nested item schema.
const GRAPH_AUDIT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "quality_score": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "issues": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "kind": { "type": "string" },
          "detail": { "type": "string" }
        },
        "required": ["kind", "detail"],
        "additionalProperties": false
      }
    },
    "reasoning": { "type": "string" }
  },
  "required": ["quality_score", "issues", "reasoning"],
  "additionalProperties": false
}"#;

/// System prompt for synthesizing entities and relationships from a memory body.
const DEEP_RESEARCH_SYNTH_PROMPT: &str = "You are a knowledge graph synthesizer. Given this memory body, extract key findings and synthesize them into structured entities and relationships.\n\n\
Entity names: lowercase kebab-case, domain-specific.\n\
Relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, related, replaces, tracked-in.\n\n\
Respond with extracted entities, relationships, and a synthesis summary.";

/// JSON Schema for the deep-research-synth response.
///
/// Nested entity and relationship items are closed objects. `strength` is bounded
/// to `[0, 1]`, matching `BINDINGS_SCHEMA`.
const DEEP_RESEARCH_SYNTH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": { "type": "string" }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": { "type": "string" },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source", "target", "relation", "strength"],
        "additionalProperties": false
      }
    },
    "summary": { "type": "string" }
  },
  "required": ["entities", "relationships", "summary"],
  "additionalProperties": false
}"#;

/// System prompt for restructuring a raw memory body into clean markdown.
const BODY_EXTRACT_PROMPT: &str = "You are a structured data extractor. Given this memory body (which may be unstructured text, raw notes, or a transcript), extract and restructure the content into a clean, well-organized markdown body.\n\n\
Preserve all factual content. Remove noise, fix formatting, add section headers where appropriate.\n\
Respond with the restructured body and a brief summary of changes.";

/// JSON Schema for the body-extract response: new body plus a change summary.
const BODY_EXTRACT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "restructured_body": { "type": "string" },
    "changes_summary": { "type": "string" }
  },
  "required": ["restructured_body", "changes_summary"],
  "additionalProperties": false
}"#;
269
/// System prompt for the memory-bindings pass.
///
/// Its list of allowed relationship types is identical to the relation enum in
/// `BINDINGS_SCHEMA`, and it gives the same 0.9/0.7/0.5/0.3 strength scale as
/// `WEIGHT_CALIBRATE_PROMPT`.
const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. Given a memory body, extract:\n\
1. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
2. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones";

/// Prompt prefix for entity descriptions.
/// It ends with `Entity name: ` so the caller can append the entity name; presumably
/// the type is appended too — confirm at the call site.
const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. Given an entity name and type, write a concise one-sentence description (10-20 words) that explains what this entity IS and WHY it matters in the context of software/system design.\n\nEntity name: ";

/// Prompt prefix for body enrichment.
/// It ends with `Memory body to enrich:\n\n`, so the caller presumably appends the
/// memory body. The prompt says the target length arrives via "system context";
/// how that is supplied is not visible here.
const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. Given a short or sparse memory body, expand it into a richer, more complete and useful description. Preserve all existing facts. Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\nConstraints:\n- Output only the enriched body text (no metadata, no headers)\n- Preserve the original meaning exactly\n- Target length is provided in the system context\n\nMemory body to enrich:\n\n";
// Enrichment pass selected by `--operation`.
// Plain `//` comments are used here on purpose: clap turns `///` doc comments on
// ValueEnum variants into `--help` text.
// serde uses kebab-case names (`rename_all`). The CLI examples show the same
// spelling, e.g. `--operation memory-bindings`.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum EnrichOperation {
    // Add missing entity bindings to memories that have none.
    MemoryBindings,
    // Fill in entity descriptions (entities queued with item_type "entity").
    EntityDescriptions,
    // Expand short memory bodies (GAP-18).
    BodyEnrich,
    // Presumably driven by WEIGHT_CALIBRATE_PROMPT / WEIGHT_CALIBRATE_SCHEMA.
    WeightCalibrate,
    // Presumably driven by RELATION_RECLASSIFY_PROMPT / RELATION_RECLASSIFY_SCHEMA.
    RelationReclassify,
    // Presumably driven by ENTITY_CONNECT_PROMPT / ENTITY_CONNECT_SCHEMA.
    EntityConnect,
    // Presumably driven by ENTITY_TYPE_VALIDATE_PROMPT / ENTITY_TYPE_VALIDATE_SCHEMA.
    EntityTypeValidate,
    // Presumably driven by DESCRIPTION_ENRICH_PROMPT / DESCRIPTION_ENRICH_SCHEMA.
    DescriptionEnrich,
    // No dedicated prompt constant for this pass is visible in this chunk.
    CrossDomainBridges,
    // Presumably driven by DOMAIN_CLASSIFY_PROMPT / DOMAIN_CLASSIFY_SCHEMA.
    DomainClassify,
    // Presumably driven by GRAPH_AUDIT_PROMPT / GRAPH_AUDIT_SCHEMA.
    GraphAudit,
    // Presumably driven by DEEP_RESEARCH_SYNTH_PROMPT / DEEP_RESEARCH_SYNTH_SCHEMA.
    DeepResearchSynth,
    // Presumably driven by BODY_EXTRACT_PROMPT / BODY_EXTRACT_SCHEMA.
    BodyExtract,
}
324
// LLM provider CLI used for enrichment calls (`--mode`, default "claude-code").
// `run` locates the binary with `find_claude_binary` for ClaudeCode and
// `find_codex_binary` for Codex. It also picks the matching
// `--claude-*` / `--codex-*` timeout and model options.
// `//` rather than `///` so clap's generated help text is unchanged.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
pub enum EnrichMode {
    // Claude Code CLI; its version is validated before use.
    ClaudeCode,
    // Codex CLI.
    Codex,
}
333
334impl std::fmt::Display for EnrichMode {
335 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
336 match self {
337 EnrichMode::ClaudeCode => write!(f, "claude-code"),
338 EnrichMode::Codex => write!(f, "codex"),
339 }
340 }
341}
342
343#[derive(clap::Args)]
345#[command(
346 about = "Enrich graph memories and entities using an LLM provider",
347 after_long_help = "EXAMPLES:\n \
348 # Add missing entity bindings to all unbound memories\n \
349 sqlite-graphrag enrich --operation memory-bindings --mode claude-code\n\n \
350 # Fill entity descriptions (dry-run preview, no tokens spent)\n \
351 sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n \
352 # Expand short memory bodies (GAP-18)\n \
353 sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n \
354 # Resume an interrupted body-enrich run\n \
355 sqlite-graphrag enrich --operation body-enrich --resume --json\n\n \
356 # Retry only failed items from a previous run\n \
357 sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n\
358 EXIT CODES:\n \
359 0 success\n \
360 1 validation error (bad args, binary not found)\n \
361 14 I/O error"
362)]
363pub struct EnrichArgs {
364 #[arg(long, short = 'o', value_enum, value_name = "OPERATION")]
366 pub operation: EnrichOperation,
367
368 #[arg(long, value_enum, default_value = "claude-code")]
370 pub mode: EnrichMode,
371
372 #[arg(long, value_name = "N")]
374 pub limit: Option<usize>,
375
376 #[arg(long)]
378 pub dry_run: bool,
379
380 #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
382 pub namespace: Option<String>,
383
384 #[arg(long, value_name = "PATH")]
387 pub claude_binary: Option<PathBuf>,
388
389 #[arg(long, value_name = "MODEL")]
391 pub claude_model: Option<String>,
392
393 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
395 pub claude_timeout: u64,
396
397 #[arg(long, value_name = "PATH")]
400 pub codex_binary: Option<PathBuf>,
401
402 #[arg(long, value_name = "MODEL")]
404 pub codex_model: Option<String>,
405
406 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
408 pub codex_timeout: u64,
409
410 #[arg(long, value_name = "USD")]
413 pub max_cost_usd: Option<f64>,
414
415 #[arg(long)]
418 pub resume: bool,
419
420 #[arg(long)]
422 pub retry_failed: bool,
423
424 #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
427 pub min_output_chars: usize,
428
429 #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
431 pub max_output_chars: usize,
432
433 #[arg(long, default_value_t = true)]
435 pub preserve_check: bool,
436
437 #[arg(long, value_name = "PATH")]
439 pub prompt_template: Option<PathBuf>,
440
441 #[arg(long, default_value_t = 1, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=32))]
445 pub llm_parallelism: u32,
446
447 #[arg(long)]
450 pub json: bool,
451
452 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
454 pub db: Option<String>,
455}
456
/// Progress event emitted through `emit_json` when `run` enters a phase
/// (e.g. "validate" after locating the provider binary, "scan" after scanning).
/// Fields left as `None` are omitted from the serialized object.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Phase name.
    phase: &'a str,
    /// Provider binary path (validate phase); `None` if absent or not valid UTF-8.
    #[serde(skip_serializing_if = "Option::is_none")]
    binary_path: Option<&'a str>,
    /// Provider version string, when it was validated (Claude Code only).
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Number of items found by the scan.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_total: Option<usize>,
    /// Number of items still to process.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_pending: Option<usize>,
    /// Requested LLM worker count.
    #[serde(skip_serializing_if = "Option::is_none")]
    llm_parallelism: Option<u32>,
}
480
/// Per-item progress event emitted through `emit_json`.
/// Optional fields are omitted when `None`.
#[derive(Debug, Serialize)]
struct ItemEvent<'a> {
    /// Queue key of the item.
    item: &'a str,
    /// Item status; "preview" in dry-run mode (other values are set outside this chunk).
    status: &'a str,
    /// Memory row id, for memory-level operations.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Entity row id, for entity-level operations.
    #[serde(skip_serializing_if = "Option::is_none")]
    entity_id: Option<i64>,
    /// Number of entities persisted for the item.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Number of relationships persisted for the item.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Body length before enrichment.
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_before: Option<usize>,
    /// Body length after enrichment.
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_after: Option<usize>,
    /// Provider-reported cost of the call in USD.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    /// Wall-clock time spent on the item, in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Error message when the item failed.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Zero-based position of the item in the scan result.
    index: usize,
    /// Total number of scanned items.
    total: usize,
}
507
/// Final summary line emitted through `emit_json` at the end of a run.
#[derive(Debug, Serialize)]
struct EnrichSummary {
    /// Always `true`; marks this JSON object as the summary line.
    summary: bool,
    /// `Debug` spelling of the operation (e.g. "MemoryBindings").
    operation: String,
    /// Number of scanned items.
    items_total: usize,
    /// Items processed successfully.
    completed: usize,
    /// Items that failed.
    failed: usize,
    /// Items skipped.
    skipped: usize,
    /// Accumulated provider cost in USD.
    cost_usd: f64,
    /// Milliseconds since `run` started.
    elapsed_ms: u64,
}
519
520use crate::output::emit_json_line as emit_json;
521
522fn open_queue_db(path: &str) -> Result<Connection, AppError> {
537 let conn = Connection::open(path)?;
538 conn.pragma_update(None, "journal_mode", "wal")?;
539 conn.execute_batch(
540 "CREATE TABLE IF NOT EXISTS queue (
541 id INTEGER PRIMARY KEY AUTOINCREMENT,
542 item_key TEXT NOT NULL UNIQUE,
543 item_type TEXT NOT NULL DEFAULT 'memory',
544 status TEXT NOT NULL DEFAULT 'pending',
545 memory_id INTEGER,
546 entity_id INTEGER,
547 entities INTEGER DEFAULT 0,
548 rels INTEGER DEFAULT 0,
549 error TEXT,
550 cost_usd REAL DEFAULT 0.0,
551 attempt INTEGER DEFAULT 0,
552 elapsed_ms INTEGER,
553 created_at TEXT DEFAULT (datetime('now')),
554 done_at TEXT
555 );
556 CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
557 )?;
558 Ok(conn)
559}
560
561fn call_claude(
569 binary: &Path,
570 prompt: &str,
571 json_schema: &str,
572 input_text: &str,
573 model: Option<&str>,
574 timeout_secs: u64,
575) -> Result<(serde_json::Value, f64, bool), AppError> {
576 let result = crate::commands::claude_runner::run_claude(
577 binary,
578 prompt,
579 json_schema,
580 input_text,
581 model,
582 timeout_secs,
583 7,
584 )?;
585 Ok((result.value, result.cost_usd, result.is_oauth))
586}
587
588fn scan_unbound_memories(
596 conn: &Connection,
597 namespace: &str,
598 limit: Option<usize>,
599) -> Result<Vec<(i64, String, String)>, AppError> {
600 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
601 let sql = format!(
602 "SELECT m.id, m.name, m.body
603 FROM memories m
604 WHERE m.namespace = ?1
605 AND m.deleted_at IS NULL
606 AND NOT EXISTS (
607 SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
608 )
609 ORDER BY m.id
610 {limit_clause}"
611 );
612 let mut stmt = conn.prepare(&sql)?;
613 let rows = stmt
614 .query_map(rusqlite::params![namespace], |r| {
615 Ok((
616 r.get::<_, i64>(0)?,
617 r.get::<_, String>(1)?,
618 r.get::<_, String>(2)?,
619 ))
620 })?
621 .collect::<Result<Vec<_>, _>>()?;
622 Ok(rows)
623}
624
625fn scan_entities_without_description(
629 conn: &Connection,
630 namespace: &str,
631 limit: Option<usize>,
632) -> Result<Vec<(i64, String, String)>, AppError> {
633 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
634 let sql = format!(
635 "SELECT id, name, type
636 FROM entities
637 WHERE namespace = ?1
638 AND (description IS NULL OR description = '')
639 ORDER BY id
640 {limit_clause}"
641 );
642 let mut stmt = conn.prepare(&sql)?;
643 let rows = stmt
644 .query_map(rusqlite::params![namespace], |r| {
645 Ok((
646 r.get::<_, i64>(0)?,
647 r.get::<_, String>(1)?,
648 r.get::<_, String>(2)?,
649 ))
650 })?
651 .collect::<Result<Vec<_>, _>>()?;
652 Ok(rows)
653}
654
655fn scan_short_body_memories(
659 conn: &Connection,
660 namespace: &str,
661 min_chars: usize,
662 limit: Option<usize>,
663) -> Result<Vec<(i64, String, String)>, AppError> {
664 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
665 let sql = format!(
666 "SELECT m.id, m.name, m.body
667 FROM memories m
668 WHERE m.namespace = ?1
669 AND m.deleted_at IS NULL
670 AND LENGTH(COALESCE(m.body,'')) < ?2
671 ORDER BY m.id
672 {limit_clause}"
673 );
674 let mut stmt = conn.prepare(&sql)?;
675 let rows = stmt
676 .query_map(rusqlite::params![namespace, min_chars as i64], |r| {
677 Ok((
678 r.get::<_, i64>(0)?,
679 r.get::<_, String>(1)?,
680 r.get::<_, String>(2)?,
681 ))
682 })?
683 .collect::<Result<Vec<_>, _>>()?;
684 Ok(rows)
685}
686
687#[allow(clippy::type_complexity)]
689fn scan_weight_candidates(
690 conn: &Connection,
691 namespace: &str,
692 limit: Option<usize>,
693) -> Result<Vec<(i64, String, String, String, f64)>, AppError> {
694 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
695 let sql = format!(
696 "SELECT r.id, e1.name, e2.name, r.relation, r.weight \
697 FROM relationships r \
698 JOIN entities e1 ON e1.id = r.source_id \
699 JOIN entities e2 ON e2.id = r.target_id \
700 WHERE r.weight >= 0.7 AND e1.namespace = ?1 \
701 ORDER BY r.weight DESC {limit_clause}"
702 );
703 let mut stmt = conn.prepare(&sql)?;
704 let rows = stmt
705 .query_map(rusqlite::params![namespace], |r| {
706 Ok((
707 r.get::<_, i64>(0)?,
708 r.get::<_, String>(1)?,
709 r.get::<_, String>(2)?,
710 r.get::<_, String>(3)?,
711 r.get::<_, f64>(4)?,
712 ))
713 })?
714 .collect::<Result<Vec<_>, _>>()?;
715 Ok(rows)
716}
717
718fn scan_generic_relations(
720 conn: &Connection,
721 namespace: &str,
722 limit: Option<usize>,
723) -> Result<Vec<(i64, String, String, String)>, AppError> {
724 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
725 let sql = format!(
726 "SELECT r.id, e1.name, e2.name, r.relation \
727 FROM relationships r \
728 JOIN entities e1 ON e1.id = r.source_id \
729 JOIN entities e2 ON e2.id = r.target_id \
730 WHERE r.relation = 'applies_to' AND e1.namespace = ?1 \
731 ORDER BY r.id {limit_clause}"
732 );
733 let mut stmt = conn.prepare(&sql)?;
734 let rows = stmt
735 .query_map(rusqlite::params![namespace], |r| {
736 Ok((
737 r.get::<_, i64>(0)?,
738 r.get::<_, String>(1)?,
739 r.get::<_, String>(2)?,
740 r.get::<_, String>(3)?,
741 ))
742 })?
743 .collect::<Result<Vec<_>, _>>()?;
744 Ok(rows)
745}
746
747fn persist_memory_bindings(
756 conn: &Connection,
757 namespace: &str,
758 memory_id: i64,
759 entities_json: &serde_json::Value,
760 rels_json: &serde_json::Value,
761) -> Result<(usize, usize), AppError> {
762 #[derive(Deserialize)]
763 struct EntityItem {
764 name: String,
765 entity_type: String,
766 }
767 #[derive(Deserialize)]
768 struct RelItem {
769 source: String,
770 target: String,
771 relation: String,
772 strength: f64,
773 }
774
775 let extracted_entities: Vec<EntityItem> = serde_json::from_value(entities_json.clone())
776 .map_err(|e| AppError::Validation(format!("failed to parse entities array: {e}")))?;
777
778 let extracted_rels: Vec<RelItem> = serde_json::from_value(rels_json.clone())
779 .map_err(|e| AppError::Validation(format!("failed to parse relationships array: {e}")))?;
780
781 let mut ent_count = 0usize;
782 let mut rel_count = 0usize;
783
784 for item in &extracted_entities {
785 let entity_type = match item.entity_type.parse::<EntityType>() {
786 Ok(et) => et,
787 Err(_) => {
788 tracing::warn!(
789 target: "enrich",
790 entity = %item.name,
791 entity_type = %item.entity_type,
792 "entity type not recognized, skipping"
793 );
794 continue;
795 }
796 };
797 match entities::upsert_entity(
798 conn,
799 namespace,
800 &NewEntity {
801 name: item.name.clone(),
802 entity_type,
803 description: None,
804 },
805 ) {
806 Ok(eid) => {
807 let _ = entities::link_memory_entity(conn, memory_id, eid);
808 ent_count += 1;
809 }
810 Err(e) => {
811 tracing::warn!(
812 target: "enrich",
813 entity = %item.name,
814 error = %e,
815 "entity upsert skipped"
816 );
817 }
818 }
819 }
820
821 for rel in &extracted_rels {
822 let normalized = crate::parsers::normalize_relation(&rel.relation);
823 crate::parsers::warn_if_non_canonical(&normalized);
824
825 let src_name = crate::parsers::normalize_entity_name(&rel.source);
828 let tgt_name = crate::parsers::normalize_entity_name(&rel.target);
829 let src_id = entities::find_entity_id(conn, namespace, &src_name);
830 let tgt_id = entities::find_entity_id(conn, namespace, &tgt_name);
831 if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
832 let new_rel = NewRelationship {
833 source: rel.source.clone(),
834 target: rel.target.clone(),
835 relation: normalized,
836 strength: rel.strength,
837 description: None,
838 };
839 if entities::upsert_relationship(conn, namespace, sid, tid, &new_rel).is_ok() {
840 rel_count += 1;
841 }
842 }
843 }
844
845 Ok((ent_count, rel_count))
846}
847
848fn persist_entity_description(
850 conn: &Connection,
851 entity_id: i64,
852 description: &str,
853) -> Result<(), AppError> {
854 conn.execute(
855 "UPDATE entities SET description = ?1, updated_at = unixepoch() WHERE id = ?2",
856 rusqlite::params![description, entity_id],
857 )?;
858 Ok(())
859}
860
861fn persist_enriched_body(
866 conn: &Connection,
867 namespace: &str,
868 memory_id: i64,
869 memory_name: &str,
870 new_body: &str,
871 paths: &crate::paths::AppPaths,
872) -> Result<(), AppError> {
873 let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
875 "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
876 rusqlite::params![memory_id],
877 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
878 )?;
879
880 let memory_type: String = conn.query_row(
881 "SELECT type FROM memories WHERE id=?1",
882 rusqlite::params![memory_id],
883 |r| r.get(0),
884 )?;
885
886 let description: String = conn.query_row(
887 "SELECT COALESCE(description,'') FROM memories WHERE id=?1",
888 rusqlite::params![memory_id],
889 |r| r.get(0),
890 )?;
891
892 let body_hash = blake3::hash(new_body.as_bytes()).to_hex().to_string();
893
894 let new_memory = memories::NewMemory {
895 namespace: namespace.to_string(),
896 name: memory_name.to_string(),
897 memory_type: memory_type.clone(),
898 description: description.clone(),
899 body: new_body.to_string(),
900 body_hash,
901 session_id: None,
902 source: "enrich".to_string(),
903 metadata: serde_json::Value::Object(serde_json::Map::new()),
904 };
905
906 memories::update(conn, memory_id, &new_memory, None)?;
907 memories::sync_fts_after_update(
908 conn,
909 memory_id,
910 &old_name,
911 &old_desc,
912 &old_body,
913 &new_memory.name,
914 &new_memory.description,
915 &new_memory.body,
916 )?;
917
918 let snippet: String = new_body.chars().take(200).collect();
920 let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
921 let chunks_info = crate::chunking::split_into_chunks_hierarchical(new_body, tokenizer);
922 let embedding_result = if chunks_info.len() <= 1 {
923 crate::daemon::embed_passage_or_local(&paths.models, new_body)
924 } else {
925 let mut chunk_embeddings: Vec<Vec<f32>> = Vec::with_capacity(chunks_info.len());
926 let mut ok = true;
927 for chunk in &chunks_info {
928 let text = crate::chunking::chunk_text(new_body, chunk);
929 match crate::daemon::embed_passage_or_local(&paths.models, text) {
930 Ok(emb) => chunk_embeddings.push(emb),
931 Err(e) => {
932 tracing::warn!(target: "enrich", error = %e, "chunk embedding failed");
933 ok = false;
934 break;
935 }
936 }
937 }
938 if ok {
939 Ok(crate::chunking::aggregate_embeddings(&chunk_embeddings))
940 } else {
941 crate::daemon::embed_passage_or_local(&paths.models, new_body)
942 }
943 };
944
945 if let Ok(embedding) = embedding_result {
946 if let Err(e) = memories::upsert_vec(
947 conn,
948 memory_id,
949 namespace,
950 &memory_type,
951 &embedding,
952 memory_name,
953 &snippet,
954 ) {
955 tracing::warn!(target: "enrich", memory = %memory_name, error = %e, "vec upsert failed after body-enrich");
956 }
957 }
958
959 Ok(())
960}
961
962pub fn run(args: &EnrichArgs) -> Result<(), AppError> {
968 let started = Instant::now();
976
977 let paths = AppPaths::resolve(args.db.as_deref())?;
978 ensure_db_ready(&paths)?;
979 let conn = open_rw(&paths.db)?;
980 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
981
982 let _singleton =
986 crate::lock::acquire_job_singleton(crate::lock::JobType::Enrich, &namespace, None)?;
987
988 let provider_binary = match args.mode {
990 EnrichMode::ClaudeCode => {
991 let bin = find_claude_binary(args.claude_binary.as_deref())?;
992 let version = super::claude_runner::validate_claude_version(&bin)?;
993 tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
994 emit_json(&PhaseEvent {
995 phase: "validate",
996 binary_path: bin.to_str(),
997 version: Some(&version),
998 items_total: None,
999 items_pending: None,
1000 llm_parallelism: None,
1001 });
1002 bin
1003 }
1004 EnrichMode::Codex => {
1005 let bin = find_codex_binary(args.codex_binary.as_deref())?;
1007 emit_json(&PhaseEvent {
1008 phase: "validate",
1009 binary_path: bin.to_str(),
1010 version: None,
1011 items_total: None,
1012 items_pending: None,
1013 llm_parallelism: None,
1014 });
1015 bin
1016 }
1017 };
1018
1019 let scan_result = scan_operation(&conn, &namespace, args)?;
1021 let total = scan_result.len();
1022
1023 emit_json(&PhaseEvent {
1024 phase: "scan",
1025 binary_path: None,
1026 version: None,
1027 items_total: Some(total),
1028 items_pending: Some(total),
1029 llm_parallelism: Some(args.llm_parallelism),
1030 });
1031
1032 if args.dry_run {
1034 for (idx, key) in scan_result.iter().enumerate() {
1035 emit_json(&ItemEvent {
1036 item: key,
1037 status: "preview",
1038 memory_id: None,
1039 entity_id: None,
1040 entities: None,
1041 rels: None,
1042 chars_before: None,
1043 chars_after: None,
1044 cost_usd: None,
1045 elapsed_ms: None,
1046 error: None,
1047 index: idx,
1048 total,
1049 });
1050 }
1051 emit_json(&EnrichSummary {
1052 summary: true,
1053 operation: format!("{:?}", args.operation),
1054 items_total: total,
1055 completed: 0,
1056 failed: 0,
1057 skipped: 0,
1058 cost_usd: 0.0,
1059 elapsed_ms: started.elapsed().as_millis() as u64,
1060 });
1061 return Ok(());
1062 }
1063
1064 let queue_conn = open_queue_db(DEFAULT_QUEUE_DB)?;
1068
1069 if args.resume {
1070 let reset = queue_conn
1071 .execute(
1072 "UPDATE queue SET status='pending' WHERE status='processing'",
1073 [],
1074 )
1075 .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
1076 if reset > 0 {
1077 tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
1078 }
1079 }
1080
1081 if args.retry_failed {
1082 let count = queue_conn
1083 .execute(
1084 "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
1085 [],
1086 )
1087 .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
1088 tracing::info!(target: "enrich", count, "retrying failed items");
1089 }
1090
1091 if !args.resume && !args.retry_failed {
1092 queue_conn
1093 .execute("DELETE FROM queue", [])
1094 .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
1095 }
1096
1097 for (idx, key) in scan_result.iter().enumerate() {
1099 let item_type = match args.operation {
1100 EnrichOperation::EntityDescriptions => "entity",
1101 _ => "memory",
1102 };
1103 if let Err(e) = queue_conn.execute(
1104 "INSERT OR IGNORE INTO queue (item_key, item_type, status) VALUES (?1, ?2, 'pending')",
1105 rusqlite::params![key, item_type],
1106 ) {
1107 tracing::warn!(target: "enrich", error = %e, "queue insert failed");
1108 }
1109 let _ = idx; }
1111
1112 let parallelism = args.llm_parallelism.clamp(1, 32) as usize;
1115 if parallelism > 1 {
1116 tracing::info!(
1117 target: "enrich",
1118 llm_parallelism = parallelism,
1119 "parallel LLM processing with bounded thread pool"
1120 );
1121 }
1122 if parallelism > 4 {
1127 tracing::warn!(
1128 target: "enrich",
1129 llm_parallelism = parallelism,
1130 recommended_max = 4,
1131 "llm_parallelism above 4 multiplies subprocess fan-out; \
1132 consider combining with SQLITE_GRAPHRAG_CLAUDE_EMPTY_CONFIG_DIR to \
1133 cut MCP children (G28-A)"
1134 );
1135 }
1136
1137 let mut completed = 0usize;
1138 let mut failed = 0usize;
1139 let mut skipped = 0usize;
1140 let mut cost_total = 0.0f64;
1141 let mut oauth_detected = false;
1142 let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1143 let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1144 let enrich_started = std::time::Instant::now();
1145
1146 let provider_timeout = match args.mode {
1147 EnrichMode::ClaudeCode => args.claude_timeout,
1148 EnrichMode::Codex => args.codex_timeout,
1149 };
1150
1151 let provider_model: Option<&str> = match args.mode {
1152 EnrichMode::ClaudeCode => args.claude_model.as_deref(),
1153 EnrichMode::Codex => args.codex_model.as_deref(),
1154 };
1155
1156 if parallelism > 1 {
1160 let stdout_mu = parking_lot::Mutex::new(());
1161 let budget = args.max_cost_usd;
1162 let operation = args.operation.clone();
1163 let mode = args.mode.clone();
1164 let min_oc = args.min_output_chars;
1165 let max_oc = args.max_output_chars;
1166 let prompt_tpl = args.prompt_template.as_deref().map(|p| p.to_path_buf());
1167
1168 struct WorkerResult {
1169 completed: usize,
1170 failed: usize,
1171 skipped: usize,
1172 cost: f64,
1173 oauth: bool,
1174 }
1175
1176 let results: Vec<WorkerResult> = std::thread::scope(|s| {
1177 let handles: Vec<_> = (0..parallelism)
1178 .map(|worker_id| {
1179 let stdout_mu = &stdout_mu;
1180 let paths = &paths;
1181 let namespace = &namespace;
1182 let provider_binary = &provider_binary;
1183 let operation = &operation;
1184 let mode = &mode;
1185 let prompt_tpl = prompt_tpl.as_deref();
1186 s.spawn(move || {
1187 let w_conn = match open_rw(&paths.db) {
1188 Ok(c) => c,
1189 Err(e) => {
1190 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open DB");
1191 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1192 }
1193 };
1194 let w_queue = match open_queue_db(DEFAULT_QUEUE_DB) {
1195 Ok(c) => c,
1196 Err(e) => {
1197 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open queue DB");
1198 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1199 }
1200 };
1201 let mut w_completed = 0usize;
1202 let mut w_failed = 0usize;
1203 let mut w_skipped = 0usize;
1204 let mut w_cost = 0.0f64;
1205 let mut w_oauth = false;
1206 let mut w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1207 let w_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1208
1209 loop {
1210 if crate::shutdown_requested() {
1211 tracing::info!(target: "enrich", "shutdown requested, worker stopping");
1212 break;
1213 }
1214 if let Some(b) = budget {
1215 if !w_oauth && w_cost >= b {
1216 break;
1217 }
1218 }
1219 let pending: Option<(i64, String, String)> = w_queue
1220 .query_row(
1221 "UPDATE queue SET status='processing', attempt=attempt+1 \
1222 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
1223 RETURNING id, item_key, item_type",
1224 [],
1225 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1226 )
1227 .ok();
1228 let (queue_id, item_key, _item_type) = match pending {
1229 Some(p) => p,
1230 None => break,
1231 };
1232 let item_started = Instant::now();
1233 let current_index = w_completed + w_failed + w_skipped;
1234
1235 let call_result = match operation {
1236 EnrichOperation::MemoryBindings => call_memory_bindings(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1237 EnrichOperation::EntityDescriptions => call_entity_description(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1238 EnrichOperation::BodyEnrich => call_body_enrich(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode, min_oc, max_oc, prompt_tpl, paths),
1239 EnrichOperation::WeightCalibrate => call_weight_calibrate(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1240 EnrichOperation::RelationReclassify => call_relation_reclassify(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1241 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => call_entity_connect(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1242 EnrichOperation::EntityTypeValidate => call_entity_type_validate(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1243 EnrichOperation::DescriptionEnrich => call_description_enrich(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1244 EnrichOperation::DomainClassify => call_domain_classify(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1245 EnrichOperation::GraphAudit => call_graph_audit(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1246 EnrichOperation::DeepResearchSynth => call_deep_research_synth(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1247 EnrichOperation::BodyExtract => call_body_extract(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1248 };
1249
1250 match call_result {
1251 Ok(EnrichItemResult::Done { cost, is_oauth, memory_id, entity_id, entities, rels, chars_before, chars_after }) => {
1252 if is_oauth { w_oauth = true; }
1253 w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1254 let _ = w_queue.execute(
1255 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1256 rusqlite::params![memory_id, entity_id, entities as i64, rels as i64, cost, item_started.elapsed().as_millis() as i64, queue_id],
1257 );
1258 w_completed += 1;
1259 if !is_oauth { w_cost += cost; }
1260 let _guard = stdout_mu.lock();
1261 emit_json(&ItemEvent { item: &item_key, status: "done", memory_id, entity_id, entities: Some(entities), rels: Some(rels), chars_before, chars_after, cost_usd: if is_oauth { None } else { Some(cost) }, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
1262 }
1263 Ok(EnrichItemResult::Skipped { reason }) => {
1264 w_skipped += 1;
1265 let _ = w_queue.execute("UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![reason, queue_id]);
1266 let _guard = stdout_mu.lock();
1267 emit_json(&ItemEvent { item: &item_key, status: "skipped", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
1268 }
1269 Err(e) => {
1270 let err_str = format!("{e}");
1271 if matches!(e, AppError::RateLimited { .. }) {
1272 if crate::retry::is_kill_switch_active() {
1273 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
1274 } else if std::time::Instant::now() >= w_deadline {
1275 tracing::error!(target: "enrich", "rate-limit retry deadline (1h) exhausted in worker");
1276 } else {
1277 let half = w_backoff / 2;
1278 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
1279 let actual_wait = half + jitter;
1280 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited in worker, backing off");
1281 let _ = w_queue.execute("UPDATE queue SET status='pending' WHERE id=?1", rusqlite::params![queue_id]);
1282 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
1283 w_backoff = (w_backoff * 2).min(900);
1284 continue;
1285 }
1286 }
1287 w_failed += 1;
1288 let _ = w_queue.execute("UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![err_str, queue_id]);
1289 let _guard = stdout_mu.lock();
1290 emit_json(&ItemEvent { item: &item_key, status: "failed", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: Some(err_str), index: current_index, total });
1291 }
1292 }
1293 }
1294 WorkerResult { completed: w_completed, failed: w_failed, skipped: w_skipped, cost: w_cost, oauth: w_oauth }
1295 })
1296 })
1297 .collect();
1298 handles
1299 .into_iter()
1300 .map(|h| {
1301 h.join().unwrap_or(WorkerResult {
1302 completed: 0,
1303 failed: 0,
1304 skipped: 0,
1305 cost: 0.0,
1306 oauth: false,
1307 })
1308 })
1309 .collect()
1310 });
1311
1312 for r in &results {
1313 completed += r.completed;
1314 failed += r.failed;
1315 skipped += r.skipped;
1316 cost_total += r.cost;
1317 oauth_detected |= r.oauth;
1318 }
1319 } else {
1320 loop {
1322 if crate::shutdown_requested() {
1323 tracing::info!(target: "enrich", "shutdown requested, stopping enrichment");
1324 break;
1325 }
1326
1327 if let Some(budget) = args.max_cost_usd {
1329 if !oauth_detected && cost_total >= budget {
1330 tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
1331 break;
1332 }
1333 }
1334
1335 let pending: Option<(i64, String, String)> = queue_conn
1337 .query_row(
1338 "UPDATE queue SET status='processing', attempt=attempt+1 \
1339 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
1340 RETURNING id, item_key, item_type",
1341 [],
1342 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1343 )
1344 .ok();
1345
1346 let (queue_id, item_key, item_type) = match pending {
1347 Some(p) => p,
1348 None => break,
1349 };
1350
1351 let item_started = Instant::now();
1352 let current_index = completed + failed + skipped;
1353
1354 let call_result = match args.operation {
1355 EnrichOperation::MemoryBindings => call_memory_bindings(
1356 &conn,
1357 &namespace,
1358 &item_key,
1359 &provider_binary,
1360 provider_model,
1361 provider_timeout,
1362 &args.mode,
1363 ),
1364 EnrichOperation::EntityDescriptions => call_entity_description(
1365 &conn,
1366 &namespace,
1367 &item_key,
1368 &provider_binary,
1369 provider_model,
1370 provider_timeout,
1371 &args.mode,
1372 ),
1373 EnrichOperation::BodyEnrich => call_body_enrich(
1374 &conn,
1375 &namespace,
1376 &item_key,
1377 &provider_binary,
1378 provider_model,
1379 provider_timeout,
1380 &args.mode,
1381 args.min_output_chars,
1382 args.max_output_chars,
1383 args.prompt_template.as_deref(),
1384 &paths,
1385 ),
1386 EnrichOperation::WeightCalibrate => call_weight_calibrate(
1387 &conn,
1388 &namespace,
1389 &item_key,
1390 &provider_binary,
1391 provider_model,
1392 provider_timeout,
1393 &args.mode,
1394 ),
1395 EnrichOperation::RelationReclassify => call_relation_reclassify(
1396 &conn,
1397 &namespace,
1398 &item_key,
1399 &provider_binary,
1400 provider_model,
1401 provider_timeout,
1402 &args.mode,
1403 ),
1404 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
1405 call_entity_connect(
1406 &conn,
1407 &namespace,
1408 &item_key,
1409 &provider_binary,
1410 provider_model,
1411 provider_timeout,
1412 &args.mode,
1413 )
1414 }
1415 EnrichOperation::EntityTypeValidate => call_entity_type_validate(
1416 &conn,
1417 &namespace,
1418 &item_key,
1419 &provider_binary,
1420 provider_model,
1421 provider_timeout,
1422 &args.mode,
1423 ),
1424 EnrichOperation::DescriptionEnrich => call_description_enrich(
1425 &conn,
1426 &namespace,
1427 &item_key,
1428 &provider_binary,
1429 provider_model,
1430 provider_timeout,
1431 &args.mode,
1432 ),
1433 EnrichOperation::DomainClassify => call_domain_classify(
1434 &conn,
1435 &namespace,
1436 &item_key,
1437 &provider_binary,
1438 provider_model,
1439 provider_timeout,
1440 &args.mode,
1441 ),
1442 EnrichOperation::GraphAudit => call_graph_audit(
1443 &conn,
1444 &namespace,
1445 &item_key,
1446 &provider_binary,
1447 provider_model,
1448 provider_timeout,
1449 &args.mode,
1450 ),
1451 EnrichOperation::DeepResearchSynth => call_deep_research_synth(
1452 &conn,
1453 &namespace,
1454 &item_key,
1455 &provider_binary,
1456 provider_model,
1457 provider_timeout,
1458 &args.mode,
1459 ),
1460 EnrichOperation::BodyExtract => call_body_extract(
1461 &conn,
1462 &namespace,
1463 &item_key,
1464 &provider_binary,
1465 provider_model,
1466 provider_timeout,
1467 &args.mode,
1468 ),
1469 };
1470
1471 match call_result {
1472 Ok(EnrichItemResult::Done {
1473 memory_id,
1474 entity_id,
1475 entities,
1476 rels,
1477 chars_before,
1478 chars_after,
1479 cost,
1480 is_oauth,
1481 }) => {
1482 if is_oauth && !oauth_detected {
1483 oauth_detected = true;
1484 tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
1485 }
1486 backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1487
1488 let persist_err: Option<String> = match args.operation {
1490 EnrichOperation::MemoryBindings => {
1491 None
1493 }
1494 EnrichOperation::EntityDescriptions => {
1495 None
1497 }
1498 EnrichOperation::BodyEnrich => {
1499 None
1501 }
1502 _ => {
1503 None
1505 }
1506 };
1507
1508 if let Err(e) = queue_conn.execute(
1509 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1510 rusqlite::params![
1511 memory_id,
1512 entity_id,
1513 entities as i64,
1514 rels as i64,
1515 cost,
1516 item_started.elapsed().as_millis() as i64,
1517 queue_id
1518 ],
1519 ) {
1520 tracing::warn!(target: "enrich", error = %e, "queue done update failed");
1521 }
1522
1523 if persist_err.is_none() {
1524 completed += 1;
1525 if !is_oauth {
1526 cost_total += cost;
1527 }
1528 emit_json(&ItemEvent {
1529 item: &item_key,
1530 status: "done",
1531 memory_id,
1532 entity_id,
1533 entities: Some(entities),
1534 rels: Some(rels),
1535 chars_before,
1536 chars_after,
1537 cost_usd: if is_oauth { None } else { Some(cost) },
1538 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1539 error: None,
1540 index: current_index,
1541 total,
1542 });
1543 } else {
1544 failed += 1;
1545 emit_json(&ItemEvent {
1546 item: &item_key,
1547 status: "failed",
1548 memory_id: None,
1549 entity_id: None,
1550 entities: None,
1551 rels: None,
1552 chars_before: None,
1553 chars_after: None,
1554 cost_usd: None,
1555 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1556 error: persist_err,
1557 index: current_index,
1558 total,
1559 });
1560 }
1561 }
1562 Ok(EnrichItemResult::Skipped { reason }) => {
1563 skipped += 1;
1564 if let Err(e) = queue_conn.execute(
1565 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
1566 rusqlite::params![reason, queue_id],
1567 ) {
1568 tracing::warn!(target: "enrich", error = %e, "queue skipped update failed");
1569 }
1570 emit_json(&ItemEvent {
1571 item: &item_key,
1572 status: "skipped",
1573 memory_id: None,
1574 entity_id: None,
1575 entities: None,
1576 rels: None,
1577 chars_before: None,
1578 chars_after: None,
1579 cost_usd: None,
1580 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1581 error: None,
1582 index: current_index,
1583 total,
1584 });
1585 }
1586 Err(e) => {
1587 let err_str = format!("{e}");
1588 if matches!(e, AppError::RateLimited { .. }) {
1589 if crate::retry::is_kill_switch_active() {
1590 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
1591 } else if std::time::Instant::now() >= rate_limit_deadline {
1592 tracing::error!(target: "enrich", total_elapsed_secs = enrich_started.elapsed().as_secs(), "rate-limit retry deadline (1h) exhausted");
1593 } else {
1594 let half = backoff_secs / 2;
1595 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
1596 let actual_wait = half + jitter;
1597 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
1598 if let Err(qe) = queue_conn.execute(
1599 "UPDATE queue SET status='pending' WHERE id=?1",
1600 rusqlite::params![queue_id],
1601 ) {
1602 tracing::warn!(target: "enrich", error = %qe, "queue pending update failed");
1603 }
1604 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
1605 backoff_secs = (backoff_secs * 2).min(900);
1606 continue;
1607 }
1608 }
1609
1610 failed += 1;
1611 if let Err(qe) = queue_conn.execute(
1612 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
1613 rusqlite::params![err_str, queue_id],
1614 ) {
1615 tracing::warn!(target: "enrich", error = %qe, "queue failed update failed");
1616 }
1617 emit_json(&ItemEvent {
1618 item: &item_key,
1619 status: "failed",
1620 memory_id: None,
1621 entity_id: None,
1622 entities: None,
1623 rels: None,
1624 chars_before: None,
1625 chars_after: None,
1626 cost_usd: None,
1627 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1628 error: Some(err_str),
1629 index: current_index,
1630 total,
1631 });
1632 }
1633 }
1634
1635 let _ = item_type; }
1637 } let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1640 let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1641
1642 emit_json(&EnrichSummary {
1643 summary: true,
1644 operation: format!("{:?}", args.operation),
1645 items_total: total,
1646 completed,
1647 failed,
1648 skipped,
1649 cost_usd: cost_total,
1650 elapsed_ms: started.elapsed().as_millis() as u64,
1651 });
1652
1653 if failed == 0 {
1654 let _ = std::fs::remove_file(DEFAULT_QUEUE_DB);
1655 }
1656
1657 Ok(())
1658}
1659
/// Outcome of enriching a single queue item.
///
/// Failures are reported through `Result::Err` by the `call_*` functions; this enum only
/// distinguishes items that produced work from items that were deliberately left alone.
enum EnrichItemResult {
    /// The item was processed; the `call_*` functions return this after writing their result.
    Done {
        /// Cost in USD reported for the LLM call. The run loop adds it to the budget total
        /// only when `is_oauth` is false.
        cost: f64,
        /// True when the provider call ran on an OAuth subscription; `cost_usd` is then
        /// omitted from the emitted item event.
        is_oauth: bool,
        /// Memory row the operation targeted, if it targets memories.
        memory_id: Option<i64>,
        /// Entity row the operation targeted, if it targets entities.
        entity_id: Option<i64>,
        /// Number of entities written/linked by the operation.
        entities: usize,
        /// Number of relationships written or updated by the operation.
        rels: usize,
        /// Body length in characters before rewriting (body-rewriting operations only).
        chars_before: Option<usize>,
        /// Body length in characters after rewriting (body-rewriting operations only).
        chars_after: Option<usize>,
    },
    /// The item was intentionally not changed. `reason` is stored in the queue's `error`
    /// column.
    Skipped { reason: String },
}
1679
1680fn call_memory_bindings(
1685 conn: &Connection,
1686 namespace: &str,
1687 memory_name: &str,
1688 binary: &Path,
1689 model: Option<&str>,
1690 timeout: u64,
1691 mode: &EnrichMode,
1692) -> Result<EnrichItemResult, AppError> {
1693 let (memory_id, body): (i64, String) = conn.query_row(
1695 "SELECT id, COALESCE(body,'') FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
1696 rusqlite::params![namespace, memory_name],
1697 |r| Ok((r.get(0)?, r.get(1)?)),
1698 ).map_err(|e| match e {
1699 rusqlite::Error::QueryReturnedNoRows => AppError::NotFound(format!("memory '{memory_name}' not found")),
1700 other => AppError::Database(other),
1701 })?;
1702
1703 if body.trim().is_empty() {
1704 return Ok(EnrichItemResult::Skipped {
1705 reason: "body is empty".to_string(),
1706 });
1707 }
1708
1709 let (value, cost, is_oauth) = match mode {
1710 EnrichMode::ClaudeCode => call_claude(
1711 binary,
1712 BINDINGS_PROMPT,
1713 BINDINGS_SCHEMA,
1714 &body,
1715 model,
1716 timeout,
1717 )?,
1718 EnrichMode::Codex => call_codex(
1719 binary,
1720 BINDINGS_PROMPT,
1721 BINDINGS_SCHEMA,
1722 &body,
1723 model,
1724 timeout,
1725 )?,
1726 };
1727
1728 let empty_arr = serde_json::Value::Array(vec![]);
1729 let entities_val = value.get("entities").unwrap_or(&empty_arr);
1730 let rels_val = value.get("relationships").unwrap_or(&empty_arr);
1731
1732 let (ent_count, rel_count) =
1733 persist_memory_bindings(conn, namespace, memory_id, entities_val, rels_val)?;
1734
1735 Ok(EnrichItemResult::Done {
1736 memory_id: Some(memory_id),
1737 entity_id: None,
1738 entities: ent_count,
1739 rels: rel_count,
1740 chars_before: None,
1741 chars_after: None,
1742 cost,
1743 is_oauth,
1744 })
1745}
1746
1747fn call_entity_description(
1748 conn: &Connection,
1749 namespace: &str,
1750 entity_name: &str,
1751 binary: &Path,
1752 model: Option<&str>,
1753 timeout: u64,
1754 mode: &EnrichMode,
1755) -> Result<EnrichItemResult, AppError> {
1756 let (entity_id, entity_type): (i64, String) = conn
1757 .query_row(
1758 "SELECT id, type FROM entities WHERE namespace=?1 AND name=?2",
1759 rusqlite::params![namespace, entity_name],
1760 |r| Ok((r.get(0)?, r.get(1)?)),
1761 )
1762 .map_err(|e| match e {
1763 rusqlite::Error::QueryReturnedNoRows => {
1764 AppError::NotFound(format!("entity '{entity_name}' not found"))
1765 }
1766 other => AppError::Database(other),
1767 })?;
1768
1769 let prompt = format!(
1770 "{ENTITY_DESCRIPTION_PROMPT_PREFIX}{entity_name}\nEntity type: {entity_type}\n\nGenerate a description:"
1771 );
1772
1773 let (value, cost, is_oauth) = match mode {
1774 EnrichMode::ClaudeCode => call_claude(
1775 binary,
1776 &prompt,
1777 ENTITY_DESCRIPTION_SCHEMA,
1778 "",
1779 model,
1780 timeout,
1781 )?,
1782 EnrichMode::Codex => call_codex(
1783 binary,
1784 &prompt,
1785 ENTITY_DESCRIPTION_SCHEMA,
1786 "",
1787 model,
1788 timeout,
1789 )?,
1790 };
1791
1792 let description = value
1793 .get("description")
1794 .and_then(|v| v.as_str())
1795 .ok_or_else(|| AppError::Validation("LLM result missing 'description' field".into()))?;
1796
1797 persist_entity_description(conn, entity_id, description)?;
1798
1799 Ok(EnrichItemResult::Done {
1800 memory_id: None,
1801 entity_id: Some(entity_id),
1802 entities: 0,
1803 rels: 0,
1804 chars_before: None,
1805 chars_after: None,
1806 cost,
1807 is_oauth,
1808 })
1809}
1810
1811#[allow(clippy::too_many_arguments)]
1812fn call_body_enrich(
1813 conn: &Connection,
1814 namespace: &str,
1815 memory_name: &str,
1816 binary: &Path,
1817 model: Option<&str>,
1818 timeout: u64,
1819 mode: &EnrichMode,
1820 min_output_chars: usize,
1821 max_output_chars: usize,
1822 prompt_template: Option<&Path>,
1823 paths: &crate::paths::AppPaths,
1824) -> Result<EnrichItemResult, AppError> {
1825 let (memory_id, body, description, memory_type): (i64, String, String, String) = conn
1826 .query_row(
1827 "SELECT id, COALESCE(body,''), COALESCE(description,''), COALESCE(type,'note') \
1828 FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
1829 rusqlite::params![namespace, memory_name],
1830 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
1831 )
1832 .map_err(|e| match e {
1833 rusqlite::Error::QueryReturnedNoRows => {
1834 AppError::NotFound(format!("memory '{memory_name}' not found"))
1835 }
1836 other => AppError::Database(other),
1837 })?;
1838
1839 let chars_before = body.chars().count();
1840
1841 let linked_entities: Vec<String> = {
1843 let mut stmt = conn.prepare_cached(
1844 "SELECT e.name FROM memory_entities me \
1845 JOIN entities e ON e.id = me.entity_id \
1846 WHERE me.memory_id = ?1 LIMIT 10",
1847 )?;
1848 let result: Vec<String> = stmt
1849 .query_map(rusqlite::params![memory_id], |r| r.get::<_, String>(0))?
1850 .filter_map(|r| r.ok())
1851 .collect();
1852 drop(stmt);
1853 result
1854 };
1855
1856 let prompt_prefix = if let Some(tmpl_path) = prompt_template {
1858 let file_size = std::fs::metadata(tmpl_path)
1859 .map_err(|e| {
1860 AppError::Io(std::io::Error::new(
1861 e.kind(),
1862 format!("failed to stat prompt template: {e}"),
1863 ))
1864 })?
1865 .len();
1866 if file_size > MAX_MEMORY_BODY_LEN as u64 {
1867 return Err(AppError::LimitExceeded(
1868 crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
1869 ));
1870 }
1871 std::fs::read_to_string(tmpl_path).map_err(|e| {
1872 AppError::Io(std::io::Error::new(
1873 e.kind(),
1874 format!("failed to read prompt template: {e}"),
1875 ))
1876 })?
1877 } else {
1878 BODY_ENRICH_PROMPT_PREFIX.to_string()
1879 };
1880
1881 let context_section = if !linked_entities.is_empty() || !description.is_empty() {
1883 let mut ctx = String::new();
1884 ctx.push_str(&format!(
1885 "\nContext:\n- Memory name: {memory_name}\n- Type: {memory_type}\n"
1886 ));
1887 if !description.is_empty() {
1888 ctx.push_str(&format!("- Description: {description}\n"));
1889 }
1890 ctx.push_str(&format!("- Domain: {namespace}\n"));
1891 if !linked_entities.is_empty() {
1892 ctx.push_str(&format!(
1893 "- Linked entities: {}\n",
1894 linked_entities.join(", ")
1895 ));
1896 }
1897 ctx
1898 } else {
1899 String::new()
1900 };
1901
1902 let prompt = format!(
1903 "{prompt_prefix}{context_section}\nTarget minimum length: {min_output_chars} characters. Maximum: {max_output_chars} characters."
1904 );
1905
1906 let (value, cost, is_oauth) = match mode {
1908 EnrichMode::ClaudeCode => {
1909 call_claude(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
1910 }
1911 EnrichMode::Codex => {
1912 call_codex(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
1913 }
1914 };
1915
1916 let enriched_body = value
1917 .get("enriched_body")
1918 .and_then(|v| v.as_str())
1919 .ok_or_else(|| AppError::Validation("LLM result missing 'enriched_body' field".into()))?;
1920
1921 let chars_after = enriched_body.chars().count();
1922
1923 if chars_after <= chars_before {
1925 return Ok(EnrichItemResult::Skipped {
1926 reason: format!(
1927 "enriched body ({chars_after} chars) not longer than original ({chars_before} chars)"
1928 ),
1929 });
1930 }
1931
1932 persist_enriched_body(
1933 conn,
1934 namespace,
1935 memory_id,
1936 memory_name,
1937 enriched_body,
1938 paths,
1939 )?;
1940
1941 Ok(EnrichItemResult::Done {
1942 memory_id: Some(memory_id),
1943 entity_id: None,
1944 entities: 0,
1945 rels: 0,
1946 chars_before: Some(chars_before),
1947 chars_after: Some(chars_after),
1948 cost,
1949 is_oauth,
1950 })
1951}
1952
1953fn scan_operation(
1958 conn: &Connection,
1959 namespace: &str,
1960 args: &EnrichArgs,
1961) -> Result<Vec<String>, AppError> {
1962 match args.operation {
1963 EnrichOperation::MemoryBindings => {
1964 let rows = scan_unbound_memories(conn, namespace, args.limit)?;
1965 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
1966 }
1967 EnrichOperation::EntityDescriptions => {
1968 let rows = scan_entities_without_description(conn, namespace, args.limit)?;
1969 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
1970 }
1971 EnrichOperation::BodyEnrich => {
1972 let rows =
1973 scan_short_body_memories(conn, namespace, args.min_output_chars, args.limit)?;
1974 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
1975 }
1976 EnrichOperation::WeightCalibrate => {
1977 let rows = scan_weight_candidates(conn, namespace, args.limit)?;
1978 Ok(rows
1979 .into_iter()
1980 .map(|(id, _, _, _, _)| id.to_string())
1981 .collect())
1982 }
1983 EnrichOperation::RelationReclassify => {
1984 let rows = scan_generic_relations(conn, namespace, args.limit)?;
1985 Ok(rows
1986 .into_iter()
1987 .map(|(id, _, _, _)| id.to_string())
1988 .collect())
1989 }
1990 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
1991 let pairs = scan_isolated_entity_pairs(conn, namespace, args.limit)?;
1992 Ok(pairs.into_iter().map(|(_, name, _, _)| name).collect())
1993 }
1994 EnrichOperation::EntityTypeValidate => {
1995 let rows = scan_entities_for_type_validation(conn, namespace, args.limit)?;
1996 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
1997 }
1998 EnrichOperation::DescriptionEnrich => {
1999 let rows = scan_generic_descriptions(conn, namespace, args.limit)?;
2000 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2001 }
2002 EnrichOperation::DomainClassify
2003 | EnrichOperation::GraphAudit
2004 | EnrichOperation::DeepResearchSynth
2005 | EnrichOperation::BodyExtract => {
2006 let limit_clause = args.limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
2007 let sql = format!(
2008 "SELECT name FROM memories WHERE namespace=?1 AND deleted_at IS NULL ORDER BY id {limit_clause}"
2009 );
2010 let mut stmt = conn.prepare(&sql)?;
2011 let names = stmt
2012 .query_map(rusqlite::params![namespace], |r| r.get::<_, String>(0))?
2013 .collect::<Result<Vec<_>, _>>()?;
2014 Ok(names)
2015 }
2016 }
2017}
2018
2019fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
2025 if let Some(p) = explicit {
2026 if p.exists() {
2027 return Ok(p.to_path_buf());
2028 }
2029 return Err(AppError::Validation(format!(
2030 "Codex binary not found at explicit path: {}",
2031 p.display()
2032 )));
2033 }
2034
2035 if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
2036 let p = PathBuf::from(&env_path);
2037 if p.exists() {
2038 return Ok(p);
2039 }
2040 }
2041
2042 let name = if cfg!(windows) { "codex.exe" } else { "codex" };
2043 if let Some(path_var) = std::env::var_os("PATH") {
2044 for dir in std::env::split_paths(&path_var) {
2045 let candidate = dir.join(name);
2046 if candidate.exists() {
2047 return Ok(candidate);
2048 }
2049 }
2050 }
2051
2052 Err(AppError::Validation(
2053 "Codex CLI binary not found in PATH. Install it or specify --codex-binary".to_string(),
2054 ))
2055}
2056
2057fn call_weight_calibrate(
2059 conn: &Connection,
2060 _namespace: &str,
2061 item_key: &str,
2062 binary: &Path,
2063 model: Option<&str>,
2064 timeout: u64,
2065 mode: &EnrichMode,
2066) -> Result<EnrichItemResult, AppError> {
2067 let rel_id: i64 = item_key
2068 .parse()
2069 .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
2070 let (source_name, target_name, relation, current_weight): (String, String, String, f64) = conn
2071 .query_row(
2072 "SELECT e1.name, e2.name, r.relation, r.weight \
2073 FROM relationships r \
2074 JOIN entities e1 ON e1.id = r.source_id \
2075 JOIN entities e2 ON e2.id = r.target_id \
2076 WHERE r.id = ?1",
2077 rusqlite::params![rel_id],
2078 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2079 )
2080 .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
2081
2082 let input_text = format!(
2083 "Source: {source_name}\nTarget: {target_name}\nRelation: {relation}\nCurrent weight: {current_weight}"
2084 );
2085 let (value, cost, is_oauth) = match mode {
2086 EnrichMode::ClaudeCode => call_claude(
2087 binary,
2088 WEIGHT_CALIBRATE_PROMPT,
2089 WEIGHT_CALIBRATE_SCHEMA,
2090 &input_text,
2091 model,
2092 timeout,
2093 )?,
2094 EnrichMode::Codex => call_codex(
2095 binary,
2096 WEIGHT_CALIBRATE_PROMPT,
2097 WEIGHT_CALIBRATE_SCHEMA,
2098 &input_text,
2099 model,
2100 timeout,
2101 )?,
2102 };
2103
2104 let calibrated = value
2105 .get("calibrated_weight")
2106 .and_then(|v| v.as_f64())
2107 .ok_or_else(|| AppError::Validation("LLM result missing 'calibrated_weight'".into()))?;
2108
2109 conn.execute(
2110 "UPDATE relationships SET weight = ?1 WHERE id = ?2",
2111 rusqlite::params![calibrated, rel_id],
2112 )?;
2113
2114 Ok(EnrichItemResult::Done {
2115 memory_id: None,
2116 entity_id: None,
2117 entities: 0,
2118 rels: 1,
2119 chars_before: None,
2120 chars_after: None,
2121 cost,
2122 is_oauth,
2123 })
2124}
2125
2126fn call_relation_reclassify(
2128 conn: &Connection,
2129 _namespace: &str,
2130 item_key: &str,
2131 binary: &Path,
2132 model: Option<&str>,
2133 timeout: u64,
2134 mode: &EnrichMode,
2135) -> Result<EnrichItemResult, AppError> {
2136 let rel_id: i64 = item_key
2137 .parse()
2138 .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
2139 let (source_name, target_name, current_relation): (String, String, String) = conn
2140 .query_row(
2141 "SELECT e1.name, e2.name, r.relation \
2142 FROM relationships r \
2143 JOIN entities e1 ON e1.id = r.source_id \
2144 JOIN entities e2 ON e2.id = r.target_id \
2145 WHERE r.id = ?1",
2146 rusqlite::params![rel_id],
2147 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2148 )
2149 .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
2150
2151 let input_text = format!(
2152 "Source entity: {source_name}\nTarget entity: {target_name}\nCurrent relation: {current_relation}"
2153 );
2154 let (value, cost, is_oauth) = match mode {
2155 EnrichMode::ClaudeCode => call_claude(
2156 binary,
2157 RELATION_RECLASSIFY_PROMPT,
2158 RELATION_RECLASSIFY_SCHEMA,
2159 &input_text,
2160 model,
2161 timeout,
2162 )?,
2163 EnrichMode::Codex => call_codex(
2164 binary,
2165 RELATION_RECLASSIFY_PROMPT,
2166 RELATION_RECLASSIFY_SCHEMA,
2167 &input_text,
2168 model,
2169 timeout,
2170 )?,
2171 };
2172
2173 let new_relation = value
2174 .get("relation")
2175 .and_then(|v| v.as_str())
2176 .ok_or_else(|| AppError::Validation("LLM result missing 'relation'".into()))?;
2177 let new_strength = value
2178 .get("strength")
2179 .and_then(|v| v.as_f64())
2180 .unwrap_or(0.5);
2181
2182 conn.execute(
2183 "UPDATE relationships SET relation = ?1, weight = ?2 WHERE id = ?3",
2184 rusqlite::params![new_relation, new_strength, rel_id],
2185 )?;
2186
2187 Ok(EnrichItemResult::Done {
2188 memory_id: None,
2189 entity_id: None,
2190 entities: 0,
2191 rels: 1,
2192 chars_before: None,
2193 chars_after: None,
2194 cost,
2195 is_oauth,
2196 })
2197}
2198
2199fn call_entity_connect(
2201 conn: &Connection,
2202 namespace: &str,
2203 item_key: &str,
2204 binary: &Path,
2205 model: Option<&str>,
2206 timeout: u64,
2207 mode: &EnrichMode,
2208) -> Result<EnrichItemResult, AppError> {
2209 let pairs = scan_isolated_entity_pairs(conn, namespace, Some(1))?;
2210 let (e1_id, e1_name, e2_id, e2_name) =
2211 match pairs.into_iter().find(|(_, n, _, _)| n == item_key) {
2212 Some(p) => p,
2213 None => {
2214 return Ok(EnrichItemResult::Skipped {
2215 reason: "pair no longer isolated".into(),
2216 })
2217 }
2218 };
2219 let input_text = format!("Entity A: {e1_name}\nEntity B: {e2_name}");
2220 let (value, cost, is_oauth) = match mode {
2221 EnrichMode::ClaudeCode => call_claude(
2222 binary,
2223 ENTITY_CONNECT_PROMPT,
2224 ENTITY_CONNECT_SCHEMA,
2225 &input_text,
2226 model,
2227 timeout,
2228 )?,
2229 EnrichMode::Codex => call_codex(
2230 binary,
2231 ENTITY_CONNECT_PROMPT,
2232 ENTITY_CONNECT_SCHEMA,
2233 &input_text,
2234 model,
2235 timeout,
2236 )?,
2237 };
2238 let relation = value
2239 .get("relation")
2240 .and_then(|v| v.as_str())
2241 .unwrap_or("none");
2242 if relation == "none" {
2243 return Ok(EnrichItemResult::Skipped {
2244 reason: "LLM determined no relationship".into(),
2245 });
2246 }
2247 let strength = value
2248 .get("strength")
2249 .and_then(|v| v.as_f64())
2250 .unwrap_or(0.5);
2251 conn.execute(
2252 "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
2253 rusqlite::params![namespace, e1_id, e2_id, relation, strength],
2254 )?;
2255 Ok(EnrichItemResult::Done {
2256 memory_id: None,
2257 entity_id: None,
2258 entities: 0,
2259 rels: 1,
2260 chars_before: None,
2261 chars_after: None,
2262 cost,
2263 is_oauth,
2264 })
2265}
2266
2267fn call_entity_type_validate(
2269 conn: &Connection,
2270 _namespace: &str,
2271 item_key: &str,
2272 binary: &Path,
2273 model: Option<&str>,
2274 timeout: u64,
2275 mode: &EnrichMode,
2276) -> Result<EnrichItemResult, AppError> {
2277 let (ent_id, ent_name, ent_type): (i64, String, String) = conn
2278 .query_row(
2279 "SELECT id, name, type FROM entities WHERE name = ?1",
2280 rusqlite::params![item_key],
2281 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2282 )
2283 .map_err(|_| AppError::NotFound(format!("entity '{item_key}' not found")))?;
2284 let input_text = format!("Entity: {ent_name}\nCurrent type: {ent_type}");
2285 let (value, cost, is_oauth) = match mode {
2286 EnrichMode::ClaudeCode => call_claude(
2287 binary,
2288 ENTITY_TYPE_VALIDATE_PROMPT,
2289 ENTITY_TYPE_VALIDATE_SCHEMA,
2290 &input_text,
2291 model,
2292 timeout,
2293 )?,
2294 EnrichMode::Codex => call_codex(
2295 binary,
2296 ENTITY_TYPE_VALIDATE_PROMPT,
2297 ENTITY_TYPE_VALIDATE_SCHEMA,
2298 &input_text,
2299 model,
2300 timeout,
2301 )?,
2302 };
2303 let validated_type = value
2304 .get("validated_type")
2305 .and_then(|v| v.as_str())
2306 .unwrap_or(&ent_type);
2307 let was_correct = value
2308 .get("was_correct")
2309 .and_then(|v| v.as_bool())
2310 .unwrap_or(true);
2311 if !was_correct {
2312 conn.execute(
2313 "UPDATE entities SET type = ?1 WHERE id = ?2",
2314 rusqlite::params![validated_type, ent_id],
2315 )?;
2316 }
2317 Ok(EnrichItemResult::Done {
2318 memory_id: None,
2319 entity_id: Some(ent_id),
2320 entities: 1,
2321 rels: 0,
2322 chars_before: None,
2323 chars_after: None,
2324 cost,
2325 is_oauth,
2326 })
2327}
2328
2329fn call_description_enrich(
2331 conn: &Connection,
2332 _namespace: &str,
2333 item_key: &str,
2334 binary: &Path,
2335 model: Option<&str>,
2336 timeout: u64,
2337 mode: &EnrichMode,
2338) -> Result<EnrichItemResult, AppError> {
2339 let (mem_id, body, old_desc): (i64, String, String) = conn
2340 .query_row(
2341 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
2342 rusqlite::params![item_key],
2343 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
2344 )
2345 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
2346 let snippet: String = body.chars().take(500).collect();
2347 let input_text = format!(
2348 "Memory name: {item_key}\nCurrent description: {old_desc}\nBody preview: {snippet}"
2349 );
2350 let (value, cost, is_oauth) = match mode {
2351 EnrichMode::ClaudeCode => call_claude(
2352 binary,
2353 DESCRIPTION_ENRICH_PROMPT,
2354 DESCRIPTION_ENRICH_SCHEMA,
2355 &input_text,
2356 model,
2357 timeout,
2358 )?,
2359 EnrichMode::Codex => call_codex(
2360 binary,
2361 DESCRIPTION_ENRICH_PROMPT,
2362 DESCRIPTION_ENRICH_SCHEMA,
2363 &input_text,
2364 model,
2365 timeout,
2366 )?,
2367 };
2368 let new_desc = value
2369 .get("description")
2370 .and_then(|v| v.as_str())
2371 .unwrap_or(&old_desc);
2372 conn.execute(
2373 "UPDATE memories SET description = ?1 WHERE id = ?2",
2374 rusqlite::params![new_desc, mem_id],
2375 )?;
2376 Ok(EnrichItemResult::Done {
2377 memory_id: Some(mem_id),
2378 entity_id: None,
2379 entities: 0,
2380 rels: 0,
2381 chars_before: Some(old_desc.len()),
2382 chars_after: Some(new_desc.len()),
2383 cost,
2384 is_oauth,
2385 })
2386}
2387
2388fn call_domain_classify(
2390 conn: &Connection,
2391 _namespace: &str,
2392 item_key: &str,
2393 binary: &Path,
2394 model: Option<&str>,
2395 timeout: u64,
2396 mode: &EnrichMode,
2397) -> Result<EnrichItemResult, AppError> {
2398 let (mem_id, body, desc): (i64, String, String) = conn
2399 .query_row(
2400 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
2401 rusqlite::params![item_key],
2402 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
2403 )
2404 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
2405 let snippet: String = body.chars().take(500).collect();
2406 let input_text = format!("Memory: {item_key}\nDescription: {desc}\nBody preview: {snippet}");
2407 let (value, cost, is_oauth) = match mode {
2408 EnrichMode::ClaudeCode => call_claude(
2409 binary,
2410 DOMAIN_CLASSIFY_PROMPT,
2411 DOMAIN_CLASSIFY_SCHEMA,
2412 &input_text,
2413 model,
2414 timeout,
2415 )?,
2416 EnrichMode::Codex => call_codex(
2417 binary,
2418 DOMAIN_CLASSIFY_PROMPT,
2419 DOMAIN_CLASSIFY_SCHEMA,
2420 &input_text,
2421 model,
2422 timeout,
2423 )?,
2424 };
2425 let domain = value
2426 .get("domain")
2427 .and_then(|v| v.as_str())
2428 .unwrap_or("uncategorized");
2429 let metadata = format!(r#"{{"domain":"{}"}}"#, domain.replace('"', "\\\""));
2430 conn.execute(
2431 "UPDATE memories SET metadata = ?1 WHERE id = ?2",
2432 rusqlite::params![metadata, mem_id],
2433 )?;
2434 Ok(EnrichItemResult::Done {
2435 memory_id: Some(mem_id),
2436 entity_id: None,
2437 entities: 0,
2438 rels: 0,
2439 chars_before: None,
2440 chars_after: None,
2441 cost,
2442 is_oauth,
2443 })
2444}
2445
2446fn call_graph_audit(
2448 conn: &Connection,
2449 _namespace: &str,
2450 item_key: &str,
2451 binary: &Path,
2452 model: Option<&str>,
2453 timeout: u64,
2454 mode: &EnrichMode,
2455) -> Result<EnrichItemResult, AppError> {
2456 let (mem_id, body, desc): (i64, String, String) = conn
2457 .query_row(
2458 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
2459 rusqlite::params![item_key],
2460 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
2461 )
2462 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
2463 let snippet: String = body.chars().take(500).collect();
2464 let ent_count: i64 = conn
2465 .query_row(
2466 "SELECT COUNT(*) FROM memory_entities WHERE memory_id = ?1",
2467 rusqlite::params![mem_id],
2468 |r| r.get(0),
2469 )
2470 .unwrap_or(0);
2471 let input_text = format!("Memory: {item_key}\nDescription: {desc}\nEntity bindings: {ent_count}\nBody preview: {snippet}");
2472 let (value, cost, is_oauth) = match mode {
2473 EnrichMode::ClaudeCode => call_claude(
2474 binary,
2475 GRAPH_AUDIT_PROMPT,
2476 GRAPH_AUDIT_SCHEMA,
2477 &input_text,
2478 model,
2479 timeout,
2480 )?,
2481 EnrichMode::Codex => call_codex(
2482 binary,
2483 GRAPH_AUDIT_PROMPT,
2484 GRAPH_AUDIT_SCHEMA,
2485 &input_text,
2486 model,
2487 timeout,
2488 )?,
2489 };
2490 let issues = value
2491 .get("issues")
2492 .and_then(|v| v.as_array())
2493 .map(|a| a.len())
2494 .unwrap_or(0);
2495 Ok(EnrichItemResult::Done {
2496 memory_id: Some(mem_id),
2497 entity_id: None,
2498 entities: 0,
2499 rels: issues,
2500 chars_before: None,
2501 chars_after: None,
2502 cost,
2503 is_oauth,
2504 })
2505}
2506
2507fn call_deep_research_synth(
2509 conn: &Connection,
2510 namespace: &str,
2511 item_key: &str,
2512 binary: &Path,
2513 model: Option<&str>,
2514 timeout: u64,
2515 mode: &EnrichMode,
2516) -> Result<EnrichItemResult, AppError> {
2517 let (mem_id, body): (i64, String) = conn
2518 .query_row(
2519 "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
2520 rusqlite::params![item_key],
2521 |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
2522 )
2523 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
2524 let snippet: String = body.chars().take(2000).collect();
2525 let input_text = format!("Memory: {item_key}\nBody:\n{snippet}");
2526 let (value, cost, is_oauth) = match mode {
2527 EnrichMode::ClaudeCode => call_claude(
2528 binary,
2529 DEEP_RESEARCH_SYNTH_PROMPT,
2530 DEEP_RESEARCH_SYNTH_SCHEMA,
2531 &input_text,
2532 model,
2533 timeout,
2534 )?,
2535 EnrichMode::Codex => call_codex(
2536 binary,
2537 DEEP_RESEARCH_SYNTH_PROMPT,
2538 DEEP_RESEARCH_SYNTH_SCHEMA,
2539 &input_text,
2540 model,
2541 timeout,
2542 )?,
2543 };
2544 let mut ent_count = 0usize;
2545 let mut rel_count = 0usize;
2546 if let Some(ents) = value.get("entities").and_then(|v| v.as_array()) {
2547 for e in ents {
2548 let name = e.get("name").and_then(|v| v.as_str()).unwrap_or_default();
2549 let etype_str = e
2550 .get("entity_type")
2551 .and_then(|v| v.as_str())
2552 .unwrap_or("concept");
2553 let etype: EntityType = etype_str.parse().unwrap_or(EntityType::Concept);
2554 if name.len() >= 2 {
2555 let ne = NewEntity {
2556 name: name.to_string(),
2557 entity_type: etype,
2558 description: None,
2559 };
2560 let _ = entities::upsert_entity(conn, namespace, &ne);
2561 ent_count += 1;
2562 }
2563 }
2564 }
2565 if let Some(rels) = value.get("relationships").and_then(|v| v.as_array()) {
2566 for r in rels {
2567 let src = r.get("source").and_then(|v| v.as_str()).unwrap_or_default();
2568 let tgt = r.get("target").and_then(|v| v.as_str()).unwrap_or_default();
2569 if src.is_empty() || tgt.is_empty() {
2570 continue;
2571 }
2572 let rel = r
2573 .get("relation")
2574 .and_then(|v| v.as_str())
2575 .unwrap_or("related");
2576 let str_ = r.get("strength").and_then(|v| v.as_f64()).unwrap_or(0.5);
2577 if let (Some(sid), Some(tid)) = (
2578 entities::find_entity_id(conn, namespace, src)?,
2579 entities::find_entity_id(conn, namespace, tgt)?,
2580 ) {
2581 let _ = entities::create_or_fetch_relationship(
2582 conn, namespace, sid, tid, rel, str_, None,
2583 );
2584 rel_count += 1;
2585 }
2586 }
2587 }
2588 Ok(EnrichItemResult::Done {
2589 memory_id: Some(mem_id),
2590 entity_id: None,
2591 entities: ent_count,
2592 rels: rel_count,
2593 chars_before: None,
2594 chars_after: None,
2595 cost,
2596 is_oauth,
2597 })
2598}
2599
2600fn call_body_extract(
2602 conn: &Connection,
2603 _namespace: &str,
2604 item_key: &str,
2605 binary: &Path,
2606 model: Option<&str>,
2607 timeout: u64,
2608 mode: &EnrichMode,
2609) -> Result<EnrichItemResult, AppError> {
2610 let (mem_id, body): (i64, String) = conn
2611 .query_row(
2612 "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
2613 rusqlite::params![item_key],
2614 |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
2615 )
2616 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
2617 let input_text = format!("Memory: {item_key}\nBody:\n{body}");
2618 let (value, cost, is_oauth) = match mode {
2619 EnrichMode::ClaudeCode => call_claude(
2620 binary,
2621 BODY_EXTRACT_PROMPT,
2622 BODY_EXTRACT_SCHEMA,
2623 &input_text,
2624 model,
2625 timeout,
2626 )?,
2627 EnrichMode::Codex => call_codex(
2628 binary,
2629 BODY_EXTRACT_PROMPT,
2630 BODY_EXTRACT_SCHEMA,
2631 &input_text,
2632 model,
2633 timeout,
2634 )?,
2635 };
2636 let restructured = value
2637 .get("restructured_body")
2638 .and_then(|v| v.as_str())
2639 .unwrap_or(&body);
2640 let chars_before = body.len();
2641 let chars_after = restructured.len();
2642 let new_hash = blake3::hash(restructured.as_bytes()).to_hex().to_string();
2643 conn.execute(
2644 "UPDATE memories SET body = ?1, body_hash = ?2, updated_at = unixepoch() WHERE id = ?3",
2645 rusqlite::params![restructured, new_hash, mem_id],
2646 )?;
2647 Ok(EnrichItemResult::Done {
2648 memory_id: Some(mem_id),
2649 entity_id: None,
2650 entities: 0,
2651 rels: 0,
2652 chars_before: Some(chars_before),
2653 chars_after: Some(chars_after),
2654 cost,
2655 is_oauth,
2656 })
2657}
2658
2659#[allow(clippy::type_complexity)]
2661fn scan_isolated_entity_pairs(
2662 conn: &Connection,
2663 namespace: &str,
2664 limit: Option<usize>,
2665) -> Result<Vec<(i64, String, i64, String)>, AppError> {
2666 let limit_val = limit.unwrap_or(50) as i64;
2667 let mut stmt = conn.prepare_cached(
2668 "SELECT e1.id, e1.name, e2.id, e2.name FROM entities e1, entities e2 \
2669 WHERE e1.namespace = ?1 AND e2.namespace = ?1 AND e1.id < e2.id \
2670 AND NOT EXISTS (SELECT 1 FROM relationships r WHERE \
2671 (r.source_id = e1.id AND r.target_id = e2.id) OR \
2672 (r.source_id = e2.id AND r.target_id = e1.id)) \
2673 LIMIT ?2",
2674 )?;
2675 let rows = stmt
2676 .query_map(rusqlite::params![namespace, limit_val], |r| {
2677 Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?))
2678 })?
2679 .collect::<Result<Vec<_>, _>>()?;
2680 Ok(rows)
2681}
2682
2683fn scan_entities_for_type_validation(
2685 conn: &Connection,
2686 namespace: &str,
2687 limit: Option<usize>,
2688) -> Result<Vec<(i64, String, String)>, AppError> {
2689 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
2690 let sql = format!(
2691 "SELECT id, name, type FROM entities WHERE namespace = ?1 ORDER BY id {limit_clause}"
2692 );
2693 let mut stmt = conn.prepare(&sql)?;
2694 let rows = stmt
2695 .query_map(rusqlite::params![namespace], |r| {
2696 Ok((r.get(0)?, r.get(1)?, r.get(2)?))
2697 })?
2698 .collect::<Result<Vec<_>, _>>()?;
2699 Ok(rows)
2700}
2701
2702fn scan_generic_descriptions(
2704 conn: &Connection,
2705 namespace: &str,
2706 limit: Option<usize>,
2707) -> Result<Vec<(i64, String, String)>, AppError> {
2708 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
2709 let sql = format!(
2710 "SELECT id, name, description FROM memories WHERE namespace = ?1 AND deleted_at IS NULL \
2711 AND (description LIKE '%ingested%' OR description LIKE '%imported%' OR description LIKE '%added%' OR length(description) < 30) \
2712 ORDER BY id {limit_clause}"
2713 );
2714 let mut stmt = conn.prepare(&sql)?;
2715 let rows = stmt
2716 .query_map(rusqlite::params![namespace], |r| {
2717 Ok((r.get(0)?, r.get(1)?, r.get(2)?))
2718 })?
2719 .collect::<Result<Vec<_>, _>>()?;
2720 Ok(rows)
2721}
2722
2723fn call_codex(
2727 binary: &Path,
2728 prompt: &str,
2729 json_schema: &str,
2730 input_text: &str,
2731 model: Option<&str>,
2732 timeout_secs: u64,
2733) -> Result<(serde_json::Value, f64, bool), AppError> {
2734 use wait_timeout::ChildExt;
2735
2736 let full_prompt = format!("{prompt}\n\n{input_text}");
2737 let schema_file = {
2738 let tmp = std::env::temp_dir().join(format!("enrich-schema-{}.json", std::process::id()));
2739 std::fs::write(&tmp, json_schema).map_err(AppError::Io)?;
2740 tmp
2741 };
2742
2743 let mut cmd = Command::new(binary);
2744 cmd.env_clear();
2745 for var in &[
2746 "PATH",
2747 "HOME",
2748 "USER",
2749 "OPENAI_API_KEY",
2750 "TMPDIR",
2751 "TMP",
2752 "TEMP",
2753 ] {
2754 if let Ok(val) = std::env::var(var) {
2755 cmd.env(var, val);
2756 }
2757 }
2758
2759 #[cfg(windows)]
2760 for var in &[
2761 "LOCALAPPDATA",
2762 "APPDATA",
2763 "USERPROFILE",
2764 "SystemRoot",
2765 "COMSPEC",
2766 "PATHEXT",
2767 ] {
2768 if let Ok(val) = std::env::var(var) {
2769 cmd.env(var, val);
2770 }
2771 }
2772
2773 cmd.arg("exec")
2774 .arg("--json")
2775 .arg("--output-schema")
2776 .arg(&schema_file);
2777
2778 if let Some(m) = model {
2779 cmd.arg("--model").arg(m);
2780 }
2781
2782 cmd.stdin(Stdio::piped())
2783 .stdout(Stdio::piped())
2784 .stderr(Stdio::piped());
2785
2786 let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
2787 AppError::Io(std::io::Error::new(
2788 e.kind(),
2789 format!("failed to spawn codex: {e}"),
2790 ))
2791 })?;
2792
2793 let stdin_bytes = full_prompt.into_bytes();
2794 let mut child_stdin = child
2795 .stdin
2796 .take()
2797 .ok_or_else(|| AppError::Validation("failed to open codex stdin".into()))?;
2798 let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
2799 child_stdin.write_all(&stdin_bytes)?;
2800 drop(child_stdin);
2801 Ok(())
2802 });
2803
2804 let start = std::time::Instant::now();
2805 let timeout = std::time::Duration::from_secs(timeout_secs);
2806 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
2807
2808 let _ = std::fs::remove_file(&schema_file);
2809
2810 match status {
2811 Some(exit_status) => {
2812 stdin_thread
2813 .join()
2814 .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
2815 .map_err(AppError::Io)?;
2816
2817 tracing::debug!(
2818 target: "process",
2819 exit_code = ?exit_status.code(),
2820 elapsed_ms = start.elapsed().as_millis() as u64,
2821 "external process completed"
2822 );
2823
2824 let mut stdout_buf = Vec::new();
2825 if let Some(mut out) = child.stdout.take() {
2826 std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
2827 }
2828 if !exit_status.success() {
2829 let mut stderr_buf = Vec::new();
2830 if let Some(mut err) = child.stderr.take() {
2831 std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
2832 }
2833 let stderr_str = String::from_utf8_lossy(&stderr_buf);
2834 tracing::warn!(
2835 target: "enrich",
2836 exit_code = ?exit_status.code(),
2837 stderr = %stderr_str.trim(),
2838 "codex process failed"
2839 );
2840 return Err(AppError::Validation(format!(
2841 "codex exited with code {:?}: {}",
2842 exit_status.code(),
2843 stderr_str.trim()
2844 )));
2845 }
2846 let stdout_str = String::from_utf8(stdout_buf)
2847 .map_err(|_| AppError::Validation("codex stdout is not valid UTF-8".into()))?;
2848 let value: serde_json::Value = serde_json::from_str(&stdout_str).map_err(|e| {
2849 AppError::Validation(format!("failed to parse codex output as JSON: {e}"))
2850 })?;
2851 Ok((value, 0.0, false))
2852 }
2853 None => {
2854 let _ = child.kill();
2855 let _ = child.wait();
2856 let _ = stdin_thread.join();
2857 Err(AppError::Validation(format!(
2858 "codex timed out after {timeout_secs} seconds"
2859 )))
2860 }
2861 }
2862}
2863
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::Connection;

    /// Builds an in-memory database with the subset of the schema that the enrich
    /// scanners and writers touch.
    fn open_test_db() -> Connection {
        let conn = Connection::open_in_memory().expect("in-memory db");
        conn.execute_batch(
            "CREATE TABLE memories (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                name TEXT NOT NULL,
                type TEXT NOT NULL DEFAULT 'note',
                description TEXT NOT NULL DEFAULT '',
                body TEXT NOT NULL DEFAULT '',
                body_hash TEXT NOT NULL DEFAULT '',
                session_id TEXT,
                source TEXT NOT NULL DEFAULT 'agent',
                metadata TEXT NOT NULL DEFAULT '{}',
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                deleted_at INTEGER,
                UNIQUE(namespace, name)
            );
            CREATE TABLE entities (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                name TEXT NOT NULL,
                type TEXT NOT NULL DEFAULT 'concept',
                description TEXT,
                degree INTEGER NOT NULL DEFAULT 0,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                UNIQUE(namespace, name)
            );
            CREATE TABLE memory_entities (
                memory_id INTEGER NOT NULL,
                entity_id INTEGER NOT NULL,
                PRIMARY KEY (memory_id, entity_id)
            );
            CREATE TABLE relationships (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                source_id INTEGER NOT NULL,
                target_id INTEGER NOT NULL,
                relation TEXT NOT NULL,
                weight REAL NOT NULL DEFAULT 0.5,
                description TEXT,
                UNIQUE(source_id, target_id, relation)
            );",
        )
        .expect("schema creation must succeed");
        conn
    }

    /// Inserts an entity into the `global` namespace and returns its row id.
    fn insert_entity(conn: &Connection, name: &str) -> i64 {
        conn.execute(
            "INSERT INTO entities (namespace, name) VALUES ('global', ?1)",
            rusqlite::params![name],
        )
        .unwrap();
        conn.last_insert_rowid()
    }

    #[test]
    fn scan_unbound_memories_finds_memories_without_bindings() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'test-mem', 'some body content')",
            [],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "test-mem");
    }

    #[test]
    fn scan_unbound_memories_excludes_bound_memories() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'bound-mem', 'body')",
            [],
        )
        .unwrap();
        let mem_id: i64 = conn
            .query_row("SELECT id FROM memories WHERE name='bound-mem'", [], |r| {
                r.get(0)
            })
            .unwrap();
        let ent_id = insert_entity(&conn, "some-entity");
        conn.execute(
            "INSERT INTO memory_entities (memory_id, entity_id) VALUES (?1, ?2)",
            rusqlite::params![mem_id, ent_id],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None).unwrap();
        assert!(results.is_empty(), "bound memory must not appear in scan");
    }

    #[test]
    fn scan_entities_without_description_finds_null_description() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'my-tool', 'tool', NULL)",
            [],
        )
        .unwrap();

        let results = scan_entities_without_description(&conn, "global", None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "my-tool");
    }

    #[test]
    fn scan_entities_without_description_excludes_entities_with_description() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'described-tool', 'tool', 'Has a description already')",
            [],
        )
        .unwrap();

        let results = scan_entities_without_description(&conn, "global", None).unwrap();
        assert!(
            results.is_empty(),
            "entity with description must not appear"
        );
    }

    #[test]
    fn scan_short_body_memories_finds_short_bodies() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'short-mem', 'hi')",
            [],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "short-mem");
    }

    #[test]
    fn scan_short_body_memories_excludes_long_bodies() {
        let conn = open_test_db();
        let long_body = "a".repeat(1000);
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'long-mem', ?1)",
            rusqlite::params![long_body],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
        assert!(results.is_empty(), "long memory must not appear in scan");
    }

    #[test]
    fn scan_respects_limit() {
        let conn = open_test_db();
        for i in 0..5 {
            conn.execute(
                &format!("INSERT INTO memories (namespace, name, body) VALUES ('global', 'mem-{i}', 'short')"),
                [],
            )
            .unwrap();
        }

        let results = scan_short_body_memories(&conn, "global", 1000, Some(3)).unwrap();
        assert_eq!(results.len(), 3, "limit must be respected");
    }

    #[test]
    fn scan_isolated_entity_pairs_finds_unconnected_pair() {
        let conn = open_test_db();
        let a = insert_entity(&conn, "alpha");
        let b = insert_entity(&conn, "beta");

        let pairs = scan_isolated_entity_pairs(&conn, "global", None).unwrap();
        assert_eq!(pairs, vec![(a, "alpha".to_string(), b, "beta".to_string())]);
    }

    #[test]
    fn scan_isolated_entity_pairs_excludes_pairs_related_in_either_direction() {
        let conn = open_test_db();
        let a = insert_entity(&conn, "alpha");
        let b = insert_entity(&conn, "beta");
        // The relationship points from the higher id to the lower one; it must still count.
        conn.execute(
            "INSERT INTO relationships (namespace, source_id, target_id, relation) VALUES ('global', ?1, ?2, 'uses')",
            rusqlite::params![b, a],
        )
        .unwrap();

        let pairs = scan_isolated_entity_pairs(&conn, "global", None).unwrap();
        assert!(pairs.is_empty(), "related pair must not be reported as isolated");
    }

    #[test]
    fn scan_entities_for_type_validation_orders_by_id_and_limits() {
        let conn = open_test_db();
        insert_entity(&conn, "first");
        insert_entity(&conn, "second");
        insert_entity(&conn, "third");

        let results = scan_entities_for_type_validation(&conn, "global", Some(2)).unwrap();
        let names: Vec<&str> = results.iter().map(|(_, n, _)| n.as_str()).collect();
        assert_eq!(names, vec!["first", "second"]);
        assert!(results.iter().all(|(_, _, t)| t == "concept"));
    }

    #[test]
    fn scan_generic_descriptions_flags_boilerplate_and_short_descriptions() {
        let conn = open_test_db();
        for (name, description, deleted_at) in [
            ("boilerplate", "Ingested from notes.md by the importer pipeline", None),
            ("too-short", "tiny", None),
            (
                "specific",
                "A detailed explanation of the retry policy for webhook delivery",
                None,
            ),
            ("deleted-short", "tiny", Some(1_i64)),
        ] {
            conn.execute(
                "INSERT INTO memories (namespace, name, description, deleted_at) VALUES ('global', ?1, ?2, ?3)",
                rusqlite::params![name, description, deleted_at],
            )
            .unwrap();
        }

        let results = scan_generic_descriptions(&conn, "global", None).unwrap();
        let names: Vec<&str> = results.iter().map(|(_, n, _)| n.as_str()).collect();
        assert_eq!(names, vec!["boilerplate", "too-short"]);
    }

    #[test]
    fn queue_db_schema_creates_correctly() {
        // Use the platform temp dir instead of a hard-coded /tmp, and clear any stale file
        // from an aborted earlier run so it cannot make the row count non-zero.
        let tmp_path = std::env::temp_dir()
            .join(format!("test-enrich-queue-{}.sqlite", std::process::id()))
            .to_string_lossy()
            .into_owned();
        let _ = std::fs::remove_file(&tmp_path);

        let conn = open_queue_db(&tmp_path).expect("queue db must open");
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
            .unwrap();
        drop(conn);
        // Clean up before asserting so a failure does not leave the file behind.
        let _ = std::fs::remove_file(&tmp_path);
        assert_eq!(count, 0);
    }

    #[test]
    fn parse_claude_output_valid_bindings() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.01,
             "structured_output":{"entities":[{"name":"rust-lang","entity_type":"tool"}],"relationships":[]}}
        ]"#;
        let result = crate::commands::claude_runner::parse_claude_output(output)
            .expect("must parse successfully");
        assert!(result.value.get("entities").is_some());
        assert!((result.cost_usd - 0.01).abs() < f64::EPSILON);
        assert!(!result.is_oauth);
    }

    #[test]
    fn parse_claude_output_detects_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"none"},
            {"type":"result","is_error":false,"total_cost_usd":0.0,
             "structured_output":{"entities":[],"relationships":[]}}
        ]"#;
        let result = crate::commands::claude_runner::parse_claude_output(output).unwrap();
        assert!(result.is_oauth);
    }

    #[test]
    fn parse_claude_output_rate_limit_returns_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;
        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
        assert!(matches!(err, AppError::RateLimited { .. }));
    }

    #[test]
    fn parse_claude_output_auth_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;
        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("authentication failed"));
    }

    /// Covers only the scan that a dry run previews; the LLM path is not exercised.
    #[test]
    fn dry_run_emits_preview_without_calling_llm() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'dry-mem', 'tiny')",
            [],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 1000, None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "dry-mem");
    }

    #[test]
    fn persist_entity_description_updates_db() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type) VALUES ('global', 'tokio-runtime', 'tool')",
            [],
        )
        .unwrap();
        let eid: i64 = conn
            .query_row(
                "SELECT id FROM entities WHERE name='tokio-runtime'",
                [],
                |r| r.get(0),
            )
            .unwrap();

        persist_entity_description(&conn, eid, "Async runtime for Rust applications").unwrap();

        let desc: String = conn
            .query_row(
                "SELECT description FROM entities WHERE id=?1",
                rusqlite::params![eid],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(desc, "Async runtime for Rust applications");
    }

    #[test]
    fn bindings_schema_is_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(BINDINGS_SCHEMA).expect("BINDINGS_SCHEMA must be valid JSON");
    }

    #[test]
    fn entity_description_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(ENTITY_DESCRIPTION_SCHEMA)
            .expect("ENTITY_DESCRIPTION_SCHEMA must be valid JSON");
    }

    #[test]
    fn body_enrich_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(BODY_ENRICH_SCHEMA)
            .expect("BODY_ENRICH_SCHEMA must be valid JSON");
    }

    #[test]
    fn task_schemas_are_valid_json() {
        for (name, schema) in [
            ("RELATION_RECLASSIFY_SCHEMA", RELATION_RECLASSIFY_SCHEMA),
            ("ENTITY_CONNECT_SCHEMA", ENTITY_CONNECT_SCHEMA),
            ("ENTITY_TYPE_VALIDATE_SCHEMA", ENTITY_TYPE_VALIDATE_SCHEMA),
            ("DESCRIPTION_ENRICH_SCHEMA", DESCRIPTION_ENRICH_SCHEMA),
            ("DOMAIN_CLASSIFY_SCHEMA", DOMAIN_CLASSIFY_SCHEMA),
            ("GRAPH_AUDIT_SCHEMA", GRAPH_AUDIT_SCHEMA),
            ("DEEP_RESEARCH_SYNTH_SCHEMA", DEEP_RESEARCH_SYNTH_SCHEMA),
            ("BODY_EXTRACT_SCHEMA", BODY_EXTRACT_SCHEMA),
        ] {
            if let Err(e) = serde_json::from_str::<serde_json::Value>(schema) {
                panic!("{name} must be valid JSON: {e}");
            }
        }
    }
}